Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.15

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
1.1.2.12  misho       4: extern const char sql_schema[];
                      5: 
                      6: 
1.1.2.2   misho       7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.1.2.3   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
1.1.2.2   misho      24:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     25: 
                     26: 
                     27: /*
                     28:  * mqtt_rtlm_open() Open database connection
                     29:  *
1.1.2.3   misho      30:  * @cfg = config filename
1.1.2.2   misho      31:  * return: NULL error or SQL handle
                     32:  */
                     33: sqlite3 *
                     34: mqtt_rtlm_open(sl_config *cfg)
                     35: {
                     36:        sqlite3 *sql = NULL;
                     37:        const char *str = NULL;
                     38: 
                     39:        if (!cfg)
                     40:                return NULL;
                     41: 
1.1.2.12  misho      42:        sqlite3_config(SQLITE_CONFIG_SERIALIZED);
                     43: 
1.1.2.2   misho      44:        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
                     45:        if (!str) {
                     46:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     47:                return NULL;
                     48:        }
                     49: 
1.1.2.13  misho      50:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3   misho      51:                MQTT_RTLM_LOG(sql);
1.1.2.2   misho      52:                sqlite3_close(sql);
                     53:                return NULL;
                     54:        }
                     55: 
1.1.2.12  misho      56:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     57:                MQTT_RTLM_LOG(sql);
                     58:                sqlite3_close(sql);
                     59:                return NULL;
                     60:        }
1.1.2.2   misho      61:        return sql;
                     62: }
                     63: 
                     64: /*
                     65:  * mqtt_rtlm_close() Close database connection
                     66:  *
                     67:  * @sql = SQL handle
                     68:  * return: none
                     69:  */
                     70: void
                     71: mqtt_rtlm_close(sqlite3 *sql)
                     72: {
                     73:        sqlite3_close(sql);
                     74: }
1.1.2.4   misho      75: 
                     76: /*
                     77:  * mqtt_rtlm_init_session() Create session
                     78:  *
                     79:  * @cfg = loaded config
                     80:  * @sql = SQL handle
                     81:  * @user = username
1.1.2.14  misho      82:  * @connid = connection id
1.1.2.4   misho      83:  * @host = hostname
                     84:  * @port = port
1.1.2.14  misho      85:  * @will = will flag if !=0 must fill arguments
                     86:  * @... = will arguments in order topic,msg,qos,retain
1.1.2.4   misho      87:  * return: -1 error, 0 session already appears or >0 row changed
                     88:  */
                     89: int
1.1.2.14  misho      90: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, 
                     91:                const char *host, u_short port, char will, ...)
1.1.2.4   misho      92: {
1.1.2.14  misho      93:        va_list lst;
1.1.2.4   misho      94:        int ret = 0;
                     95:        char *str, szStmt[BUFSIZ] = { 0 };
                     96:        sqlite3_stmt *stmt;
                     97: 
                     98:        if (!cfg || !sql)
                     99:                return -1;
                    100: 
                    101:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    102:        if (!str) {
1.1.2.8   misho     103:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4   misho     104:                return -1;
                    105:        }
1.1.2.14  misho     106:        if (!will)
                    107:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
                    108:                                "WillFlag) VALUES ('%s', '%s', '%s', %d, 0);", str, user, connid, host, port);
                    109:        else {
                    110:                va_start(lst, will);
                    111:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
                    112:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                    113:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, %d, '%s', '%s');", 
                    114:                                str, user, connid, host, port, will, 
                    115:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    116:                va_end(lst);
                    117:        }
1.1.2.4   misho     118: 
                    119:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    120:                MQTT_RTLM_LOG(sql);
                    121:                return -1;
                    122:        }
                    123:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    124:                ret = sqlite3_changes(sql);
                    125:        else {
                    126:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    127:                        MQTT_RTLM_LOG(sql);
                    128:                ret = 0;
                    129:        }
                    130:        sqlite3_finalize(stmt);
                    131: 
                    132:        return ret;
                    133: }
1.1.2.5   misho     134: 
                    135: /*
                    136:  * mqtt_rtlm_fini_session() Delete session(s)
                    137:  *
                    138:  * @cfg = loaded config
                    139:  * @sql = SQL handle
                    140:  * @user = username
1.1.2.14  misho     141:  * @connid = connection id
1.1.2.5   misho     142:  * @host = hostname
                    143:  * return: -1 error, 0 session already appears or >0 row changed
                    144:  */
                    145: int
