Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.5

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.2.2.5 ! misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
        !            24:                                        __func__, __LINE__, \
1.2       misho      25:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     26: 
                     27: 
                     28: /*
                     29:  * mqtt_rtlm_open() Open database connection
                     30:  *
                     31:  * @cfg = config filename
                     32:  * return: NULL error or SQL handle
                     33:  */
                     34: sqlite3 *
1.2.2.3   misho      35: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      36: {
                     37:        sqlite3 *sql = NULL;
                     38:        const char *str = NULL;
                     39: 
                     40:        if (!cfg)
                     41:                return NULL;
                     42: 
1.2.2.4   misho      43:        sqlite3_config(SQLITE_CONFIG_SERIALIZED);
                     44:        if (!sqlite3_threadsafe())
1.2       misho      45:                return NULL;
                     46: 
1.2.2.3   misho      47:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho      48:        if (!str) {
                     49:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     50:                return NULL;
                     51:        }
                     52: 
                     53:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     54:                MQTT_RTLM_LOG(sql);
                     55:                sqlite3_close(sql);
                     56:                return NULL;
                     57:        }
                     58: 
1.2.2.4   misho      59:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho      60:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     61:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho      62:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho      63:                sqlite3_close(sql);
                     64:                return NULL;
                     65:        }
1.2.2.4   misho      66:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho      67:        return sql;
                     68: }
                     69: 
                     70: /*
                     71:  * mqtt_rtlm_close() Close database connection
                     72:  *
                     73:  * @sql = SQL handle
                     74:  * return: none
                     75:  */
                     76: void
                     77: mqtt_rtlm_close(sqlite3 *sql)
                     78: {
                     79:        sqlite3_close(sql);
                     80: }
                     81: 
                     82: /*
                     83:  * mqtt_rtlm_init_session() Create session
                     84:  *
                     85:  * @cfg = loaded config
                     86:  * @sql = SQL handle
                     87:  * @connid = connection id
                     88:  * @user = username
                     89:  * @host = hostname
                     90:  * @will = will flag if !=0 must fill arguments
                     91:  * @... = will arguments in order topic,msg,qos,retain
                     92:  * return: -1 error, 0 session already appears or >0 row changed
                     93:  */
                     94: int
1.2.2.3   misho      95: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho      96:                const char *host, char will, ...)
                     97: {
                     98:        va_list lst;
                     99:        int ret = 0;
                    100:        char *str, szStmt[BUFSIZ] = { 0 };
                    101:        sqlite3_stmt *stmt;
                    102: 
                    103:        if (!cfg || !sql)
                    104:                return -1;
                    105: 
1.2.2.3   misho     106:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     107:        if (!str) {
                    108:                mqtt_rtlm_log("Error:: not found online table name");
                    109:                return -1;
                    110:        }
                    111:        if (!will)
                    112:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    113:                                "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
                    114:        else {
                    115:                va_start(lst, will);
                    116:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    117:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                    118:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
                    119:                                str, connid, user, host, will, 
                    120:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    121:                va_end(lst);
                    122:        }
                    123: 
1.2.2.4   misho     124:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     125:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    126:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     127:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     128:                return -1;
                    129:        }
                    130:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    131:                ret = sqlite3_changes(sql);
                    132:        else {
                    133:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    134:                        MQTT_RTLM_LOG(sql);
                    135:                ret = 0;
                    136:        }
                    137:        sqlite3_finalize(stmt);
1.2.2.4   misho     138:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     139: 
                    140:        return ret;
                    141: }
                    142: 
                    143: /*
                    144:  * mqtt_rtlm_fini_session() Delete session(s)
                    145:  *
                    146:  * @cfg = loaded config
                    147:  * @sql = SQL handle
                    148:  * @connid = connection id
                    149:  * @user = username
                    150:  * @host = hostname
                    151:  * return: -1 error, 0 session already appears or >0 row changed
                    152:  */
                    153: int
