Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.11

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.2.2.5   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
                     24:                                        __func__, __LINE__, \
1.2       misho      25:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     26: 
1.2.2.10  misho      27: /* library pre-loaded actions */
                     28: void
                     29: _init()
                     30: {
                     31:        sqlite3_initialize();
                     32: }
                     33: 
                     34: void
                     35: _fini()
                     36: {
                     37:        sqlite3_shutdown();
                     38: }
                     39: 
1.2       misho      40: 
                     41: /*
                     42:  * mqtt_rtlm_open() Open database connection
                     43:  *
                     44:  * @cfg = config filename
                     45:  * return: NULL error or SQL handle
                     46:  */
                     47: sqlite3 *
1.2.2.3   misho      48: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      49: {
                     50:        sqlite3 *sql = NULL;
                     51:        const char *str = NULL;
                     52: 
                     53:        if (!cfg)
                     54:                return NULL;
                     55: 
1.2.2.3   misho      56:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho      57:        if (!str) {
                     58:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     59:                return NULL;
                     60:        }
                     61: 
                     62:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     63:                MQTT_RTLM_LOG(sql);
                     64:                sqlite3_close(sql);
                     65:                return NULL;
                     66:        }
                     67: 
                     68:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     69:                MQTT_RTLM_LOG(sql);
                     70:                sqlite3_close(sql);
                     71:                return NULL;
                     72:        }
                     73:        return sql;
                     74: }
                     75: 
                     76: /*
                     77:  * mqtt_rtlm_close() Close database connection
                     78:  *
                     79:  * @sql = SQL handle
                     80:  * return: none
                     81:  */
                     82: void
                     83: mqtt_rtlm_close(sqlite3 *sql)
                     84: {
                     85:        sqlite3_close(sql);
                     86: }
                     87: 
                     88: /*
                     89:  * mqtt_rtlm_init_session() Create session
                     90:  *
                     91:  * @cfg = loaded config
                     92:  * @sql = SQL handle
                     93:  * @connid = connection id
                     94:  * @user = username
                     95:  * @host = hostname
                     96:  * @will = will flag if !=0 must fill arguments
                     97:  * @... = will arguments in order topic,msg,qos,retain
                     98:  * return: -1 error, 0 session already appears or >0 row changed
                     99:  */
                    100: int
1.2.2.3   misho     101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho     102:                const char *host, char will, ...)
                    103: {
                    104:        va_list lst;
                    105:        int ret = 0;
1.2.2.11! misho     106:        char *str, *psStmt;
1.2       misho     107:        sqlite3_stmt *stmt;
                    108: 
                    109:        if (!cfg || !sql)
                    110:                return -1;
                    111: 
1.2.2.3   misho     112:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     113:        if (!str) {
                    114:                mqtt_rtlm_log("Error:: not found online table name");
                    115:                return -1;
                    116:        }
                    117:        if (!will)
1.2.2.11! misho     118:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
        !           119:                                "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2       misho     120:        else {
                    121:                va_start(lst, will);
1.2.2.11! misho     122:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2       misho     123:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.2.2.11! misho     124:                                "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
1.2       misho     125:                                str, connid, user, host, will, 
                    126:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    127:                va_end(lst);
                    128:        }
                    129: 
1.2.2.11! misho     130:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     131:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     132:                sqlite3_free(psStmt);
1.2       misho     133:                return -1;
1.2.2.11! misho     134:        } else
        !           135:                sqlite3_free(psStmt);
1.2       misho     136:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    137:                ret = sqlite3_changes(sql);
                    138:        else {
                    139:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    140:                        MQTT_RTLM_LOG(sql);
                    141:                ret = 0;
                    142:        }
                    143:        sqlite3_finalize(stmt);
                    144: 
                    145:        return ret;
                    146: }
                    147: 
                    148: /*
                    149:  * mqtt_rtlm_fini_session() Delete session(s)
                    150:  *
                    151:  * @cfg = loaded config
                    152:  * @sql = SQL handle
                    153:  * @connid = connection id
                    154:  * @user = username
                    155:  * @host = hostname
                    156:  * return: -1 error, 0 session already appears or >0 row changed
                    157:  */
                    158: int
