Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.10

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.2.2.5   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
                     24:                                        __func__, __LINE__, \
1.2       misho      25:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     26: 
1.2.2.10! misho      27: /* library pre-loaded actions */
        !            28: void
        !            29: _init()
        !            30: {
        !            31:        sqlite3_initialize();
        !            32: }
        !            33: 
        !            34: void
        !            35: _fini()
        !            36: {
        !            37:        sqlite3_shutdown();
        !            38: }
        !            39: 
1.2       misho      40: 
                     41: /*
                     42:  * mqtt_rtlm_open() Open database connection
                     43:  *
                     44:  * @cfg = config filename
                     45:  * return: NULL error or SQL handle
                     46:  */
                     47: sqlite3 *
1.2.2.3   misho      48: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      49: {
                     50:        sqlite3 *sql = NULL;
                     51:        const char *str = NULL;
                     52: 
                     53:        if (!cfg)
                     54:                return NULL;
                     55: 
1.2.2.3   misho      56:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho      57:        if (!str) {
                     58:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     59:                return NULL;
                     60:        }
                     61: 
                     62:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     63:                MQTT_RTLM_LOG(sql);
                     64:                sqlite3_close(sql);
                     65:                return NULL;
                     66:        }
                     67: 
                     68:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     69:                MQTT_RTLM_LOG(sql);
                     70:                sqlite3_close(sql);
                     71:                return NULL;
                     72:        }
                     73:        return sql;
                     74: }
                     75: 
                     76: /*
                     77:  * mqtt_rtlm_close() Close database connection
                     78:  *
                     79:  * @sql = SQL handle
                     80:  * return: none
                     81:  */
                     82: void
                     83: mqtt_rtlm_close(sqlite3 *sql)
                     84: {
                     85:        sqlite3_close(sql);
                     86: }
                     87: 
                     88: /*
                     89:  * mqtt_rtlm_init_session() Create session
                     90:  *
                     91:  * @cfg = loaded config
                     92:  * @sql = SQL handle
                     93:  * @connid = connection id
                     94:  * @user = username
                     95:  * @host = hostname
                     96:  * @will = will flag if !=0 must fill arguments
                     97:  * @... = will arguments in order topic,msg,qos,retain
                     98:  * return: -1 error, 0 session already appears or >0 row changed
                     99:  */
                    100: int
1.2.2.3   misho     101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho     102:                const char *host, char will, ...)
                    103: {
                    104:        va_list lst;
                    105:        int ret = 0;
                    106:        char *str, szStmt[BUFSIZ] = { 0 };
                    107:        sqlite3_stmt *stmt;
                    108: 
                    109:        if (!cfg || !sql)
                    110:                return -1;
                    111: 
1.2.2.3   misho     112:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     113:        if (!str) {
                    114:                mqtt_rtlm_log("Error:: not found online table name");
                    115:                return -1;
                    116:        }
                    117:        if (!will)
                    118:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    119:                                "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
                    120:        else {
                    121:                va_start(lst, will);
                    122:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    123:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                    124:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
                    125:                                str, connid, user, host, will, 
                    126:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    127:                va_end(lst);
                    128:        }
                    129: 
                    130:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    131:                MQTT_RTLM_LOG(sql);
                    132:                return -1;
                    133:        }
                    134:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    135:                ret = sqlite3_changes(sql);
                    136:        else {
                    137:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    138:                        MQTT_RTLM_LOG(sql);
                    139:                ret = 0;
                    140:        }
                    141:        sqlite3_finalize(stmt);
                    142: 
                    143:        return ret;
                    144: }
                    145: 
                    146: /*
                    147:  * mqtt_rtlm_fini_session() Delete session(s)
                    148:  *
                    149:  * @cfg = loaded config
                    150:  * @sql = SQL handle
                    151:  * @connid = connection id
                    152:  * @user = username
                    153:  * @host = hostname
                    154:  * return: -1 error, 0 session already appears or >0 row changed
                    155:  */
                    156: int