1.1.2.14  misho     146: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.5   misho     147: {
                    148:        int ret = 0;
                    149:        char *str, szStmt[BUFSIZ] = { 0 };
                    150:        sqlite3_stmt *stmt;
                    151: 
                    152:        if (!cfg || !sql)
                    153:                return -1;
                    154: 
                    155:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    156:        if (!str) {
1.1.2.8   misho     157:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5   misho     158:                return -1;
                    159:        }
1.1.2.14  misho     160:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND ConnID = '%s' "
                    161:                        "AND RemoteHost LIKE '%s';", str, user, connid, host);
1.1.2.5   misho     162: 
                    163:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    164:                MQTT_RTLM_LOG(sql);
                    165:                return -1;
                    166:        }
                    167:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    168:                ret = sqlite3_changes(sql);
                    169:        else {
                    170:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    171:                        MQTT_RTLM_LOG(sql);
                    172:                ret = 0;
                    173:        }
                    174:        sqlite3_finalize(stmt);
                    175: 
                    176:        return ret;
                    177: }
1.1.2.6   misho     178: 
                    179: /*
                    180:  * mqtt_rtlm_chk_session() Check session(s)
                    181:  *
                    182:  * @cfg = loaded config
                    183:  * @sql = SQL handle
                    184:  * @user = username
1.1.2.14  misho     185:  * @connid = connection id
1.1.2.6   misho     186:  * @host = hostname
                    187:  * return: -1 error, 0 not logged or >0 logged found rows
                    188:  */
                    189: int
1.1.2.14  misho     190: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.6   misho     191: {
                    192:        int ret = 0;
                    193:        char *str, szStmt[BUFSIZ] = { 0 };
                    194:        sqlite3_stmt *stmt;
                    195: 
                    196:        if (!cfg || !sql)
                    197:                return -1;
                    198: 
                    199:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    200:        if (!str) {
1.1.2.8   misho     201:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6   misho     202:                return -1;
                    203:        }
1.1.2.14  misho     204:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost, RemotePort FROM %s WHERE "
                    205:                        "Username = '%s' AND ConnID = '%s' AND RemoteHost LIKE '%s';", 
                    206:                        str, user, connid, host);
1.1.2.6   misho     207: 
                    208:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    209:                MQTT_RTLM_LOG(sql);
                    210:                return -1;
                    211:        }
1.1.2.7   misho     212:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    213:                ret = sqlite3_changes(sql);
                    214:        else
                    215:                ret = 0;
1.1.2.6   misho     216:        sqlite3_finalize(stmt);
                    217: 
                    218:        return ret;
                    219: }
1.1.2.8   misho     220: 
                    221: /*
                    222:  * mqtt_rtlm_write_topic() Publish topic
                    223:  *
                    224:  * @cfg = loaded config
                    225:  * @sql = SQL handle
1.1.2.14  misho     226:  * @msgid = MessageID
1.1.2.8   misho     227:  * @topic = topic
                    228:  * @txt = text
                    229:  * @user = username
                    230:  * @host = hostname
                    231:  * @retain = !=0 retain message to database
                    232:  * return: -1 error, 0 no publish or >0 published ok
                    233:  */
                    234: int
1.1.2.14  misho     235: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
1.1.2.8   misho     236:                const char *user, const char *host, char retain)
                    237: {
                    238:        int ret = 0;
                    239:        char *str, szStmt[BUFSIZ] = { 0 };
                    240:        sqlite3_stmt *stmt;
                    241: 
                    242:        if (!cfg || !sql || !topic)
                    243:                return -1;
                    244: 
                    245:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    246:        if (!str) {
                    247:                mqtt_rtlm_log("Error:: not found topics table name");
                    248:                return -1;
                    249:        }
1.1.2.14  misho     250:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
                    251:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
                    252:                        "datetime('now', 'localtime'), '%s');", 
                    253:                        str, retain, msgid, topic, txt, user, host);
