Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.12

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.2.2.5   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
                     24:                                        __func__, __LINE__, \
1.2       misho      25:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     26: 
1.2.2.10  misho      27: /* library pre-loaded actions */
                     28: void
                     29: _init()
                     30: {
                     31:        sqlite3_initialize();
                     32: }
                     33: 
                     34: void
                     35: _fini()
                     36: {
                     37:        sqlite3_shutdown();
                     38: }
                     39: 
1.2       misho      40: 
                     41: /*
                     42:  * mqtt_rtlm_open() Open database connection
                     43:  *
                     44:  * @cfg = config filename
                     45:  * return: NULL error or SQL handle
                     46:  */
                     47: sqlite3 *
1.2.2.3   misho      48: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      49: {
                     50:        sqlite3 *sql = NULL;
                     51:        const char *str = NULL;
                     52: 
                     53:        if (!cfg)
                     54:                return NULL;
                     55: 
1.2.2.3   misho      56:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho      57:        if (!str) {
                     58:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     59:                return NULL;
                     60:        }
                     61: 
                     62:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     63:                MQTT_RTLM_LOG(sql);
                     64:                sqlite3_close(sql);
                     65:                return NULL;
                     66:        }
                     67: 
                     68:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     69:                MQTT_RTLM_LOG(sql);
                     70:                sqlite3_close(sql);
                     71:                return NULL;
                     72:        }
                     73:        return sql;
                     74: }
                     75: 
                     76: /*
                     77:  * mqtt_rtlm_close() Close database connection
                     78:  *
                     79:  * @sql = SQL handle
                     80:  * return: none
                     81:  */
                     82: void
                     83: mqtt_rtlm_close(sqlite3 *sql)
                     84: {
                     85:        sqlite3_close(sql);
                     86: }
                     87: 
                     88: /*
                     89:  * mqtt_rtlm_init_session() Create session
                     90:  *
                     91:  * @cfg = loaded config
                     92:  * @sql = SQL handle
                     93:  * @connid = connection id
                     94:  * @user = username
                     95:  * @host = hostname
                     96:  * @will = will flag if !=0 must fill arguments
                     97:  * @... = will arguments in order topic,msg,qos,retain
                     98:  * return: -1 error, 0 session already appears or >0 row changed
                     99:  */
                    100: int
1.2.2.3   misho     101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho     102:                const char *host, char will, ...)
                    103: {
                    104:        va_list lst;
                    105:        int ret = 0;
1.2.2.11  misho     106:        char *str, *psStmt;
1.2       misho     107:        sqlite3_stmt *stmt;
                    108: 
                    109:        if (!cfg || !sql)
                    110:                return -1;
                    111: 
1.2.2.3   misho     112:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     113:        if (!str) {
                    114:                mqtt_rtlm_log("Error:: not found online table name");
                    115:                return -1;
                    116:        }
                    117:        if (!will)
1.2.2.11  misho     118:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
                    119:                                "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2       misho     120:        else {
                    121:                va_start(lst, will);
1.2.2.11  misho     122:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2       misho     123:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.2.2.11  misho     124:                                "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
1.2       misho     125:                                str, connid, user, host, will, 
                    126:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    127:                va_end(lst);
                    128:        }
                    129: 
1.2.2.11  misho     130:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     131:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     132:                sqlite3_free(psStmt);
1.2       misho     133:                return -1;
1.2.2.11  misho     134:        } else
                    135:                sqlite3_free(psStmt);
1.2       misho     136:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    137:                ret = sqlite3_changes(sql);
                    138:        else {
                    139:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    140:                        MQTT_RTLM_LOG(sql);
                    141:                ret = 0;
                    142:        }
                    143:        sqlite3_finalize(stmt);
                    144: 
                    145:        return ret;
                    146: }
                    147: 
                    148: /*
                    149:  * mqtt_rtlm_fini_session() Delete session(s)
                    150:  *
                    151:  * @cfg = loaded config
                    152:  * @sql = SQL handle
                    153:  * @connid = connection id
                    154:  * @user = username
                    155:  * @host = hostname
                    156:  * return: -1 error, 0 session already appears or >0 row changed
                    157:  */
                    158: int