1.2.2.3   misho     157: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     158: {
                    159:        int ret = 0;
                    160:        char *str, szStmt[BUFSIZ] = { 0 };
                    161:        sqlite3_stmt *stmt;
                    162: 
                    163:        if (!cfg || !sql)
                    164:                return -1;
                    165: 
1.2.2.3   misho     166:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     167:        if (!str) {
                    168:                mqtt_rtlm_log("Error:: not found online table name");
                    169:                return -1;
                    170:        }
                    171:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
                    172:                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
                    173: 
                    174:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    175:                MQTT_RTLM_LOG(sql);
                    176:                return -1;
                    177:        }
                    178:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    179:                ret = sqlite3_changes(sql);
                    180:        else {
                    181:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    182:                        MQTT_RTLM_LOG(sql);
                    183:                ret = 0;
                    184:        }
                    185:        sqlite3_finalize(stmt);
                    186: 
                    187:        return ret;
                    188: }
                    189: 
                    190: /*
                    191:  * mqtt_rtlm_chk_session() Check session(s)
                    192:  *
                    193:  * @cfg = loaded config
                    194:  * @sql = SQL handle
                    195:  * @connid = connection id
                    196:  * @user = username
                    197:  * @host = hostname
                    198:  * return: -1 error, 0 not logged or >0 logged found rows
                    199:  */
                    200: int
1.2.2.3   misho     201: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     202: {
                    203:        int ret = 0;
                    204:        char *str, szStmt[BUFSIZ] = { 0 };
                    205:        sqlite3_stmt *stmt;
                    206: 
                    207:        if (!cfg || !sql)
                    208:                return -1;
                    209: 
1.2.2.3   misho     210:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     211:        if (!str) {
                    212:                mqtt_rtlm_log("Error:: not found online table name");
                    213:                return -1;
                    214:        }
                    215:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
1.2.2.8   misho     216:                        "ConnID = '%s' AND Username LIKE '%s' AND RemoteHost LIKE '%s';", 
1.2       misho     217:                        str, connid, user, host);
                    218: 
                    219:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    220:                MQTT_RTLM_LOG(sql);
                    221:                return -1;
                    222:        }
                    223:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    224:                ret = sqlite3_changes(sql);
                    225:        else
                    226:                ret = 0;
                    227:        sqlite3_finalize(stmt);
                    228: 
                    229:        return ret;
                    230: }
                    231: 
                    232: /*
                    233:  * mqtt_rtlm_write_topic() Publish topic
                    234:  *
                    235:  * @cfg = loaded config
                    236:  * @sql = SQL handle
1.2.2.8   misho     237:  * @connid = connection id
1.2       misho     238:  * @msgid = MessageID
                    239:  * @topic = topic
                    240:  * @txt = text
                    241:  * @user = username
                    242:  * @host = hostname
                    243:  * @retain = !=0 retain message to database
                    244:  * return: -1 error, 0 no publish or >0 published ok
                    245:  */
                    246: int
