Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.4

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
                     23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
                     24:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     25: 
                     26: 
                     27: /*
                     28:  * mqtt_rtlm_open() Open database connection
                     29:  *
                     30:  * @cfg = config filename
                     31:  * return: NULL error or SQL handle
                     32:  */
                     33: sqlite3 *
1.2.2.3   misho      34: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      35: {
                     36:        sqlite3 *sql = NULL;
                     37:        const char *str = NULL;
                     38: 
                     39:        if (!cfg)
                     40:                return NULL;
                     41: 
1.2.2.4 ! misho      42:        sqlite3_config(SQLITE_CONFIG_SERIALIZED);
        !            43:        if (!sqlite3_threadsafe())
1.2       misho      44:                return NULL;
                     45: 
1.2.2.3   misho      46:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho      47:        if (!str) {
                     48:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     49:                return NULL;
                     50:        }
                     51: 
                     52:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     53:                MQTT_RTLM_LOG(sql);
                     54:                sqlite3_close(sql);
                     55:                return NULL;
                     56:        }
                     57: 
1.2.2.4 ! misho      58:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho      59:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     60:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho      61:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho      62:                sqlite3_close(sql);
                     63:                return NULL;
                     64:        }
1.2.2.4 ! misho      65:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho      66:        return sql;
                     67: }
                     68: 
                     69: /*
                     70:  * mqtt_rtlm_close() Close database connection
                     71:  *
                     72:  * @sql = SQL handle
                     73:  * return: none
                     74:  */
                     75: void
                     76: mqtt_rtlm_close(sqlite3 *sql)
                     77: {
                     78:        sqlite3_close(sql);
                     79: }
                     80: 
                     81: /*
                     82:  * mqtt_rtlm_init_session() Create session
                     83:  *
                     84:  * @cfg = loaded config
                     85:  * @sql = SQL handle
                     86:  * @connid = connection id
                     87:  * @user = username
                     88:  * @host = hostname
                     89:  * @will = will flag if !=0 must fill arguments
                     90:  * @... = will arguments in order topic,msg,qos,retain
                     91:  * return: -1 error, 0 session already appears or >0 row changed
                     92:  */
                     93: int
1.2.2.3   misho      94: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho      95:                const char *host, char will, ...)
                     96: {
                     97:        va_list lst;
                     98:        int ret = 0;
                     99:        char *str, szStmt[BUFSIZ] = { 0 };
                    100:        sqlite3_stmt *stmt;
                    101: 
                    102:        if (!cfg || !sql)
                    103:                return -1;
                    104: 
1.2.2.3   misho     105:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     106:        if (!str) {
                    107:                mqtt_rtlm_log("Error:: not found online table name");
                    108:                return -1;
                    109:        }
                    110:        if (!will)
                    111:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    112:                                "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
                    113:        else {
                    114:                va_start(lst, will);
                    115:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    116:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                    117:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
                    118:                                str, connid, user, host, will, 
                    119:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    120:                va_end(lst);
                    121:        }
                    122: 
1.2.2.4 ! misho     123:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     124:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    125:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     126:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     127:                return -1;
                    128:        }
                    129:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    130:                ret = sqlite3_changes(sql);
                    131:        else {
                    132:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    133:                        MQTT_RTLM_LOG(sql);
                    134:                ret = 0;
                    135:        }
                    136:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     137:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     138: 
                    139:        return ret;
                    140: }
                    141: 
                    142: /*
                    143:  * mqtt_rtlm_fini_session() Delete session(s)
                    144:  *
                    145:  * @cfg = loaded config
                    146:  * @sql = SQL handle
                    147:  * @connid = connection id
                    148:  * @user = username
                    149:  * @host = hostname
                    150:  * return: -1 error, 0 session already appears or >0 row changed
                    151:  */
                    152: int
