Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.9

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.2.2.5   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
                     24:                                        __func__, __LINE__, \
1.2       misho      25:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     26: 
                     27: 
                     28: /*
                     29:  * mqtt_rtlm_open() Open database connection
                     30:  *
                     31:  * @cfg = config filename
                     32:  * return: NULL error or SQL handle
                     33:  */
                     34: sqlite3 *
1.2.2.3   misho      35: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      36: {
                     37:        sqlite3 *sql = NULL;
                     38:        const char *str = NULL;
                     39: 
                     40:        if (!cfg)
                     41:                return NULL;
                     42: 
1.2.2.3   misho      43:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho      44:        if (!str) {
                     45:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     46:                return NULL;
                     47:        }
                     48: 
                     49:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     50:                MQTT_RTLM_LOG(sql);
                     51:                sqlite3_close(sql);
                     52:                return NULL;
                     53:        }
                     54: 
                     55:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     56:                MQTT_RTLM_LOG(sql);
                     57:                sqlite3_close(sql);
                     58:                return NULL;
                     59:        }
                     60:        return sql;
                     61: }
                     62: 
                     63: /*
                     64:  * mqtt_rtlm_close() Close database connection
                     65:  *
                     66:  * @sql = SQL handle
                     67:  * return: none
                     68:  */
                     69: void
                     70: mqtt_rtlm_close(sqlite3 *sql)
                     71: {
                     72:        sqlite3_close(sql);
                     73: }
                     74: 
                     75: /*
                     76:  * mqtt_rtlm_init_session() Create session
                     77:  *
                     78:  * @cfg = loaded config
                     79:  * @sql = SQL handle
                     80:  * @connid = connection id
                     81:  * @user = username
                     82:  * @host = hostname
                     83:  * @will = will flag if !=0 must fill arguments
                     84:  * @... = will arguments in order topic,msg,qos,retain
                     85:  * return: -1 error, 0 session already appears or >0 row changed
                     86:  */
                     87: int
1.2.2.3   misho      88: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho      89:                const char *host, char will, ...)
                     90: {
                     91:        va_list lst;
                     92:        int ret = 0;
                     93:        char *str, szStmt[BUFSIZ] = { 0 };
                     94:        sqlite3_stmt *stmt;
                     95: 
                     96:        if (!cfg || !sql)
                     97:                return -1;
                     98: 
1.2.2.3   misho      99:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     100:        if (!str) {
                    101:                mqtt_rtlm_log("Error:: not found online table name");
                    102:                return -1;
                    103:        }
                    104:        if (!will)
                    105:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    106:                                "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
                    107:        else {
                    108:                va_start(lst, will);
                    109:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    110:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                    111:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
                    112:                                str, connid, user, host, will, 
                    113:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    114:                va_end(lst);
                    115:        }
                    116: 
                    117:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    118:                MQTT_RTLM_LOG(sql);
                    119:                return -1;
                    120:        }
                    121:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    122:                ret = sqlite3_changes(sql);
                    123:        else {
                    124:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    125:                        MQTT_RTLM_LOG(sql);
                    126:                ret = 0;
                    127:        }
                    128:        sqlite3_finalize(stmt);
                    129: 
                    130:        return ret;
                    131: }
                    132: 
                    133: /*
                    134:  * mqtt_rtlm_fini_session() Delete session(s)
                    135:  *
                    136:  * @cfg = loaded config
                    137:  * @sql = SQL handle
                    138:  * @connid = connection id
                    139:  * @user = username
                    140:  * @host = hostname
                    141:  * return: -1 error, 0 session already appears or >0 row changed
                    142:  */
                    143: int
1.2.2.3   misho     144: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     145: {
                    146:        int ret = 0;
                    147:        char *str, szStmt[BUFSIZ] = { 0 };
                    148:        sqlite3_stmt *stmt;
                    149: 
                    150:        if (!cfg || !sql)
                    151:                return -1;
                    152: 
1.2.2.3   misho     153:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     154:        if (!str) {
                    155:                mqtt_rtlm_log("Error:: not found online table name");
                    156:                return -1;
                    157:        }
                    158:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
                    159:                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
                    160: 
                    161:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    162:                MQTT_RTLM_LOG(sql);
                    163:                return -1;
                    164:        }
                    165:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    166:                ret = sqlite3_changes(sql);
                    167:        else {
                    168:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    169:                        MQTT_RTLM_LOG(sql);
                    170:                ret = 0;
                    171:        }
                    172:        sqlite3_finalize(stmt);
                    173: 
                    174:        return ret;
                    175: }
                    176: 
                    177: /*
                    178:  * mqtt_rtlm_chk_session() Check session(s)
                    179:  *
                    180:  * @cfg = loaded config
                    181:  * @sql = SQL handle
                    182:  * @connid = connection id
                    183:  * @user = username
                    184:  * @host = hostname
                    185:  * return: -1 error, 0 not logged or >0 logged found rows
                    186:  */
                    187: int