1.2.2.3   misho     154: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     155: {
                    156:        int ret = 0;
                    157:        char *str, szStmt[BUFSIZ] = { 0 };
                    158:        sqlite3_stmt *stmt;
                    159: 
                    160:        if (!cfg || !sql)
                    161:                return -1;
                    162: 
1.2.2.3   misho     163:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     164:        if (!str) {
                    165:                mqtt_rtlm_log("Error:: not found online table name");
                    166:                return -1;
                    167:        }
                    168:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
                    169:                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
                    170: 
1.2.2.4   misho     171:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     172:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    173:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     174:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     175:                return -1;
                    176:        }
                    177:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    178:                ret = sqlite3_changes(sql);
                    179:        else {
                    180:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    181:                        MQTT_RTLM_LOG(sql);
                    182:                ret = 0;
                    183:        }
                    184:        sqlite3_finalize(stmt);
1.2.2.4   misho     185:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     186: 
                    187:        return ret;
                    188: }
                    189: 
                    190: /*
                    191:  * mqtt_rtlm_chk_session() Check session(s)
                    192:  *
                    193:  * @cfg = loaded config
                    194:  * @sql = SQL handle
                    195:  * @connid = connection id
                    196:  * @user = username
                    197:  * @host = hostname
                    198:  * return: -1 error, 0 not logged or >0 logged found rows
                    199:  */
                    200: int
1.2.2.3   misho     201: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     202: {
                    203:        int ret = 0;
                    204:        char *str, szStmt[BUFSIZ] = { 0 };
                    205:        sqlite3_stmt *stmt;
                    206: 
                    207:        if (!cfg || !sql)
                    208:                return -1;
                    209: 
1.2.2.3   misho     210:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     211:        if (!str) {
                    212:                mqtt_rtlm_log("Error:: not found online table name");
                    213:                return -1;
                    214:        }
                    215:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
                    216:                        "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
                    217:                        str, connid, user, host);
                    218: 
1.2.2.4   misho     219:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     220:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    221:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     222:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     223:                return -1;
                    224:        }
                    225:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    226:                ret = sqlite3_changes(sql);
                    227:        else
                    228:                ret = 0;
                    229:        sqlite3_finalize(stmt);
1.2.2.4   misho     230:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     231: 
                    232:        return ret;
                    233: }
                    234: 
                    235: /*
                    236:  * mqtt_rtlm_write_topic() Publish topic
                    237:  *
                    238:  * @cfg = loaded config
                    239:  * @sql = SQL handle
                    240:  * @msgid = MessageID
                    241:  * @topic = topic
                    242:  * @txt = text
                    243:  * @user = username
                    244:  * @host = hostname
                    245:  * @retain = !=0 retain message to database
                    246:  * return: -1 error, 0 no publish or >0 published ok
                    247:  */
                    248: int
1.2.2.3   misho     249: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
1.2       misho     250:                const char *user, const char *host, char retain)
                    251: {
                    252:        int ret = 0;
                    253:        char *str, szStmt[BUFSIZ] = { 0 };
                    254:        sqlite3_stmt *stmt;
                    255: 
                    256:        if (!cfg || !sql || !topic)
                    257:                return -1;
                    258: 
1.2.2.3   misho     259:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     260:        if (!str) {
                    261:                mqtt_rtlm_log("Error:: not found topics table name");
                    262:                return -1;
                    263:        }
                    264:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
                    265:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
                    266:                        "datetime('now', 'localtime'), '%s');", 
                    267:                        str, retain, msgid, topic, txt, user, host);
                    268: 
1.2.2.4   misho     269:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     270:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    271:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     272:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     273:                return -1;
                    274:        }
                    275:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    276:                ret = sqlite3_changes(sql);
                    277:        else {
                    278:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    279:                        MQTT_RTLM_LOG(sql);
                    280:                ret = 0;
                    281:        }
                    282:        sqlite3_finalize(stmt);
1.2.2.4   misho     283:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     284: 
                    285:        return ret;
                    286: }
                    287: 
                    288: /*
                    289:  * mqtt_rtlm_delete_topic() Delete topic
                    290:  *
                    291:  * @cfg = loaded config
                    292:  * @sql = SQL handle
                    293:  * @msgid = MessageID
                    294:  * @topic = topic
                    295:  * @user = username
                    296:  * @host = hostname
                    297:  * @retain = -1 no matter
                    298:  * return: -1 error, 0 no changes or >0 deleted rows
                    299:  */
                    300: int