1.2.2.8   misho     247: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    248:                const char *topic, const char *txt, const char *user, const char *host, char retain)
1.2       misho     249: {
                    250:        int ret = 0;
                    251:        char *str, szStmt[BUFSIZ] = { 0 };
                    252:        sqlite3_stmt *stmt;
                    253: 
                    254:        if (!cfg || !sql || !topic)
                    255:                return -1;
                    256: 
1.2.2.3   misho     257:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     258:        if (!str) {
                    259:                mqtt_rtlm_log("Error:: not found topics table name");
                    260:                return -1;
                    261:        }
1.2.2.8   misho     262:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
                    263:                        "PubDate, PubHost) VALUES (%d, '%s', %d, '%s', '%s', '%s', "
1.2       misho     264:                        "datetime('now', 'localtime'), '%s');", 
1.2.2.8   misho     265:                        str, retain, connid, msgid, topic, txt, user, host);
                    266: 
                    267:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    268:                MQTT_RTLM_LOG(sql);
                    269:                return -1;
                    270:        }
                    271:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    272:                ret = sqlite3_changes(sql);
                    273:        else {
                    274:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    275:                        MQTT_RTLM_LOG(sql);
                    276:                ret = 0;
                    277:        }
                    278:        sqlite3_finalize(stmt);
                    279: 
                    280:        return ret;
                    281: }
                    282: 
                    283: /*
                    284:  * mqtt_rtlm_wipe_topic() Wipe all topics
                    285:  *
                    286:  * @cfg = loaded config
                    287:  * @sql = SQL handle
                    288:  * @connid = connection id
                    289:  * @user = username
                    290:  * @retain = -1 no matter
                    291:  * return: -1 error, 0 no changes or >0 deleted rows
                    292:  */
                    293: int
                    294: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
                    295: {
                    296:        int ret = 0;
                    297:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    298:        sqlite3_stmt *stmt;
                    299: 
                    300:        if (!cfg || !sql || !connid)
                    301:                return -1;
                    302: 
                    303:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
                    304:        if (!str) {
                    305:                mqtt_rtlm_log("Error:: not found topics table name");
                    306:                return -1;
                    307:        }
                    308:        switch (retain) {
                    309:                case -1:
                    310:                        rtn = "";
                    311:                        break;
                    312:                case 0:
                    313:                        rtn = "AND Retain = 0";
                    314:                        break;
                    315:                default:
                    316:                        rtn = "AND Retain != 0";
                    317:                        break;
                    318:        }
                    319:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
                    320:                        "PubUser LIKE '%s' %s;", str, connid, user, rtn);
1.2       misho     321: 
                    322:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    323:                MQTT_RTLM_LOG(sql);
                    324:                return -1;
                    325:        }
                    326:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    327:                ret = sqlite3_changes(sql);
                    328:        else {
                    329:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    330:                        MQTT_RTLM_LOG(sql);
                    331:                ret = 0;
                    332:        }
                    333:        sqlite3_finalize(stmt);
                    334: 
                    335:        return ret;
                    336: }
                    337: 
                    338: /*
                    339:  * mqtt_rtlm_delete_topic() Delete topic
                    340:  *
                    341:  * @cfg = loaded config
                    342:  * @sql = SQL handle
1.2.2.8   misho     343:  * @connid = connection id
1.2       misho     344:  * @msgid = MessageID
                    345:  * @topic = topic
                    346:  * @user = username
                    347:  * @host = hostname
                    348:  * @retain = -1 no matter
                    349:  * return: -1 error, 0 no changes or >0 deleted rows
                    350:  */
                    351: int
1.2.2.8   misho     352: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    353:                const char *topic, const char *user, const char *host, char retain)
1.2       misho     354: {
                    355:        int ret = 0;
                    356:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    357:        sqlite3_stmt *stmt;
                    358: 
                    359:        if (!cfg || !sql || !topic)
                    360:                return -1;
                    361: 
1.2.2.3   misho     362:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     363:        if (!str) {
                    364:                mqtt_rtlm_log("Error:: not found topics table name");
                    365:                return -1;
                    366:        }
                    367:        switch (retain) {
                    368:                case -1:
                    369:                        rtn = "";
                    370:                        break;
                    371:                case 0:
                    372:                        rtn = "AND Retain = 0";
                    373:                        break;
                    374:                default:
                    375:                        rtn = "AND Retain != 0";
                    376:                        break;
                    377:        }
1.2.2.8   misho     378:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND MsgID = %d AND "
                    379:                        "Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    380:                        connid, msgid, topic, user, host, rtn);
1.2       misho     381: 
                    382:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    383:                MQTT_RTLM_LOG(sql);
                    384:                return -1;
                    385:        }
                    386:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    387:                ret = sqlite3_changes(sql);
                    388:        else {
                    389:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    390:                        MQTT_RTLM_LOG(sql);
                    391:                ret = 0;
                    392:        }
                    393:        sqlite3_finalize(stmt);
                    394: 
                    395:        return ret;
                    396: }
                    397: 
                    398: /*
                    399:  * mqtt_rtlm_read_topic() Get topic
                    400:  *
                    401:  * @cfg = loaded config
                    402:  * @sql = SQL handle
1.2.2.8   misho     403:  * @connid = connection id
1.2       misho     404:  * @msgid = MessageID
                    405:  * @topic = topic
                    406:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    407:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    408:  */
                    409: mqtt_subscr_t *