1.2.2.3   misho     153: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     154: {
                    155:        int ret = 0;
                    156:        char *str, szStmt[BUFSIZ] = { 0 };
                    157:        sqlite3_stmt *stmt;
                    158: 
                    159:        if (!cfg || !sql)
                    160:                return -1;
                    161: 
1.2.2.3   misho     162:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     163:        if (!str) {
                    164:                mqtt_rtlm_log("Error:: not found online table name");
                    165:                return -1;
                    166:        }
                    167:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
                    168:                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
                    169: 
1.2.2.4 ! misho     170:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     171:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    172:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     173:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     174:                return -1;
                    175:        }
                    176:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    177:                ret = sqlite3_changes(sql);
                    178:        else {
                    179:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    180:                        MQTT_RTLM_LOG(sql);
                    181:                ret = 0;
                    182:        }
                    183:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     184:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     185: 
                    186:        return ret;
                    187: }
                    188: 
                    189: /*
                    190:  * mqtt_rtlm_chk_session() Check session(s)
                    191:  *
                    192:  * @cfg = loaded config
                    193:  * @sql = SQL handle
                    194:  * @connid = connection id
                    195:  * @user = username
                    196:  * @host = hostname
                    197:  * return: -1 error, 0 not logged or >0 logged found rows
                    198:  */
                    199: int
1.2.2.3   misho     200: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     201: {
                    202:        int ret = 0;
                    203:        char *str, szStmt[BUFSIZ] = { 0 };
                    204:        sqlite3_stmt *stmt;
                    205: 
                    206:        if (!cfg || !sql)
                    207:                return -1;
                    208: 
1.2.2.3   misho     209:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     210:        if (!str) {
                    211:                mqtt_rtlm_log("Error:: not found online table name");
                    212:                return -1;
                    213:        }
                    214:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
                    215:                        "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
                    216:                        str, connid, user, host);
                    217: 
1.2.2.4 ! misho     218:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     219:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    220:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     221:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     222:                return -1;
                    223:        }
                    224:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    225:                ret = sqlite3_changes(sql);
                    226:        else
                    227:                ret = 0;
                    228:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     229:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     230: 
                    231:        return ret;
                    232: }
                    233: 
                    234: /*
                    235:  * mqtt_rtlm_write_topic() Publish topic
                    236:  *
                    237:  * @cfg = loaded config
                    238:  * @sql = SQL handle
                    239:  * @msgid = MessageID
                    240:  * @topic = topic
                    241:  * @txt = text
                    242:  * @user = username
                    243:  * @host = hostname
                    244:  * @retain = !=0 retain message to database
                    245:  * return: -1 error, 0 no publish or >0 published ok
                    246:  */
                    247: int
1.2.2.3   misho     248: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
1.2       misho     249:                const char *user, const char *host, char retain)
                    250: {
                    251:        int ret = 0;
                    252:        char *str, szStmt[BUFSIZ] = { 0 };
                    253:        sqlite3_stmt *stmt;
                    254: 
                    255:        if (!cfg || !sql || !topic)
                    256:                return -1;
                    257: 
1.2.2.3   misho     258:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     259:        if (!str) {
                    260:                mqtt_rtlm_log("Error:: not found topics table name");
                    261:                return -1;
                    262:        }
                    263:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
                    264:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
                    265:                        "datetime('now', 'localtime'), '%s');", 
                    266:                        str, retain, msgid, topic, txt, user, host);
                    267: 
1.2.2.4 ! misho     268:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     269:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    270:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     271:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     272:                return -1;
                    273:        }
                    274:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    275:                ret = sqlite3_changes(sql);
                    276:        else {
                    277:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    278:                        MQTT_RTLM_LOG(sql);
                    279:                ret = 0;
                    280:        }
                    281:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     282:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     283: 
                    284:        return ret;
                    285: }
                    286: 
                    287: /*
                    288:  * mqtt_rtlm_delete_topic() Delete topic
                    289:  *
                    290:  * @cfg = loaded config
                    291:  * @sql = SQL handle
                    292:  * @msgid = MessageID
                    293:  * @topic = topic
                    294:  * @user = username
                    295:  * @host = hostname
                    296:  * @retain = -1 no matter
                    297:  * return: -1 error, 0 no changes or >0 deleted rows
                    298:  */
                    299: int