1.2.2.3   misho     159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     160: {
                    161:        int ret = 0;
1.2.2.11! misho     162:        char *str, *psStmt;
1.2       misho     163:        sqlite3_stmt *stmt;
                    164: 
                    165:        if (!cfg || !sql)
                    166:                return -1;
                    167: 
1.2.2.3   misho     168:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     169:        if (!str) {
                    170:                mqtt_rtlm_log("Error:: not found online table name");
                    171:                return -1;
                    172:        }
1.2.2.11! misho     173:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
        !           174:                        "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2       misho     175: 
1.2.2.11! misho     176:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     177:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     178:                sqlite3_free(psStmt);
1.2       misho     179:                return -1;
1.2.2.11! misho     180:        } else
        !           181:                sqlite3_free(psStmt);
1.2       misho     182:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    183:                ret = sqlite3_changes(sql);
                    184:        else {
                    185:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    186:                        MQTT_RTLM_LOG(sql);
                    187:                ret = 0;
                    188:        }
                    189:        sqlite3_finalize(stmt);
                    190: 
                    191:        return ret;
                    192: }
                    193: 
                    194: /*
                    195:  * mqtt_rtlm_chk_session() Check session(s)
                    196:  *
                    197:  * @cfg = loaded config
                    198:  * @sql = SQL handle
                    199:  * @connid = connection id
                    200:  * @user = username
                    201:  * @host = hostname
                    202:  * return: -1 error, 0 not logged or >0 logged found rows
                    203:  */
                    204: int
1.2.2.3   misho     205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     206: {
                    207:        int ret = 0;
1.2.2.11! misho     208:        char *str, *psStmt;
1.2       misho     209:        sqlite3_stmt *stmt;
                    210: 
                    211:        if (!cfg || !sql)
                    212:                return -1;
                    213: 
1.2.2.3   misho     214:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     215:        if (!str) {
                    216:                mqtt_rtlm_log("Error:: not found online table name");
                    217:                return -1;
                    218:        }
1.2.2.11! misho     219:        psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
        !           220:                        "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
1.2       misho     221:                        str, connid, user, host);
                    222: 
1.2.2.11! misho     223:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     224:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     225:                sqlite3_free(psStmt);
1.2       misho     226:                return -1;
1.2.2.11! misho     227:        } else
        !           228:                sqlite3_free(psStmt);
1.2       misho     229:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    230:                ret = sqlite3_changes(sql);
                    231:        else
                    232:                ret = 0;
                    233:        sqlite3_finalize(stmt);
                    234: 
                    235:        return ret;
                    236: }
                    237: 
                    238: /*
                    239:  * mqtt_rtlm_write_topic() Publish topic
                    240:  *
                    241:  * @cfg = loaded config
                    242:  * @sql = SQL handle
1.2.2.8   misho     243:  * @connid = connection id
1.2       misho     244:  * @msgid = MessageID
                    245:  * @topic = topic
                    246:  * @txt = text
1.2.2.11! misho     247:  * @txtlen = text length
1.2       misho     248:  * @user = username
                    249:  * @host = hostname
                    250:  * @retain = !=0 retain message to database
                    251:  * return: -1 error, 0 no publish or >0 published ok
                    252:  */
                    253: int
1.2.2.8   misho     254: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
1.2.2.11! misho     255:                const char *topic, void *txt, int txtlen, const char *user, const char *host, char retain)
1.2       misho     256: {
                    257:        int ret = 0;
1.2.2.11! misho     258:        char *str, *psStmt;
1.2       misho     259:        sqlite3_stmt *stmt;
                    260: 
                    261:        if (!cfg || !sql || !topic)
                    262:                return -1;
                    263: 
1.2.2.3   misho     264:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     265:        if (!str) {
                    266:                mqtt_rtlm_log("Error:: not found topics table name");
                    267:                return -1;
                    268:        }
1.2.2.11! misho     269:        psStmt = sqlite3_mprintf("INSERT INTO %s (Retain, ConnID, MsgID, Topic, Value, PubUser, "
        !           270:                        "PubDate, PubHost) VALUES (%d, '%q', %u, '%q', ?1, '%q', "
        !           271:                        "datetime('now', 'localtime'), '%q');", 
1.2.2.8   misho     272:                        str, retain, connid, msgid, topic, txt, user, host);
                    273: 
1.2.2.11! misho     274:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.8   misho     275:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     276:                sqlite3_free(psStmt);
        !           277:                return -1;
        !           278:        } else
        !           279:                sqlite3_free(psStmt);
        !           280:        if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
        !           281:                MQTT_RTLM_LOG(sql);
        !           282:                sqlite3_finalize(stmt);