1.2.2.8   misho     410: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    411:                const char *topic, char retain)
1.2       misho     412: {
                    413:        int rowz = 0;
                    414:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
                    415:        sqlite3_stmt *stmt;
                    416:        register int j;
                    417:        mqtt_subscr_t *s = NULL;
                    418: 
                    419:        if (!cfg || !sql || !topic)
                    420:                return NULL;
                    421: 
                    422:        switch (retain) {
                    423:                case -1:
                    424:                        memset(szStr, 0, sizeof szStr);
                    425:                        break;
                    426:                case 0:
                    427:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    428:                        break;
                    429:                default:
                    430:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    431:                        break;
                    432:        }
                    433: 
1.2.2.3   misho     434:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     435:        if (!str) {
                    436:                mqtt_rtlm_log("Error:: not found topics table name");
                    437:                return NULL;
                    438:        }
                    439:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
1.2.2.8   misho     440:                        "ConnID = '%s' AND MsgID = %d AND Topic LIKE '%s' %s;", 
                    441:                        str, connid, msgid, topic, szStr);
1.2       misho     442: 
                    443:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    444:                MQTT_RTLM_LOG(sql);
                    445:                return NULL;
                    446:        }
                    447: 
                    448:        /* calculate count of rows and allocate subscribe items */
                    449:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    450:                rowz++;
1.2.2.9   misho     451:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2       misho     452:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    453:                goto end;
                    454:        } else
                    455:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    456:        sqlite3_reset(stmt);
                    457: 
                    458:        /* fill with data */
                    459:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    460:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9   misho     461:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.1   misho     462:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
1.2.2.9   misho     463:                s[j].sub_value.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 2));
1.2.2.1   misho     464:                s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2       misho     465:        }
                    466: end:
                    467:        sqlite3_finalize(stmt);
                    468: 
                    469:        return s;
                    470: }
1.2.2.2   misho     471: 
                    472: /*
                    473:  * mqtt_rtlm_write_subscribe() Subscribe topic
                    474:  *
                    475:  * @cfg = loaded config
                    476:  * @sql = SQL handle
1.2.2.8   misho     477:  * @connid = connection id
1.2.2.2   misho     478:  * @msgid = MessageID
                    479:  * @topic = topic
                    480:  * @user = username
                    481:  * @host = hostname
                    482:  * @qos = Subscribe QoS
                    483:  * return: -1 error, 0 no publish or >0 published ok
                    484:  */
                    485: int
1.2.2.8   misho     486: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    487:                const char *topic, const char *user, const char *host, char qos)
1.2.2.2   misho     488: {
                    489:        int ret = 0;
                    490:        char *str, szStmt[BUFSIZ] = { 0 };
                    491:        sqlite3_stmt *stmt;
                    492: 
                    493:        if (!cfg || !sql || !topic)
                    494:                return -1;
                    495: 
1.2.2.3   misho     496:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     497:        if (!str) {
                    498:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    499:                return -1;
                    500:        }
1.2.2.8   misho     501:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
                    502:                        "PubDate, PubHost) VALUES ('%s', %d, %d, '%s', '%s', "
1.2.2.2   misho     503:                        "datetime('now', 'localtime'), '%s');", str, 
1.2.2.8   misho     504:                        connid, msgid, qos, topic, user, host);
1.2.2.2   misho     505: 
                    506:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    507:                MQTT_RTLM_LOG(sql);
                    508:                return -1;
                    509:        }
                    510:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    511:                ret = sqlite3_changes(sql);
                    512:        else {
                    513:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    514:                        MQTT_RTLM_LOG(sql);
                    515:                ret = 0;
                    516:        }
                    517:        sqlite3_finalize(stmt);
                    518: 
                    519:        return ret;
                    520: }
                    521: 
                    522: /*
                    523:  * mqtt_rtlm_delete_subscribe() Delete subscribe
                    524:  *
                    525:  * @cfg = loaded config
                    526:  * @sql = SQL handle
1.2.2.8   misho     527:  * @connid = connection id
1.2.2.2   misho     528:  * @topic = topic
                    529:  * @user = username
                    530:  * @host = hostname
                    531:  * return: -1 error, 0 no changes or >0 deleted rows
                    532:  */
                    533: int