1.2.2.3   misho     159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     160: {
                    161:        int ret = 0;
1.2.2.11  misho     162:        char *str, *psStmt;
1.2       misho     163:        sqlite3_stmt *stmt;
                    164: 
                    165:        if (!cfg || !sql)
                    166:                return -1;
                    167: 
1.2.2.3   misho     168:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     169:        if (!str) {
                    170:                mqtt_rtlm_log("Error:: not found online table name");
                    171:                return -1;
                    172:        }
1.2.2.11  misho     173:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
                    174:                        "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2       misho     175: 
1.2.2.11  misho     176:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     177:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     178:                sqlite3_free(psStmt);
1.2       misho     179:                return -1;
1.2.2.11  misho     180:        } else
                    181:                sqlite3_free(psStmt);
1.2       misho     182:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    183:                ret = sqlite3_changes(sql);
                    184:        else {
                    185:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    186:                        MQTT_RTLM_LOG(sql);
                    187:                ret = 0;
                    188:        }
                    189:        sqlite3_finalize(stmt);
                    190: 
                    191:        return ret;
                    192: }
                    193: 
                    194: /*
                    195:  * mqtt_rtlm_chk_session() Check session(s)
                    196:  *
                    197:  * @cfg = loaded config
                    198:  * @sql = SQL handle
                    199:  * @connid = connection id
                    200:  * @user = username
                    201:  * @host = hostname
                    202:  * return: -1 error, 0 not logged or >0 logged found rows
                    203:  */
                    204: int
1.2.2.3   misho     205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     206: {
                    207:        int ret = 0;
1.2.2.11  misho     208:        char *str, *psStmt;
1.2       misho     209:        sqlite3_stmt *stmt;
                    210: 
                    211:        if (!cfg || !sql)
                    212:                return -1;
                    213: 
1.2.2.3   misho     214:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     215:        if (!str) {
                    216:                mqtt_rtlm_log("Error:: not found online table name");
                    217:                return -1;
                    218:        }
1.2.2.11  misho     219:        psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
                    220:                        "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
1.2       misho     221:                        str, connid, user, host);
                    222: 
1.2.2.11  misho     223:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     224:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     225:                sqlite3_free(psStmt);
1.2       misho     226:                return -1;
1.2.2.11  misho     227:        } else
                    228:                sqlite3_free(psStmt);
1.2       misho     229:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    230:                ret = sqlite3_changes(sql);
                    231:        else
                    232:                ret = 0;
                    233:        sqlite3_finalize(stmt);
                    234: 
                    235:        return ret;
                    236: }
                    237: 
                    238: /*
                    239:  * mqtt_rtlm_write_topic() Publish topic
                    240:  *
                    241:  * @cfg = loaded config
                    242:  * @sql = SQL handle
1.2.2.8   misho     243:  * @connid = connection id
1.2       misho     244:  * @msgid = MessageID
                    245:  * @topic = topic
                    246:  * @txt = text
1.2.2.11  misho     247:  * @txtlen = text length
1.2       misho     248:  * @user = username
                    249:  * @host = hostname
1.2.2.12! misho     250:  * @qos = QoS
1.2       misho     251:  * @retain = !=0 retain message to database
                    252:  * return: -1 error, 0 no publish or >0 published ok
                    253:  */
                    254: int
1.2.2.8   misho     255: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
1.2.2.12! misho     256:                const char *topic, void *txt, int txtlen, const char *user, 
        !           257:                const char *host, char qos, char retain)
1.2       misho     258: {
                    259:        int ret = 0;
1.2.2.11  misho     260:        char *str, *psStmt;
1.2       misho     261:        sqlite3_stmt *stmt;
                    262: 
                    263:        if (!cfg || !sql || !topic)
                    264:                return -1;
                    265: 
1.2.2.3   misho     266:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     267:        if (!str) {
                    268:                mqtt_rtlm_log("Error:: not found topics table name");
                    269:                return -1;
                    270:        }
1.2.2.12! misho     271:        psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
        !           272:                        "PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
1.2.2.11  misho     273:                        "datetime('now', 'localtime'), '%q');", 
1.2.2.12! misho     274:                        str, qos, retain, connid, msgid, topic, user, host);
1.2.2.8   misho     275: 
1.2.2.12! misho     276:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
1.2.2.8   misho     277:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     278:                sqlite3_free(psStmt);
                    279:                return -1;
                    280:        } else
                    281:                sqlite3_free(psStmt);
                    282:        if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
                    283:                MQTT_RTLM_LOG(sql);
                    284:                sqlite3_finalize(stmt);