1.2.2.8   misho     283:                return -1;
                    284:        }
                    285:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    286:                ret = sqlite3_changes(sql);
                    287:        else {
                    288:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    289:                        MQTT_RTLM_LOG(sql);
                    290:                ret = 0;
                    291:        }
                    292:        sqlite3_finalize(stmt);
                    293: 
                    294:        return ret;
                    295: }
                    296: 
                    297: /*
                    298:  * mqtt_rtlm_wipe_topic() Wipe all topics
                    299:  *
                    300:  * @cfg = loaded config
                    301:  * @sql = SQL handle
                    302:  * @connid = connection id
                    303:  * @user = username
                    304:  * @retain = -1 no matter
                    305:  * return: -1 error, 0 no changes or >0 deleted rows
                    306:  */
                    307: int
                    308: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
                    309: {
                    310:        int ret = 0;
1.2.2.11! misho     311:        char *str, *rtn, *psStmt;
1.2.2.8   misho     312:        sqlite3_stmt *stmt;
                    313: 
                    314:        if (!cfg || !sql || !connid)
                    315:                return -1;
                    316: 
                    317:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
                    318:        if (!str) {
                    319:                mqtt_rtlm_log("Error:: not found topics table name");
                    320:                return -1;
                    321:        }
                    322:        switch (retain) {
                    323:                case -1:
                    324:                        rtn = "";
                    325:                        break;
                    326:                case 0:
                    327:                        rtn = "AND Retain = 0";
                    328:                        break;
                    329:                default:
                    330:                        rtn = "AND Retain != 0";
                    331:                        break;
                    332:        }
1.2.2.11! misho     333:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
        !           334:                        "PubUser LIKE '%q' %s;", str, connid, user, rtn);
1.2       misho     335: 
1.2.2.11! misho     336:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     337:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     338:                sqlite3_free(psStmt);
1.2       misho     339:                return -1;
1.2.2.11! misho     340:        } else
        !           341:                sqlite3_free(psStmt);
1.2       misho     342:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    343:                ret = sqlite3_changes(sql);
                    344:        else {
                    345:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    346:                        MQTT_RTLM_LOG(sql);
                    347:                ret = 0;
                    348:        }
                    349:        sqlite3_finalize(stmt);
                    350: 
                    351:        return ret;
                    352: }
                    353: 
                    354: /*
                    355:  * mqtt_rtlm_delete_topic() Delete topic
                    356:  *
                    357:  * @cfg = loaded config
                    358:  * @sql = SQL handle
1.2.2.8   misho     359:  * @connid = connection id
1.2       misho     360:  * @msgid = MessageID
                    361:  * @topic = topic
                    362:  * @user = username
                    363:  * @host = hostname
                    364:  * @retain = -1 no matter
                    365:  * return: -1 error, 0 no changes or >0 deleted rows
                    366:  */
                    367: int
1.2.2.8   misho     368: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    369:                const char *topic, const char *user, const char *host, char retain)
1.2       misho     370: {
                    371:        int ret = 0;
1.2.2.11! misho     372:        char *str, *rtn, *psStmt;
1.2       misho     373:        sqlite3_stmt *stmt;
                    374: 
                    375:        if (!cfg || !sql || !topic)
                    376:                return -1;
                    377: 
1.2.2.3   misho     378:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     379:        if (!str) {
                    380:                mqtt_rtlm_log("Error:: not found topics table name");
                    381:                return -1;
                    382:        }
                    383:        switch (retain) {
                    384:                case -1:
                    385:                        rtn = "";
                    386:                        break;
                    387:                case 0:
                    388:                        rtn = "AND Retain = 0";
                    389:                        break;
                    390:                default:
                    391:                        rtn = "AND Retain != 0";
                    392:                        break;
                    393:        }
1.2.2.11! misho     394:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
        !           395:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
1.2.2.8   misho     396:                        connid, msgid, topic, user, host, rtn);
1.2       misho     397: 
1.2.2.11! misho     398:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     399:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     400:                sqlite3_free(psStmt);
1.2       misho     401:                return -1;
1.2.2.11! misho     402:        } else
        !           403:                sqlite3_free(psStmt);