1.2.2.8   misho     534: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                    535:                const char *topic, const char *user, const char *host)
1.2.2.2   misho     536: {
                    537:        int ret = 0;
1.2.2.7   misho     538:        char *str, szStmt[BUFSIZ] = { 0 };
1.2.2.2   misho     539:        sqlite3_stmt *stmt;
                    540: 
                    541:        if (!cfg || !sql || !topic)
                    542:                return -1;
                    543: 
1.2.2.3   misho     544:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     545:        if (!str) {
                    546:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    547:                return -1;
                    548:        }
1.2.2.8   misho     549:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND "
                    550:                        "Topic LIKE '%s' AND PubUser LIKE '%s' AND PubHost LIKE '%s';", str, 
                    551:                        connid, topic, user, host);
1.2.2.2   misho     552: 
                    553:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    554:                MQTT_RTLM_LOG(sql);
                    555:                return -1;
                    556:        }
                    557:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    558:                ret = sqlite3_changes(sql);
                    559:        else {
                    560:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    561:                        MQTT_RTLM_LOG(sql);
                    562:                ret = 0;
                    563:        }
                    564:        sqlite3_finalize(stmt);
                    565: 
                    566:        return ret;
                    567: }
                    568: 
                    569: /*
                    570:  * mqtt_rtlm_read_subscribe() Get subscribe topic
                    571:  *
                    572:  * @cfg = loaded config
                    573:  * @sql = SQL handle
1.2.2.8   misho     574:  * @connid = connection id
1.2.2.2   misho     575:  * @topic = topic
                    576:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    577:  */
                    578: mqtt_subscr_t *
1.2.2.8   misho     579: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
1.2.2.2   misho     580: {
                    581:        int rowz = 0;
                    582:        char *str, szStmt[BUFSIZ] = { 0 };
                    583:        sqlite3_stmt *stmt;
                    584:        register int j;
                    585:        mqtt_subscr_t *s = NULL;
                    586: 
                    587:        if (!cfg || !sql || !topic)
                    588:                return NULL;
                    589: 
1.2.2.3   misho     590:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     591:        if (!str) {
                    592:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    593:                return NULL;
                    594:        }
1.2.2.8   misho     595:        snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE ConnID = '%s' AND "
                    596:                        "Topic LIKE '%s';", str, connid, topic);
1.2.2.2   misho     597: 
                    598:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    599:                MQTT_RTLM_LOG(sql);
                    600:                return NULL;
                    601:        }
                    602: 
                    603:        /* calculate count of rows and allocate subscribe items */
                    604:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    605:                rowz++;
1.2.2.9   misho     606:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2.2.2   misho     607:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    608:                goto end;
                    609:        } else
                    610:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    611:        sqlite3_reset(stmt);
                    612: 
                    613:        /* fill with data */
                    614:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    615:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9   misho     616:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.2   misho     617:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    618:                s[j].sub_value.msg_base = NULL;
                    619:                s[j].sub_value.msg_len = 0;
                    620:        }
                    621: end:
                    622:        sqlite3_finalize(stmt);
                    623: 
                    624:        return s;
                    625: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>