1.2.2.8   misho     285:                return -1;
                    286:        }
                    287:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    288:                ret = sqlite3_changes(sql);
                    289:        else {
                    290:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    291:                        MQTT_RTLM_LOG(sql);
                    292:                ret = 0;
                    293:        }
                    294:        sqlite3_finalize(stmt);
                    295: 
                    296:        return ret;
                    297: }
                    298: 
                    299: /*
                    300:  * mqtt_rtlm_wipe_topic() Wipe all topics
                    301:  *
                    302:  * @cfg = loaded config
                    303:  * @sql = SQL handle
                    304:  * @connid = connection id
                    305:  * @user = username
                    306:  * @retain = -1 no matter
                    307:  * return: -1 error, 0 no changes or >0 deleted rows
                    308:  */
                    309: int
                    310: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
                    311: {
                    312:        int ret = 0;
1.2.2.11  misho     313:        char *str, *rtn, *psStmt;
1.2.2.8   misho     314:        sqlite3_stmt *stmt;
                    315: 
                    316:        if (!cfg || !sql || !connid)
                    317:                return -1;
                    318: 
                    319:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
                    320:        if (!str) {
                    321:                mqtt_rtlm_log("Error:: not found topics table name");
                    322:                return -1;
                    323:        }
                    324:        switch (retain) {
                    325:                case -1:
                    326:                        rtn = "";
                    327:                        break;
                    328:                case 0:
                    329:                        rtn = "AND Retain = 0";
                    330:                        break;
                    331:                default:
                    332:                        rtn = "AND Retain != 0";
                    333:                        break;
                    334:        }
1.2.2.11  misho     335:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
                    336:                        "PubUser LIKE '%q' %s;", str, connid, user, rtn);
1.2       misho     337: 
1.2.2.11  misho     338:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     339:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     340:                sqlite3_free(psStmt);
1.2       misho     341:                return -1;
1.2.2.11  misho     342:        } else
                    343:                sqlite3_free(psStmt);
1.2       misho     344:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    345:                ret = sqlite3_changes(sql);
                    346:        else {
                    347:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    348:                        MQTT_RTLM_LOG(sql);
                    349:                ret = 0;
                    350:        }
                    351:        sqlite3_finalize(stmt);
                    352: 
                    353:        return ret;
                    354: }
                    355: 
                    356: /*
                    357:  * mqtt_rtlm_delete_topic() Delete topic
                    358:  *
                    359:  * @cfg = loaded config
                    360:  * @sql = SQL handle
1.2.2.8   misho     361:  * @connid = connection id
1.2       misho     362:  * @msgid = MessageID
                    363:  * @topic = topic
                    364:  * @user = username
                    365:  * @host = hostname
                    366:  * @retain = -1 no matter
                    367:  * return: -1 error, 0 no changes or >0 deleted rows
                    368:  */
                    369: int
1.2.2.8   misho     370: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    371:                const char *topic, const char *user, const char *host, char retain)
1.2       misho     372: {
                    373:        int ret = 0;
1.2.2.11  misho     374:        char *str, *rtn, *psStmt;
1.2       misho     375:        sqlite3_stmt *stmt;
                    376: 
                    377:        if (!cfg || !sql || !topic)
                    378:                return -1;
                    379: 
1.2.2.3   misho     380:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     381:        if (!str) {
                    382:                mqtt_rtlm_log("Error:: not found topics table name");
                    383:                return -1;
                    384:        }
                    385:        switch (retain) {
                    386:                case -1:
                    387:                        rtn = "";
                    388:                        break;
                    389:                case 0:
                    390:                        rtn = "AND Retain = 0";
                    391:                        break;
                    392:                default:
                    393:                        rtn = "AND Retain != 0";
                    394:                        break;
                    395:        }
1.2.2.11  misho     396:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
                    397:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
1.2.2.8   misho     398:                        connid, msgid, topic, user, host, rtn);
1.2       misho     399: 
1.2.2.11  misho     400:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     401:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     402:                sqlite3_free(psStmt);
1.2       misho     403:                return -1;
1.2.2.11  misho     404:        } else
                    405:                sqlite3_free(psStmt);