1.1.2.8   misho     254: 
                    255:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    256:                MQTT_RTLM_LOG(sql);
                    257:                return -1;
                    258:        }
                    259:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    260:                ret = sqlite3_changes(sql);
                    261:        else {
                    262:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    263:                        MQTT_RTLM_LOG(sql);
                    264:                ret = 0;
                    265:        }
                    266:        sqlite3_finalize(stmt);
                    267: 
                    268:        return ret;
                    269: }
                    270: 
                    271: /*
                    272:  * mqtt_rtlm_delete_topic() Delete topic
                    273:  *
                    274:  * @cfg = loaded config
                    275:  * @sql = SQL handle
1.1.2.14  misho     276:  * @msgid = MessageID
1.1.2.8   misho     277:  * @topic = topic
                    278:  * @user = username
                    279:  * @host = hostname
                    280:  * @retain = -1 no matter
                    281:  * return: -1 error, 0 no changes or >0 deleted rows
                    282:  */
                    283: int
1.1.2.14  misho     284: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
1.1.2.8   misho     285:                const char *user, const char *host, char retain)
                    286: {
                    287:        int ret = 0;
                    288:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    289:        sqlite3_stmt *stmt;
                    290: 
                    291:        if (!cfg || !sql || !topic)
                    292:                return -1;
                    293: 
                    294:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    295:        if (!str) {
                    296:                mqtt_rtlm_log("Error:: not found topics table name");
                    297:                return -1;
                    298:        }
                    299:        switch (retain) {
                    300:                case -1:
                    301:                        rtn = "";
                    302:                        break;
                    303:                case 0:
                    304:                        rtn = "AND Retain = 0";
                    305:                        break;
                    306:                default:
                    307:                        rtn = "AND Retain != 0";
                    308:                        break;
                    309:        }
1.1.2.14  misho     310:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
1.1.2.8   misho     311:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
1.1.2.14  misho     312:                        msgid, topic, user, host, rtn);
1.1.2.8   misho     313: 
                    314:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    315:                MQTT_RTLM_LOG(sql);
                    316:                return -1;
                    317:        }
                    318:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    319:                ret = sqlite3_changes(sql);
                    320:        else {
                    321:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    322:                        MQTT_RTLM_LOG(sql);
                    323:                ret = 0;
                    324:        }
                    325:        sqlite3_finalize(stmt);
                    326: 
                    327:        return ret;
                    328: }
1.1.2.9   misho     329: 
                    330: /*
                    331:  * mqtt_rtlm_read_topic() Get topic
                    332:  *
                    333:  * @cfg = loaded config
                    334:  * @sql = SQL handle
1.1.2.14  misho     335:  * @msgid = MessageID
1.1.2.9   misho     336:  * @topic = topic
1.1.2.10  misho     337:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    338:  * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9   misho     339:  */
1.1.2.10  misho     340: mqtt_subscr_t *
1.1.2.14  misho     341: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.1.2.9   misho     342: {
1.1.2.10  misho     343:        int rowz = 0;
                    344:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9   misho     345:        sqlite3_stmt *stmt;
1.1.2.10  misho     346:        register int j;
                    347:        mqtt_subscr_t *s = NULL;
1.1.2.9   misho     348: 
1.1.2.10  misho     349:        if (!cfg || !sql || !topic)
                    350:                return NULL;
                    351: 
                    352:        switch (retain) {
                    353:                case -1:
                    354:                        memset(szStr, 0, sizeof szStr);
                    355:                        break;
                    356:                case 0:
                    357:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    358:                        break;
                    359:                default:
                    360:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    361:                        break;
                    362:        }
1.1.2.9   misho     363: 
                    364:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    365:        if (!str) {
                    366:                mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10  misho     367:                return NULL;
1.1.2.9   misho     368:        }
1.1.2.14  misho     369:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                    370:                        "MsgID = %d AND Topic LIKE '%s' %s;", 
                    371:                        str, msgid, topic, szStr);
1.1.2.9   misho     372: 
                    373:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    374:                MQTT_RTLM_LOG(sql);
1.1.2.10  misho     375:                return NULL;
1.1.2.9   misho     376:        }
1.1.2.10  misho     377: 
                    378:        /* calculate count of rows and allocate subscribe items */
                    379:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    380:                rowz++;
                    381:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    382:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    383:                goto end;
                    384:        } else
                    385:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    386:        sqlite3_reset(stmt);
                    387: 
                    388:        /* fill with data */
                    389:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11  misho     390:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.15! misho     391:                s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
        !           392:                s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
        !           393:                s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
        !           394:                s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
1.1.2.9   misho     395:        }
1.1.2.10  misho     396: end:
1.1.2.9   misho     397:        sqlite3_finalize(stmt);
                    398: 
1.1.2.10  misho     399:        return s;
1.1.2.9   misho     400: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>