1.2.2.3   misho     301: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
1.2       misho     302:                const char *user, const char *host, char retain)
                    303: {
                    304:        int ret = 0;
                    305:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    306:        sqlite3_stmt *stmt;
                    307: 
                    308:        if (!cfg || !sql || !topic)
                    309:                return -1;
                    310: 
1.2.2.3   misho     311:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     312:        if (!str) {
                    313:                mqtt_rtlm_log("Error:: not found topics table name");
                    314:                return -1;
                    315:        }
                    316:        switch (retain) {
                    317:                case -1:
                    318:                        rtn = "";
                    319:                        break;
                    320:                case 0:
                    321:                        rtn = "AND Retain = 0";
                    322:                        break;
                    323:                default:
                    324:                        rtn = "AND Retain != 0";
                    325:                        break;
                    326:        }
                    327:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
                    328:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    329:                        msgid, topic, user, host, rtn);
                    330: 
1.2.2.4   misho     331:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     332:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    333:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     334:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     335:                return -1;
                    336:        }
                    337:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    338:                ret = sqlite3_changes(sql);
                    339:        else {
                    340:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    341:                        MQTT_RTLM_LOG(sql);
                    342:                ret = 0;
                    343:        }
                    344:        sqlite3_finalize(stmt);
1.2.2.4   misho     345:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     346: 
                    347:        return ret;
                    348: }
                    349: 
                    350: /*
                    351:  * mqtt_rtlm_read_topic() Get topic
                    352:  *
                    353:  * @cfg = loaded config
                    354:  * @sql = SQL handle
                    355:  * @msgid = MessageID
                    356:  * @topic = topic
                    357:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    358:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    359:  */
                    360: mqtt_subscr_t *
1.2.2.3   misho     361: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.2       misho     362: {
                    363:        int rowz = 0;
                    364:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
                    365:        sqlite3_stmt *stmt;
                    366:        register int j;
                    367:        mqtt_subscr_t *s = NULL;
                    368: 
                    369:        if (!cfg || !sql || !topic)
                    370:                return NULL;
                    371: 
                    372:        switch (retain) {
                    373:                case -1:
                    374:                        memset(szStr, 0, sizeof szStr);
                    375:                        break;
                    376:                case 0:
                    377:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    378:                        break;
                    379:                default:
                    380:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    381:                        break;
                    382:        }
                    383: 
1.2.2.3   misho     384:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     385:        if (!str) {
                    386:                mqtt_rtlm_log("Error:: not found topics table name");
                    387:                return NULL;
                    388:        }
                    389:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                    390:                        "MsgID = %d AND Topic LIKE '%s' %s;", 
                    391:                        str, msgid, topic, szStr);
                    392: 
1.2.2.4   misho     393:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2       misho     394:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    395:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     396:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     397:                return NULL;
                    398:        }
                    399: 
                    400:        /* calculate count of rows and allocate subscribe items */
                    401:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    402:                rowz++;
                    403:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    404:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    405:                goto end;
                    406:        } else
                    407:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    408:        sqlite3_reset(stmt);
                    409: 
                    410:        /* fill with data */
                    411:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    412:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.1   misho     413:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    414:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    415:                s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
                    416:                s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2       misho     417:        }
                    418: end:
                    419:        sqlite3_finalize(stmt);
1.2.2.4   misho     420:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2       misho     421: 
                    422:        return s;
                    423: }
1.2.2.2   misho     424: 
                    425: /*
                    426:  * mqtt_rtlm_write_subscribe() Subscribe topic
                    427:  *
                    428:  * @cfg = loaded config
                    429:  * @sql = SQL handle
                    430:  * @msgid = MessageID
                    431:  * @topic = topic
                    432:  * @user = username
                    433:  * @host = hostname
                    434:  * @qos = Subscribe QoS
                    435:  * return: -1 error, 0 no publish or >0 published ok
                    436:  */
                    437: int
1.2.2.3   misho     438: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
1.2.2.2   misho     439:                const char *user, const char *host, char qos)
                    440: {
                    441:        int ret = 0;
                    442:        char *str, szStmt[BUFSIZ] = { 0 };
                    443:        sqlite3_stmt *stmt;
                    444: 
                    445:        if (!cfg || !sql || !topic)
                    446:                return -1;
                    447: 
1.2.2.3   misho     448:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     449:        if (!str) {
                    450:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    451:                return -1;
                    452:        }
                    453:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (MsgID, QoS, Topic, PubUser, "
                    454:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', "
                    455:                        "datetime('now', 'localtime'), '%s');", str, 
                    456:                        msgid, qos, topic, user, host);
                    457: 
1.2.2.4   misho     458:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2.2.2   misho     459:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    460:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     461:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     462:                return -1;
                    463:        }
                    464:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    465:                ret = sqlite3_changes(sql);
                    466:        else {
                    467:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    468:                        MQTT_RTLM_LOG(sql);
                    469:                ret = 0;
                    470:        }
                    471:        sqlite3_finalize(stmt);