1.2       misho     404:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    405:                ret = sqlite3_changes(sql);
                    406:        else {
                    407:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    408:                        MQTT_RTLM_LOG(sql);
                    409:                ret = 0;
                    410:        }
                    411:        sqlite3_finalize(stmt);
                    412: 
                    413:        return ret;
                    414: }
                    415: 
                    416: /*
                    417:  * mqtt_rtlm_read_topic() Get topic
                    418:  *
                    419:  * @cfg = loaded config
                    420:  * @sql = SQL handle
1.2.2.8   misho     421:  * @connid = connection id
1.2       misho     422:  * @msgid = MessageID
                    423:  * @topic = topic
                    424:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    425:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    426:  */
                    427: mqtt_subscr_t *
1.2.2.8   misho     428: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    429:                const char *topic, char retain)
1.2       misho     430: {
                    431:        int rowz = 0;
1.2.2.11! misho     432:        char *str, szStr[STRSIZ], *psStmt;
1.2       misho     433:        sqlite3_stmt *stmt;
                    434:        register int j;
                    435:        mqtt_subscr_t *s = NULL;
1.2.2.11! misho     436:        ait_val_t v;
1.2       misho     437: 
                    438:        if (!cfg || !sql || !topic)
                    439:                return NULL;
                    440: 
                    441:        switch (retain) {
                    442:                case -1:
                    443:                        memset(szStr, 0, sizeof szStr);
                    444:                        break;
                    445:                case 0:
                    446:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    447:                        break;
                    448:                default:
                    449:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    450:                        break;
                    451:        }
                    452: 
1.2.2.3   misho     453:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     454:        if (!str) {
                    455:                mqtt_rtlm_log("Error:: not found topics table name");
                    456:                return NULL;
                    457:        }
1.2.2.11! misho     458:        psStmt = sqlite3_mprintf("SELECT Retain, Topic, Value FROM %s WHERE "
        !           459:                        "ConnID = '%q' AND MsgID = %d AND Topic LIKE '%q' %s;", 
1.2.2.8   misho     460:                        str, connid, msgid, topic, szStr);
1.2       misho     461: 
1.2.2.11! misho     462:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     463:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     464:                sqlite3_free(psStmt);
1.2       misho     465:                return NULL;
1.2.2.11! misho     466:        } else
        !           467:                sqlite3_free(psStmt);
1.2       misho     468: 
                    469:        /* calculate count of rows and allocate subscribe items */
                    470:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    471:                rowz++;
1.2.2.9   misho     472:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2       misho     473:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    474:                goto end;
                    475:        } else
                    476:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    477:        sqlite3_reset(stmt);
                    478: 
                    479:        /* fill with data */
                    480:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    481:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9   misho     482:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.1   misho     483:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
1.2.2.11! misho     484:                AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
        !           485:                s[j].sub_value.msg_len = AIT_LEN(&v);
        !           486:                s[j].sub_value.msg_base = (u_char*) io_malloc(s[j].sub_value.msg_len);
        !           487:                if (s[j].sub_value.msg_base)
        !           488:                        memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
1.2       misho     489:        }
                    490: end:
                    491:        sqlite3_finalize(stmt);
                    492: 
                    493:        return s;
                    494: }
1.2.2.2   misho     495: 
                    496: /*
                    497:  * mqtt_rtlm_write_subscribe() Subscribe topic
                    498:  *
                    499:  * @cfg = loaded config
                    500:  * @sql = SQL handle
1.2.2.8   misho     501:  * @connid = connection id
1.2.2.2   misho     502:  * @msgid = MessageID
                    503:  * @topic = topic
                    504:  * @user = username
                    505:  * @host = hostname
                    506:  * @qos = Subscribe QoS
                    507:  * return: -1 error, 0 no publish or >0 published ok
                    508:  */
                    509: int
1.2.2.8   misho     510: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    511:                const char *topic, const char *user, const char *host, char qos)
1.2.2.2   misho     512: {
                    513:        int ret = 0;
1.2.2.11! misho     514:        char *str, *psStmt;
1.2.2.2   misho     515:        sqlite3_stmt *stmt;
                    516: 
                    517:        if (!cfg || !sql || !topic)
                    518:                return -1;
                    519: 
1.2.2.3   misho     520:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     521:        if (!str) {
                    522:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    523:                return -1;
                    524:        }
1.2.2.11! misho     525:        psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
        !           526:                        "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
        !           527:                        "datetime('now', 'localtime'), '%q');", str, 
1.2.2.8   misho     528:                        connid, msgid, qos, topic, user, host);
1.2.2.2   misho     529: 
1.2.2.11! misho     530:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2   misho     531:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     532:                sqlite3_free(psStmt);
1.2.2.2   misho     533:                return -1;
1.2.2.11! misho     534:        } else
        !           535:                sqlite3_free(psStmt);