1.2       misho     406:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    407:                ret = sqlite3_changes(sql);
                    408:        else {
                    409:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    410:                        MQTT_RTLM_LOG(sql);
                    411:                ret = 0;
                    412:        }
                    413:        sqlite3_finalize(stmt);
                    414: 
                    415:        return ret;
                    416: }
                    417: 
                    418: /*
                    419:  * mqtt_rtlm_read_topic() Get topic
                    420:  *
                    421:  * @cfg = loaded config
                    422:  * @sql = SQL handle
1.2.2.8   misho     423:  * @connid = connection id
1.2       misho     424:  * @msgid = MessageID
                    425:  * @topic = topic
                    426:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    427:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    428:  */
                    429: mqtt_subscr_t *
1.2.2.8   misho     430: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    431:                const char *topic, char retain)
1.2       misho     432: {
                    433:        int rowz = 0;
1.2.2.11  misho     434:        char *str, szStr[STRSIZ], *psStmt;
1.2       misho     435:        sqlite3_stmt *stmt;
                    436:        register int j;
                    437:        mqtt_subscr_t *s = NULL;
1.2.2.11  misho     438:        ait_val_t v;
1.2       misho     439: 
                    440:        if (!cfg || !sql || !topic)
                    441:                return NULL;
                    442: 
                    443:        switch (retain) {
                    444:                case -1:
                    445:                        memset(szStr, 0, sizeof szStr);
                    446:                        break;
                    447:                case 0:
                    448:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    449:                        break;
                    450:                default:
                    451:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    452:                        break;
                    453:        }
                    454: 
1.2.2.3   misho     455:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     456:        if (!str) {
                    457:                mqtt_rtlm_log("Error:: not found topics table name");
                    458:                return NULL;
                    459:        }
1.2.2.11  misho     460:        psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
                    461:                        "ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;", 
1.2.2.8   misho     462:                        str, connid, msgid, topic, szStr);
1.2       misho     463: 
1.2.2.11  misho     464:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     465:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     466:                sqlite3_free(psStmt);
1.2       misho     467:                return NULL;
1.2.2.11  misho     468:        } else
                    469:                sqlite3_free(psStmt);
1.2       misho     470: 
                    471:        /* calculate count of rows and allocate subscribe items */
                    472:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    473:                rowz++;
1.2.2.9   misho     474:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2       misho     475:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    476:                goto end;
                    477:        } else
                    478:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    479:        sqlite3_reset(stmt);
                    480: 
                    481:        /* fill with data */
                    482:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    483:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9   misho     484:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.1   misho     485:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
1.2.2.11  misho     486:                AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
                    487:                s[j].sub_value.msg_len = AIT_LEN(&v);
                    488:                s[j].sub_value.msg_base = (u_char*) io_malloc(s[j].sub_value.msg_len);
                    489:                if (s[j].sub_value.msg_base)
                    490:                        memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
1.2       misho     491:        }
                    492: end:
                    493:        sqlite3_finalize(stmt);
                    494: 
                    495:        return s;
                    496: }
1.2.2.2   misho     497: 
                    498: /*
                    499:  * mqtt_rtlm_write_subscribe() Subscribe topic
                    500:  *
                    501:  * @cfg = loaded config
                    502:  * @sql = SQL handle
1.2.2.8   misho     503:  * @connid = connection id
1.2.2.2   misho     504:  * @msgid = MessageID
                    505:  * @topic = topic
                    506:  * @user = username
                    507:  * @host = hostname
                    508:  * @qos = Subscribe QoS
                    509:  * return: -1 error, 0 no publish or >0 published ok
                    510:  */
                    511: int
1.2.2.8   misho     512: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    513:                const char *topic, const char *user, const char *host, char qos)
1.2.2.2   misho     514: {
                    515:        int ret = 0;
1.2.2.11  misho     516:        char *str, *psStmt;
1.2.2.2   misho     517:        sqlite3_stmt *stmt;
                    518: 
                    519:        if (!cfg || !sql || !topic)
                    520:                return -1;
                    521: 
1.2.2.3   misho     522:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     523:        if (!str) {
                    524:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    525:                return -1;
                    526:        }
1.2.2.11  misho     527:        psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
                    528:                        "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
                    529:                        "datetime('now', 'localtime'), '%q');", str, 
1.2.2.8   misho     530:                        connid, msgid, qos, topic, user, host);
1.2.2.2   misho     531: 
1.2.2.11  misho     532:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2   misho     533:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     534:                sqlite3_free(psStmt);
1.2.2.2   misho     535:                return -1;
1.2.2.11  misho     536:        } else
                    537:                sqlite3_free(psStmt);