1.2.2.4   misho     472:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     473: 
                    474:        return ret;
                    475: }
                    476: 
                    477: /*
                    478:  * mqtt_rtlm_delete_subscribe() Delete subscribe
                    479:  *
                    480:  * @cfg = loaded config
                    481:  * @sql = SQL handle
                    482:  * @topic = topic
                    483:  * @user = username
                    484:  * @host = hostname
                    485:  * @qos = Subscribe QoS if -1 no matter
                    486:  * return: -1 error, 0 no changes or >0 deleted rows
                    487:  */
                    488: int
1.2.2.3   misho     489: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic, 
1.2.2.2   misho     490:                const char *user, const char *host, char qos)
                    491: {
                    492:        int ret = 0;
                    493:        char *str, szStr[STRSIZ] = { 0 }, szStmt[BUFSIZ] = { 0 };
                    494:        sqlite3_stmt *stmt;
                    495: 
                    496:        if (!cfg || !sql || !topic)
                    497:                return -1;
                    498: 
1.2.2.3   misho     499:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     500:        if (!str) {
                    501:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    502:                return -1;
                    503:        }
                    504:        if (qos > -1 && qos < 3)
                    505:                snprintf(szStr, sizeof szStr, "AND QoS = %d", qos);
                    506:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
                    507:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    508:                        topic, user, host, szStr);
                    509: 
1.2.2.4   misho     510:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2.2.2   misho     511:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    512:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     513:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     514:                return -1;
                    515:        }
                    516:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    517:                ret = sqlite3_changes(sql);
                    518:        else {
                    519:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    520:                        MQTT_RTLM_LOG(sql);
                    521:                ret = 0;
                    522:        }
                    523:        sqlite3_finalize(stmt);
1.2.2.4   misho     524:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     525: 
                    526:        return ret;
                    527: }
                    528: 
                    529: /*
                    530:  * mqtt_rtlm_read_subscribe() Get subscribe topic
                    531:  *
                    532:  * @cfg = loaded config
                    533:  * @sql = SQL handle
                    534:  * @topic = topic
                    535:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    536:  */
                    537: mqtt_subscr_t *
1.2.2.3   misho     538: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *topic)
1.2.2.2   misho     539: {
                    540:        int rowz = 0;
                    541:        char *str, szStmt[BUFSIZ] = { 0 };
                    542:        sqlite3_stmt *stmt;
                    543:        register int j;
                    544:        mqtt_subscr_t *s = NULL;
                    545: 
                    546:        if (!cfg || !sql || !topic)
                    547:                return NULL;
                    548: 
1.2.2.3   misho     549:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     550:        if (!str) {
                    551:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    552:                return NULL;
                    553:        }
                    554:        snprintf(szStmt, sizeof szStmt, "SELECT QoS, Topic FROM %s WHERE Topic LIKE '%s';", str, topic);
                    555: 
1.2.2.4   misho     556:        sqlite3_mutex_enter(sqlite3_db_mutex(sql));
1.2.2.2   misho     557:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    558:                MQTT_RTLM_LOG(sql);
1.2.2.4   misho     559:                sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     560:                return NULL;
                    561:        }
                    562: 
                    563:        /* calculate count of rows and allocate subscribe items */
                    564:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    565:                rowz++;
                    566:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    567:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    568:                goto end;
                    569:        } else
                    570:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    571:        sqlite3_reset(stmt);
                    572: 
                    573:        /* fill with data */
                    574:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    575:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                    576:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    577:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    578:                s[j].sub_value.msg_base = NULL;
                    579:                s[j].sub_value.msg_len = 0;
                    580:        }
                    581: end:
                    582:        sqlite3_finalize(stmt);
1.2.2.4   misho     583:        sqlite3_mutex_leave(sqlite3_db_mutex(sql));
1.2.2.2   misho     584: 
                    585:        return s;
                    586: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>