1.2.2.3   misho     188: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     189: {
                    190:        int ret = 0;
                    191:        char *str, szStmt[BUFSIZ] = { 0 };
                    192:        sqlite3_stmt *stmt;
                    193: 
                    194:        if (!cfg || !sql)
                    195:                return -1;
                    196: 
1.2.2.3   misho     197:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     198:        if (!str) {
                    199:                mqtt_rtlm_log("Error:: not found online table name");
                    200:                return -1;
                    201:        }
                    202:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
1.2.2.8   misho     203:                        "ConnID = '%s' AND Username LIKE '%s' AND RemoteHost LIKE '%s';", 
1.2       misho     204:                        str, connid, user, host);
                    205: 
                    206:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    207:                MQTT_RTLM_LOG(sql);
                    208:                return -1;
                    209:        }
                    210:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    211:                ret = sqlite3_changes(sql);
                    212:        else
                    213:                ret = 0;
                    214:        sqlite3_finalize(stmt);
                    215: 
                    216:        return ret;
                    217: }
                    218: 
                    219: /*
                    220:  * mqtt_rtlm_write_topic() Publish topic
                    221:  *
                    222:  * @cfg = loaded config
                    223:  * @sql = SQL handle
1.2.2.8   misho     224:  * @connid = connection id
1.2       misho     225:  * @msgid = MessageID
                    226:  * @topic = topic
                    227:  * @txt = text
                    228:  * @user = username
                    229:  * @host = hostname
                    230:  * @retain = !=0 retain message to database
                    231:  * return: -1 error, 0 no publish or >0 published ok
                    232:  */
                    233: int
1.2.2.8   misho     234: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    235:                const char *topic, const char *txt, const char *user, const char *host, char retain)
1.2       misho     236: {
                    237:        int ret = 0;
                    238:        char *str, szStmt[BUFSIZ] = { 0 };
                    239:        sqlite3_stmt *stmt;
                    240: 
                    241:        if (!cfg || !sql || !topic)
                    242:                return -1;
                    243: 
1.2.2.3   misho     244:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     245:        if (!str) {
                    246:                mqtt_rtlm_log("Error:: not found topics table name");
                    247:                return -1;
                    248:        }
1.2.2.8   misho     249:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
                    250:                        "PubDate, PubHost) VALUES (%d, '%s', %d, '%s', '%s', '%s', "
1.2       misho     251:                        "datetime('now', 'localtime'), '%s');", 
1.2.2.8   misho     252:                        str, retain, connid, msgid, topic, txt, user, host);
                    253: 
                    254:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    255:                MQTT_RTLM_LOG(sql);
                    256:                return -1;
                    257:        }
                    258:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    259:                ret = sqlite3_changes(sql);
                    260:        else {
                    261:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    262:                        MQTT_RTLM_LOG(sql);
                    263:                ret = 0;
                    264:        }
                    265:        sqlite3_finalize(stmt);
                    266: 
                    267:        return ret;
                    268: }
                    269: 
                    270: /*
                    271:  * mqtt_rtlm_wipe_topic() Wipe all topics
                    272:  *
                    273:  * @cfg = loaded config
                    274:  * @sql = SQL handle
                    275:  * @connid = connection id
                    276:  * @user = username
                    277:  * @retain = -1 no matter
                    278:  * return: -1 error, 0 no changes or >0 deleted rows
                    279:  */
                    280: int
                    281: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
                    282: {
                    283:        int ret = 0;
                    284:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    285:        sqlite3_stmt *stmt;
                    286: 
                    287:        if (!cfg || !sql || !connid)
                    288:                return -1;
                    289: 
                    290:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
                    291:        if (!str) {
                    292:                mqtt_rtlm_log("Error:: not found topics table name");
                    293:                return -1;
                    294:        }
                    295:        switch (retain) {
                    296:                case -1:
                    297:                        rtn = "";
                    298:                        break;
                    299:                case 0:
                    300:                        rtn = "AND Retain = 0";
                    301:                        break;
                    302:                default:
                    303:                        rtn = "AND Retain != 0";
                    304:                        break;
                    305:        }
                    306:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
                    307:                        "PubUser LIKE '%s' %s;", str, connid, user, rtn);