1.2.2.2   misho     538:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    539:                ret = sqlite3_changes(sql);
                    540:        else {
                    541:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    542:                        MQTT_RTLM_LOG(sql);
                    543:                ret = 0;
                    544:        }
                    545:        sqlite3_finalize(stmt);
                    546: 
                    547:        return ret;
                    548: }
                    549: 
                    550: /*
                    551:  * mqtt_rtlm_delete_subscribe() Delete subscribe
                    552:  *
                    553:  * @cfg = loaded config
                    554:  * @sql = SQL handle
1.2.2.8   misho     555:  * @connid = connection id
1.2.2.2   misho     556:  * @topic = topic
                    557:  * @user = username
                    558:  * @host = hostname
                    559:  * return: -1 error, 0 no changes or >0 deleted rows
                    560:  */
                    561: int
1.2.2.8   misho     562: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                    563:                const char *topic, const char *user, const char *host)
1.2.2.2   misho     564: {
                    565:        int ret = 0;
1.2.2.11  misho     566:        char *str, *psStmt;
1.2.2.2   misho     567:        sqlite3_stmt *stmt;
                    568: 
                    569:        if (!cfg || !sql || !topic)
                    570:                return -1;
                    571: 
1.2.2.3   misho     572:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     573:        if (!str) {
                    574:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    575:                return -1;
                    576:        }
1.2.2.11  misho     577:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
                    578:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
1.2.2.8   misho     579:                        connid, topic, user, host);
1.2.2.2   misho     580: 
1.2.2.11  misho     581:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2   misho     582:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     583:                sqlite3_free(psStmt);
1.2.2.2   misho     584:                return -1;
1.2.2.11  misho     585:        } else
                    586:                sqlite3_free(psStmt);
1.2.2.2   misho     587:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    588:                ret = sqlite3_changes(sql);
                    589:        else {
                    590:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    591:                        MQTT_RTLM_LOG(sql);
                    592:                ret = 0;
                    593:        }
                    594:        sqlite3_finalize(stmt);
                    595: 
                    596:        return ret;
                    597: }
                    598: 
                    599: /*
                    600:  * mqtt_rtlm_read_subscribe() Get subscribe topic
                    601:  *
                    602:  * @cfg = loaded config
                    603:  * @sql = SQL handle
1.2.2.8   misho     604:  * @connid = connection id
1.2.2.2   misho     605:  * @topic = topic
                    606:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    607:  */
                    608: mqtt_subscr_t *
1.2.2.8   misho     609: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
1.2.2.2   misho     610: {
                    611:        int rowz = 0;
1.2.2.11  misho     612:        char *str, *psStmt;
1.2.2.2   misho     613:        sqlite3_stmt *stmt;
                    614:        register int j;
                    615:        mqtt_subscr_t *s = NULL;
                    616: 
                    617:        if (!cfg || !sql || !topic)
                    618:                return NULL;
                    619: 
1.2.2.3   misho     620:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     621:        if (!str) {
                    622:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    623:                return NULL;
                    624:        }
1.2.2.11  misho     625:        psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
                    626:                        "Topic LIKE '%q';", str, connid, topic);
1.2.2.2   misho     627: 
1.2.2.11  misho     628:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2   misho     629:                MQTT_RTLM_LOG(sql);
1.2.2.11  misho     630:                sqlite3_free(psStmt);
1.2.2.2   misho     631:                return NULL;
1.2.2.11  misho     632:        } else
                    633:                sqlite3_free(psStmt);
1.2.2.2   misho     634: 
                    635:        /* calculate count of rows and allocate subscribe items */
                    636:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    637:                rowz++;
1.2.2.9   misho     638:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2.2.2   misho     639:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    640:                goto end;
                    641:        } else
                    642:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    643:        sqlite3_reset(stmt);
                    644: 
                    645:        /* fill with data */
                    646:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    647:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9   misho     648:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.2   misho     649:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    650:                s[j].sub_value.msg_base = NULL;
                    651:                s[j].sub_value.msg_len = 0;
                    652:        }
                    653: end:
                    654:        sqlite3_finalize(stmt);
                    655: 
                    656:        return s;
                    657: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>