1.2.2.2   misho     536:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    537:                ret = sqlite3_changes(sql);
                    538:        else {
                    539:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    540:                        MQTT_RTLM_LOG(sql);
                    541:                ret = 0;
                    542:        }
                    543:        sqlite3_finalize(stmt);
                    544: 
                    545:        return ret;
                    546: }
                    547: 
                    548: /*
                    549:  * mqtt_rtlm_delete_subscribe() Delete subscribe
                    550:  *
                    551:  * @cfg = loaded config
                    552:  * @sql = SQL handle
1.2.2.8   misho     553:  * @connid = connection id
1.2.2.2   misho     554:  * @topic = topic
                    555:  * @user = username
                    556:  * @host = hostname
                    557:  * return: -1 error, 0 no changes or >0 deleted rows
                    558:  */
                    559: int
1.2.2.8   misho     560: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                    561:                const char *topic, const char *user, const char *host)
1.2.2.2   misho     562: {
                    563:        int ret = 0;
1.2.2.11! misho     564:        char *str, *psStmt;
1.2.2.2   misho     565:        sqlite3_stmt *stmt;
                    566: 
                    567:        if (!cfg || !sql || !topic)
                    568:                return -1;
                    569: 
1.2.2.3   misho     570:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     571:        if (!str) {
                    572:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    573:                return -1;
                    574:        }
1.2.2.11! misho     575:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
        !           576:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
1.2.2.8   misho     577:                        connid, topic, user, host);
1.2.2.2   misho     578: 
1.2.2.11! misho     579:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2   misho     580:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     581:                sqlite3_free(psStmt);
1.2.2.2   misho     582:                return -1;
1.2.2.11! misho     583:        } else
        !           584:                sqlite3_free(psStmt);
1.2.2.2   misho     585:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    586:                ret = sqlite3_changes(sql);
                    587:        else {
                    588:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    589:                        MQTT_RTLM_LOG(sql);
                    590:                ret = 0;
                    591:        }
                    592:        sqlite3_finalize(stmt);
                    593: 
                    594:        return ret;
                    595: }
                    596: 
                    597: /*
                    598:  * mqtt_rtlm_read_subscribe() Get subscribe topic
                    599:  *
                    600:  * @cfg = loaded config
                    601:  * @sql = SQL handle
1.2.2.8   misho     602:  * @connid = connection id
1.2.2.2   misho     603:  * @topic = topic
                    604:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    605:  */
                    606: mqtt_subscr_t *
1.2.2.8   misho     607: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
1.2.2.2   misho     608: {
                    609:        int rowz = 0;
1.2.2.11! misho     610:        char *str, *psStmt;
1.2.2.2   misho     611:        sqlite3_stmt *stmt;
                    612:        register int j;
                    613:        mqtt_subscr_t *s = NULL;
                    614: 
                    615:        if (!cfg || !sql || !topic)
                    616:                return NULL;
                    617: 
1.2.2.3   misho     618:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
1.2.2.2   misho     619:        if (!str) {
                    620:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    621:                return NULL;
                    622:        }
1.2.2.11! misho     623:        psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
        !           624:                        "Topic LIKE '%q';", str, connid, topic);
1.2.2.2   misho     625: 
1.2.2.11! misho     626:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2.2.2   misho     627:                MQTT_RTLM_LOG(sql);
1.2.2.11! misho     628:                sqlite3_free(psStmt);
1.2.2.2   misho     629:                return NULL;
1.2.2.11! misho     630:        } else
        !           631:                sqlite3_free(psStmt);
1.2.2.2   misho     632: 
                    633:        /* calculate count of rows and allocate subscribe items */
                    634:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    635:                rowz++;
1.2.2.9   misho     636:        if (!(s = io_malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
1.2.2.2   misho     637:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    638:                goto end;
                    639:        } else
                    640:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    641:        sqlite3_reset(stmt);
                    642: 
                    643:        /* fill with data */
                    644:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    645:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.9   misho     646:                s[j].sub_topic.msg_base = (u_char*) io_strdup((char*) sqlite3_column_text(stmt, 1));
1.2.2.2   misho     647:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    648:                s[j].sub_value.msg_base = NULL;
                    649:                s[j].sub_value.msg_len = 0;
                    650:        }
                    651: end:
                    652:        sqlite3_finalize(stmt);
                    653: 
                    654:        return s;
                    655: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>