1.2       misho     308: 
                    309:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    310:                MQTT_RTLM_LOG(sql);
                    311:                return -1;
                    312:        }
                    313:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    314:                ret = sqlite3_changes(sql);
                    315:        else {
                    316:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    317:                        MQTT_RTLM_LOG(sql);
                    318:                ret = 0;
                    319:        }
                    320:        sqlite3_finalize(stmt);
                    321: 
                    322:        return ret;
                    323: }
                    324: 
                    325: /*
                    326:  * mqtt_rtlm_delete_topic() Delete topic
                    327:  *
                    328:  * @cfg = loaded config
                    329:  * @sql = SQL handle
1.2.2.8   misho     330:  * @connid = connection id
1.2       misho     331:  * @msgid = MessageID
                    332:  * @topic = topic
                    333:  * @user = username
                    334:  * @host = hostname
                    335:  * @retain = -1 no matter
                    336:  * return: -1 error, 0 no changes or >0 deleted rows
                    337:  */
                    338: int
1.2.2.8   misho     339: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    340:                const char *topic, const char *user, const char *host, char retain)
1.2       misho     341: {
                    342:        int ret = 0;
                    343:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    344:        sqlite3_stmt *stmt;
                    345: 
                    346:        if (!cfg || !sql || !topic)
                    347:                return -1;
                    348: 
1.2.2.3   misho     349:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     350:        if (!str) {
                    351:                mqtt_rtlm_log("Error:: not found topics table name");
                    352:                return -1;
                    353:        }
                    354:        switch (retain) {
                    355:                case -1:
                    356:                        rtn = "";
                    357:                        break;
                    358:                case 0:
                    359:                        rtn = "AND Retain = 0";
                    360:                        break;
                    361:                default:
                    362:                        rtn = "AND Retain != 0";
                    363:                        break;
                    364:        }
1.2.2.8   misho     365:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND MsgID = %d AND "
                    366:                        "Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    367:                        connid, msgid, topic, user, host, rtn);
1.2       misho     368: 
                    369:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    370:                MQTT_RTLM_LOG(sql);
                    371:                return -1;
                    372:        }
                    373:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    374:                ret = sqlite3_changes(sql);
                    375:        else {
                    376:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    377:                        MQTT_RTLM_LOG(sql);
                    378:                ret = 0;
                    379:        }
                    380:        sqlite3_finalize(stmt);
                    381: 
                    382:        return ret;
                    383: }
                    384: 
                    385: /*
                    386:  * mqtt_rtlm_read_topic() Get topic
                    387:  *
                    388:  * @cfg = loaded config
                    389:  * @sql = SQL handle
1.2.2.8   misho     390:  * @connid = connection id
1.2       misho     391:  * @msgid = MessageID
                    392:  * @topic = topic
                    393:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    394:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    395:  */
                    396: mqtt_subscr_t *
1.2.2.8   misho     397: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    398:                const char *topic, char retain)
1.2       misho     399: {
                    400:        int rowz = 0;
                    401:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
                    402:        sqlite3_stmt *stmt;
                    403:        register int j;
                    404:        mqtt_subscr_t *s = NULL;
                    405: 
                    406:        if (!cfg || !sql || !topic)
                    407:                return NULL;
                    408: 
                    409:        switch (retain) {
                    410:                case -1:
                    411:                        memset(szStr, 0, sizeof szStr);
                    412:                        break;
                    413:                case 0:
                    414:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    415:                        break;
                    416:                default:
                    417:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    418:                        break;
                    419:        }
                    420: 
1.2.2.3   misho     421:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     422:        if (!str) {
                    423:                mqtt_rtlm_log("Error:: not found topics table name");
                    424:                return NULL;
                    425:        }
                    426:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
1.2.2.8   misho     427:                        "ConnID = '%s' AND MsgID = %d AND Topic LIKE '%s' %s;", 
                    428:                        str, connid, msgid, topic, szStr);
1.2       misho     429: 
                    430:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    431:                MQTT_RTLM_LOG(sql);
                    432:                return NULL;
                    433:        }
                    434: 
                    435:        /* calculate count of rows and allocate subscribe items */
                    436:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    437:                rowz++;
1.2.2.9 ! misho     438:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2       misho     439:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    440:                goto end;
                    441:        } else
                    442:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    443:        sqlite3_reset(stmt);
                    444: 
                    445:        /* fill with data */
                    446:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    447:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9 ! misho     448:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.1   misho     449:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
1.2.2.9 ! misho     450:                s[j].sub_value.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 2));
1.2.2.1   misho     451:                s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2       misho     452:        }
                    453: end:
                    454:        sqlite3_finalize(stmt);
                    455: 
                    456:        return s;
                    457: }
1.2.2.2   misho     458: 
                    459: /*
                    460:  * mqtt_rtlm_write_subscribe() Subscribe topic
                    461:  *
                    462:  * @cfg = loaded config
                    463:  * @sql = SQL handle
1.2.2.8   misho     464:  * @connid = connection id
1.2.2.2   misho     465:  * @msgid = MessageID
                    466:  * @topic = topic
                    467:  * @user = username
                    468:  * @host = hostname
                    469:  * @qos = Subscribe QoS
                    470:  * return: -1 error, 0 no publish or >0 published ok
                    471:  */
                    472: int