1.2.2.3   misho     300: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
1.2       misho     301:                const char *user, const char *host, char retain)
                    302: {
                    303:        int ret = 0;
                    304:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    305:        sqlite3_stmt *stmt;
                    306: 
                    307:        if (!cfg || !sql || !topic)
                    308:                return -1;
                    309: 
1.2.2.3   misho     310:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     311:        if (!str) {
                    312:                mqtt_rtlm_log("Error:: not found topics table name");
                    313:                return -1;
                    314:        }
                    315:        switch (retain) {
                    316:                case -1:
                    317:                        rtn = "";
                    318:                        break;
                    319:                case 0:
                    320:                        rtn = "AND Retain = 0";
                    321:                        break;
                    322:                default:
                    323:                        rtn = "AND Retain != 0";
                    324:                        break;
                    325:        }
                    326:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
                    327:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    328:                        msgid, topic, user, host, rtn);
                    329: 
1.2.2.4 ! misho     330:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     331:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    332:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     333:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     334:                return -1;
                    335:        }
                    336:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    337:                ret = sqlite3_changes(sql);
                    338:        else {
                    339:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    340:                        MQTT_RTLM_LOG(sql);
                    341:                ret = 0;
                    342:        }
                    343:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     344:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     345: 
                    346:        return ret;
                    347: }
                    348: 
                    349: /*
                    350:  * mqtt_rtlm_read_topic() Get topic
                    351:  *
                    352:  * @cfg = loaded config
                    353:  * @sql = SQL handle
                    354:  * @msgid = MessageID
                    355:  * @topic = topic
                    356:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    357:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    358:  */
                    359: mqtt_subscr_t *
1.2.2.3   misho     360: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.2       misho     361: {
                    362:        int rowz = 0;
                    363:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
                    364:        sqlite3_stmt *stmt;
                    365:        register int j;
                    366:        mqtt_subscr_t *s = NULL;
                    367: 
                    368:        if (!cfg || !sql || !topic)
                    369:                return NULL;
                    370: 
                    371:        switch (retain) {
                    372:                case -1:
                    373:                        memset(szStr, 0, sizeof szStr);
                    374:                        break;
                    375:                case 0:
                    376:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    377:                        break;
                    378:                default:
                    379:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    380:                        break;
                    381:        }
                    382: 
1.2.2.3   misho     383:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     384:        if (!str) {
                    385:                mqtt_rtlm_log("Error:: not found topics table name");
                    386:                return NULL;
                    387:        }
                    388:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                    389:                        "MsgID = %d AND Topic LIKE '%s' %s;", 
                    390:                        str, msgid, topic, szStr);
                    391: 
1.2.2.4 ! misho     392:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     393:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    394:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     395:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     396:                return NULL;
                    397:        }
                    398: 
                    399:        /* calculate count of rows and allocate subscribe items */
                    400:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    401:                rowz++;
                    402:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    403:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    404:                goto end;
                    405:        } else
                    406:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    407:        sqlite3_reset(stmt);
                    408: 
                    409:        /* fill with data */
                    410:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    411:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.1   misho     412:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    413:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    414:                s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
                    415:                s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2       misho     416:        }
                    417: end:
                    418:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     419:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     420: 
                    421:        return s;
                    422: }
1.2.2.2   misho     423: 
                    424: /*
                    425:  * mqtt_rtlm_write_subscribe() Subscribe topic
                    426:  *
                    427:  * @cfg = loaded config
                    428:  * @sql = SQL handle
                    429:  * @msgid = MessageID
                    430:  * @topic = topic
                    431:  * @user = username
                    432:  * @host = hostname
                    433:  * @qos = Subscribe QoS
                    434:  * return: -1 error, 0 no publish or >0 published ok
                    435:  */
                    436: int