1.2.2.8   misho     473: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    474:                const char *topic, const char *user, const char *host, char qos)
1.2.2.2   misho     475: {
                    476:        int ret = 0;
                    477:        char *str, szStmt[BUFSIZ] = { 0 };
                    478:        sqlite3_stmt *stmt;
                    479: 
                    480:        if (!cfg || !sql || !topic)
                    481:                return -1;
                    482: 
1.2.2.3   misho     483:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     484:        if (!str) {
                    485:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    486:                return -1;
                    487:        }
1.2.2.8   misho     488:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
                    489:                        "PubDate, PubHost) VALUES ('%s', %d, %d, '%s', '%s', "
1.2.2.2   misho     490:                        "datetime('now', 'localtime'), '%s');", str, 
1.2.2.8   misho     491:                        connid, msgid, qos, topic, user, host);
1.2.2.2   misho     492: 
                    493:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    494:                MQTT_RTLM_LOG(sql);
                    495:                return -1;
                    496:        }
                    497:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    498:                ret = sqlite3_changes(sql);
                    499:        else {
                    500:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    501:                        MQTT_RTLM_LOG(sql);
                    502:                ret = 0;
                    503:        }
                    504:        sqlite3_finalize(stmt);
                    505: 
                    506:        return ret;
                    507: }
                    508: 
                    509: /*
                    510:  * mqtt_rtlm_delete_subscribe() Delete subscribe
                    511:  *
                    512:  * @cfg = loaded config
                    513:  * @sql = SQL handle
1.2.2.8   misho     514:  * @connid = connection id
1.2.2.2   misho     515:  * @topic = topic
                    516:  * @user = username
                    517:  * @host = hostname
                    518:  * return: -1 error, 0 no changes or >0 deleted rows
                    519:  */
                    520: int
1.2.2.8   misho     521: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                    522:                const char *topic, const char *user, const char *host)
1.2.2.2   misho     523: {
                    524:        int ret = 0;
1.2.2.7   misho     525:        char *str, szStmt[BUFSIZ] = { 0 };
1.2.2.2   misho     526:        sqlite3_stmt *stmt;
                    527: 
                    528:        if (!cfg || !sql || !topic)
                    529:                return -1;
                    530: 
1.2.2.3   misho     531:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     532:        if (!str) {
                    533:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    534:                return -1;
                    535:        }
1.2.2.8   misho     536:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
                    537:                        "Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s';", str, 
                    538:                        connid, topic, user, host);
1.2.2.2   misho     539: 
                    540:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    541:                MQTT_RTLM_LOG(sql);
                    542:                return -1;
                    543:        }
                    544:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    545:                ret = sqlite3_changes(sql);
                    546:        else {
                    547:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    548:                        MQTT_RTLM_LOG(sql);
                    549:                ret = 0;
                    550:        }
                    551:        sqlite3_finalize(stmt);
                    552: 
                    553:        return ret;
                    554: }
                    555: 
                    556: /*
                    557:  * mqtt_rtlm_read_subscribe() Get subscribe topic
                    558:  *
                    559:  * @cfg = loaded config
                    560:  * @sql = SQL handle
1.2.2.8   misho     561:  * @connid = connection id
1.2.2.2   misho     562:  * @topic = topic
                    563:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    564:  */
                    565: mqtt_subscr_t *
1.2.2.8   misho     566: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
1.2.2.2   misho     567: {
                    568:        int rowz = 0;
                    569:        char *str, szStmt[BUFSIZ] = { 0 };
                    570:        sqlite3_stmt *stmt;
                    571:        register int j;
                    572:        mqtt_subscr_t *s = NULL;
                    573: 
                    574:        if (!cfg || !sql || !topic)
                    575:                return NULL;
                    576: 
1.2.2.3   misho     577:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     578:        if (!str) {
                    579:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    580:                return NULL;
                    581:        }
1.2.2.8   misho     582:        snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE ConnID = '%s' AND "
                    583:                        "Topic LIKE '%s';", str, connid, topic);
1.2.2.2   misho     584: 
                    585:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    586:                MQTT_RTLM_LOG(sql);
                    587:                return NULL;
                    588:        }
                    589: 
                    590:        /* calculate count of rows and allocate subscribe items */
                    591:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    592:                rowz++;
1.2.2.9 ! misho     593:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2.2.2   misho     594:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    595:                goto end;
                    596:        } else
                    597:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    598:        sqlite3_reset(stmt);
                    599: 
                    600:        /* fill with data */
                    601:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    602:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9 ! misho     603:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.2   misho     604:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    605:                s[j].sub_value.msg_base = NULL;
                    606:                s[j].sub_value.msg_len = 0;
                    607:        }
                    608: end:
                    609:        sqlite3_finalize(stmt);
                    610: 
                    611:        return s;
                    612: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>