1.2.2.3   misho     437: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
1.2.2.2   misho     438:                const char *user, const char *host, char qos)
                    439: {
                    440:        int ret = 0;
                    441:        char *str, szStmt[BUFSIZ] = { 0 };
                    442:        sqlite3_stmt *stmt;
                    443: 
                    444:        if (!cfg || !sql || !topic)
                    445:                return -1;
                    446: 
1.2.2.3   misho     447:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     448:        if (!str) {
                    449:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    450:                return -1;
                    451:        }
                    452:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
                    453:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
                    454:                        "datetime('now', 'localtime'), '%s');", str, 
                    455:                        msgid, qos, topic, user, host);
                    456: 
1.2.2.4 ! misho     457:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2.2.2   misho     458:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    459:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     460:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     461:                return -1;
                    462:        }
                    463:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    464:                ret = sqlite3_changes(sql);
                    465:        else {
                    466:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    467:                        MQTT_RTLM_LOG(sql);
                    468:                ret = 0;
                    469:        }
                    470:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     471:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     472: 
                    473:        return ret;
                    474: }
                    475: 
                    476: /*
                    477:  * mqtt_rtlm_delete_subscribe() Delete subscribe
                    478:  *
                    479:  * @cfg = loaded config
                    480:  * @sql = SQL handle
                    481:  * @topic = topic
                    482:  * @user = username
                    483:  * @host = hostname
                    484:  * @qos = Subscribe QoS if -1 no matter
                    485:  * return: -1 error, 0 no changes or >0 deleted rows
                    486:  */
                    487: int
1.2.2.3   misho     488: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, 
1.2.2.2   misho     489:                const char *user, const char *host, char qos)
                    490: {
                    491:        int ret = 0;
                    492:        char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
                    493:        sqlite3_stmt *stmt;
                    494: 
                    495:        if (!cfg || !sql || !topic)
                    496:                return -1;
                    497: 
1.2.2.3   misho     498:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     499:        if (!str) {
                    500:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    501:                return -1;
                    502:        }
                    503:        if (qos > -1 && qos < 3)
                    504:                snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
                    505:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
                    506:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    507:                        topic, user, host, szStr);
                    508: 
1.2.2.4 ! misho     509:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2.2.2   misho     510:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    511:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     512:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     513:                return -1;
                    514:        }
                    515:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    516:                ret = sqlite3_changes(sql);
                    517:        else {
                    518:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    519:                        MQTT_RTLM_LOG(sql);
                    520:                ret = 0;
                    521:        }
                    522:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     523:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     524: 
                    525:        return ret;
                    526: }
                    527: 
                    528: /*
                    529:  * mqtt_rtlm_read_subscribe() Get subscribe topic
                    530:  *
                    531:  * @cfg = loaded config
                    532:  * @sql = SQL handle
                    533:  * @topic = topic
                    534:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    535:  */
                    536: mqtt_subscr_t *
1.2.2.3   misho     537: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
1.2.2.2   misho     538: {
                    539:        int rowz = 0;
                    540:        char *str, szStmt[BUFSIZ] = { 0 };
                    541:        sqlite3_stmt *stmt;
                    542:        register int j;
                    543:        mqtt_subscr_t *s = NULL;
                    544: 
                    545:        if (!cfg || !sql || !topic)
                    546:                return NULL;
                    547: 
1.2.2.3   misho     548:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     549:        if (!str) {
                    550:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    551:                return NULL;
                    552:        }
                    553:        snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
                    554: 
1.2.2.4 ! misho     555:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2.2.2   misho     556:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    557:                MQTT_RTLM_LOG(sql);
1.2.2.4 ! misho     558:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     559:                return NULL;
                    560:        }
                    561: 
                    562:        /* calculate count of rows and allocate subscribe items */
                    563:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    564:                rowz++;
                    565:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    566:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    567:                goto end;
                    568:        } else
                    569:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    570:        sqlite3_reset(stmt);
                    571: 
                    572:        /* fill with data */
                    573:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    574:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                    575:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    576:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    577:                s[j].sub_value.msg_base = NULL;
                    578:                s[j].sub_value.msg_len = 0;
                    579:        }
                    580: end:
                    581:        sqlite3_finalize(stmt);
1.2.2.4 ! misho     582:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     583: 
                    584:        return s;
                    585: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>