Annotation of mqtt/src/pubmqtt.c, revision 1.3

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.3     ! misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", \
        !            24:                                        __func__, __LINE__, \
1.2       misho      25:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     26: 
1.3     ! misho      27: /* library pre-loaded actions */
        !            28: void
        !            29: _init()
        !            30: {
        !            31:        sqlite3_initialize();
        !            32: }
        !            33: 
        !            34: void
        !            35: _fini()
        !            36: {
        !            37:        sqlite3_shutdown();
        !            38: }
        !            39: 
1.2       misho      40: 
                     41: /*
                     42:  * mqtt_rtlm_open() Open database connection
                     43:  *
                     44:  * @cfg = config filename
                     45:  * return: NULL error or SQL handle
                     46:  */
                     47: sqlite3 *
1.3     ! misho      48: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      49: {
                     50:        sqlite3 *sql = NULL;
                     51:        const char *str = NULL;
                     52: 
                     53:        if (!cfg)
                     54:                return NULL;
                     55: 
1.3     ! misho      56:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho      57:        if (!str) {
                     58:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     59:                return NULL;
                     60:        }
                     61: 
                     62:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     63:                MQTT_RTLM_LOG(sql);
                     64:                sqlite3_close(sql);
                     65:                return NULL;
                     66:        }
                     67: 
                     68:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     69:                MQTT_RTLM_LOG(sql);
                     70:                sqlite3_close(sql);
                     71:                return NULL;
                     72:        }
                     73:        return sql;
                     74: }
                     75: 
                     76: /*
                     77:  * mqtt_rtlm_close() Close database connection
                     78:  *
                     79:  * @sql = SQL handle
                     80:  * return: none
                     81:  */
                     82: void
                     83: mqtt_rtlm_close(sqlite3 *sql)
                     84: {
                     85:        sqlite3_close(sql);
                     86: }
                     87: 
                     88: /*
                     89:  * mqtt_rtlm_init_session() Create session
                     90:  *
                     91:  * @cfg = loaded config
                     92:  * @sql = SQL handle
                     93:  * @connid = connection id
                     94:  * @user = username
                     95:  * @host = hostname
                     96:  * @will = will flag if !=0 must fill arguments
                     97:  * @... = will arguments in order topic,msg,qos,retain
                     98:  * return: -1 error, 0 session already appears or >0 row changed
                     99:  */
                    100: int
1.3     ! misho     101: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho     102:                const char *host, char will, ...)
                    103: {
                    104:        va_list lst;
                    105:        int ret = 0;
1.3     ! misho     106:        char *str, *psStmt;
1.2       misho     107:        sqlite3_stmt *stmt;
                    108: 
                    109:        if (!cfg || !sql)
                    110:                return -1;
                    111: 
1.3     ! misho     112:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     113:        if (!str) {
                    114:                mqtt_rtlm_log("Error:: not found online table name");
                    115:                return -1;
                    116:        }
                    117:        if (!will)
1.3     ! misho     118:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
        !           119:                                "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2       misho     120:        else {
                    121:                va_start(lst, will);
1.3     ! misho     122:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2       misho     123:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.3     ! misho     124:                                "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
1.2       misho     125:                                str, connid, user, host, will, 
                    126:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    127:                va_end(lst);
                    128:        }
                    129: 
1.3     ! misho     130:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     131:                MQTT_RTLM_LOG(sql);
1.3     ! misho     132:                sqlite3_free(psStmt);
1.2       misho     133:                return -1;
1.3     ! misho     134:        } else
        !           135:                sqlite3_free(psStmt);
1.2       misho     136:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    137:                ret = sqlite3_changes(sql);
                    138:        else {
                    139:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    140:                        MQTT_RTLM_LOG(sql);
                    141:                ret = 0;
                    142:        }
                    143:        sqlite3_finalize(stmt);
                    144: 
                    145:        return ret;
                    146: }
                    147: 
                    148: /*
                    149:  * mqtt_rtlm_fini_session() Delete session(s)
                    150:  *
                    151:  * @cfg = loaded config
                    152:  * @sql = SQL handle
                    153:  * @connid = connection id
                    154:  * @user = username
                    155:  * @host = hostname
                    156:  * return: -1 error, 0 session already appears or >0 row changed
                    157:  */
                    158: int
1.3     ! misho     159: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     160: {
                    161:        int ret = 0;
1.3     ! misho     162:        char *str, *psStmt;
1.2       misho     163:        sqlite3_stmt *stmt;
                    164: 
                    165:        if (!cfg || !sql)
                    166:                return -1;
                    167: 
1.3     ! misho     168:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     169:        if (!str) {
                    170:                mqtt_rtlm_log("Error:: not found online table name");
                    171:                return -1;
                    172:        }
1.3     ! misho     173:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
        !           174:                        "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2       misho     175: 
1.3     ! misho     176:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     177:                MQTT_RTLM_LOG(sql);
1.3     ! misho     178:                sqlite3_free(psStmt);
1.2       misho     179:                return -1;
1.3     ! misho     180:        } else
        !           181:                sqlite3_free(psStmt);
1.2       misho     182:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    183:                ret = sqlite3_changes(sql);
                    184:        else {
                    185:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    186:                        MQTT_RTLM_LOG(sql);
                    187:                ret = 0;
                    188:        }
                    189:        sqlite3_finalize(stmt);
                    190: 
                    191:        return ret;
                    192: }
                    193: 
                    194: /*
                    195:  * mqtt_rtlm_chk_session() Check session(s)
                    196:  *
                    197:  * @cfg = loaded config
                    198:  * @sql = SQL handle
                    199:  * @connid = connection id
                    200:  * @user = username
                    201:  * @host = hostname
                    202:  * return: -1 error, 0 not logged or >0 logged found rows
                    203:  */
                    204: int
1.3     ! misho     205: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     206: {
                    207:        int ret = 0;
1.3     ! misho     208:        char *str, *psStmt;
1.2       misho     209:        sqlite3_stmt *stmt;
                    210: 
                    211:        if (!cfg || !sql)
                    212:                return -1;
                    213: 
1.3     ! misho     214:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     215:        if (!str) {
                    216:                mqtt_rtlm_log("Error:: not found online table name");
                    217:                return -1;
                    218:        }
1.3     ! misho     219:        psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
        !           220:                        "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
1.2       misho     221:                        str, connid, user, host);
                    222: 
1.3     ! misho     223:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     224:                MQTT_RTLM_LOG(sql);
1.3     ! misho     225:                sqlite3_free(psStmt);
1.2       misho     226:                return -1;
1.3     ! misho     227:        } else
        !           228:                sqlite3_free(psStmt);
1.2       misho     229:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    230:                ret = sqlite3_changes(sql);
                    231:        else
                    232:                ret = 0;
                    233:        sqlite3_finalize(stmt);
                    234: 
                    235:        return ret;
                    236: }
                    237: 
                    238: /*
                    239:  * mqtt_rtlm_write_topic() Publish topic
                    240:  *
                    241:  * @cfg = loaded config
                    242:  * @sql = SQL handle
1.3     ! misho     243:  * @connid = connection id
1.2       misho     244:  * @msgid = MessageID
                    245:  * @topic = topic
                    246:  * @txt = text
1.3     ! misho     247:  * @txtlen = text length
1.2       misho     248:  * @user = username
                    249:  * @host = hostname
1.3     ! misho     250:  * @qos = QoS
1.2       misho     251:  * @retain = !=0 retain message to database
                    252:  * return: -1 error, 0 no publish or >0 published ok
                    253:  */
                    254: int
1.3     ! misho     255: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
        !           256:                const char *topic, void *txt, int txtlen, const char *user, 
        !           257:                const char *host, char qos, char retain)
1.2       misho     258: {
                    259:        int ret = 0;
1.3     ! misho     260:        char *str, *psStmt;
1.2       misho     261:        sqlite3_stmt *stmt;
                    262: 
                    263:        if (!cfg || !sql || !topic)
                    264:                return -1;
                    265: 
1.3     ! misho     266:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     267:        if (!str) {
                    268:                mqtt_rtlm_log("Error:: not found topics table name");
                    269:                return -1;
                    270:        }
1.3     ! misho     271:        psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
        !           272:                        "PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
        !           273:                        "datetime('now', 'localtime'), '%q');", 
        !           274:                        str, qos, retain, connid, msgid, topic, user, host);
1.2       misho     275: 
1.3     ! misho     276:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
        !           277:                MQTT_RTLM_LOG(sql);
        !           278:                sqlite3_free(psStmt);
        !           279:                return -1;
        !           280:        } else
        !           281:                sqlite3_free(psStmt);
        !           282:        if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
1.2       misho     283:                MQTT_RTLM_LOG(sql);
1.3     ! misho     284:                sqlite3_finalize(stmt);
1.2       misho     285:                return -1;
                    286:        }
                    287:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    288:                ret = sqlite3_changes(sql);
                    289:        else {
                    290:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    291:                        MQTT_RTLM_LOG(sql);
                    292:                ret = 0;
                    293:        }
                    294:        sqlite3_finalize(stmt);
                    295: 
                    296:        return ret;
                    297: }
                    298: 
                    299: /*
1.3     ! misho     300:  * mqtt_rtlm_wipe_topic() Wipe all topics
        !           301:  *
        !           302:  * @cfg = loaded config
        !           303:  * @sql = SQL handle
        !           304:  * @connid = connection id
        !           305:  * @user = username
        !           306:  * @retain = -1 no matter
        !           307:  * return: -1 error, 0 no changes or >0 deleted rows
        !           308:  */
        !           309: int
        !           310: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
        !           311: {
        !           312:        int ret = 0;
        !           313:        char *str, *rtn, *psStmt;
        !           314:        sqlite3_stmt *stmt;
        !           315: 
        !           316:        if (!cfg || !sql || !connid)
        !           317:                return -1;
        !           318: 
        !           319:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
        !           320:        if (!str) {
        !           321:                mqtt_rtlm_log("Error:: not found topics table name");
        !           322:                return -1;
        !           323:        }
        !           324:        switch (retain) {
        !           325:                case -1:
        !           326:                        rtn = "";
        !           327:                        break;
        !           328:                case 0:
        !           329:                        rtn = "AND Retain = 0";
        !           330:                        break;
        !           331:                default:
        !           332:                        rtn = "AND Retain != 0";
        !           333:                        break;
        !           334:        }
        !           335:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
        !           336:                        "PubUser LIKE '%q' %s;", str, connid, user, rtn);
        !           337: 
        !           338:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
        !           339:                MQTT_RTLM_LOG(sql);
        !           340:                sqlite3_free(psStmt);
        !           341:                return -1;
        !           342:        } else
        !           343:                sqlite3_free(psStmt);
        !           344:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
        !           345:                ret = sqlite3_changes(sql);
        !           346:        else {
        !           347:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
        !           348:                        MQTT_RTLM_LOG(sql);
        !           349:                ret = 0;
        !           350:        }
        !           351:        sqlite3_finalize(stmt);
        !           352: 
        !           353:        return ret;
        !           354: }
        !           355: 
        !           356: /*
1.2       misho     357:  * mqtt_rtlm_delete_topic() Delete topic
                    358:  *
                    359:  * @cfg = loaded config
                    360:  * @sql = SQL handle
1.3     ! misho     361:  * @connid = connection id
1.2       misho     362:  * @msgid = MessageID
                    363:  * @topic = topic
                    364:  * @user = username
                    365:  * @host = hostname
                    366:  * @retain = -1 no matter
                    367:  * return: -1 error, 0 no changes or >0 deleted rows
                    368:  */
                    369: int
1.3     ! misho     370: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
        !           371:                const char *topic, const char *user, const char *host, char retain)
1.2       misho     372: {
                    373:        int ret = 0;
1.3     ! misho     374:        char *str, *rtn, *psStmt;
1.2       misho     375:        sqlite3_stmt *stmt;
                    376: 
                    377:        if (!cfg || !sql || !topic)
                    378:                return -1;
                    379: 
1.3     ! misho     380:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     381:        if (!str) {
                    382:                mqtt_rtlm_log("Error:: not found topics table name");
                    383:                return -1;
                    384:        }
                    385:        switch (retain) {
                    386:                case -1:
                    387:                        rtn = "";
                    388:                        break;
                    389:                case 0:
                    390:                        rtn = "AND Retain = 0";
                    391:                        break;
                    392:                default:
                    393:                        rtn = "AND Retain != 0";
                    394:                        break;
                    395:        }
1.3     ! misho     396:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
        !           397:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
        !           398:                        connid, msgid, topic, user, host, rtn);
1.2       misho     399: 
1.3     ! misho     400:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     401:                MQTT_RTLM_LOG(sql);
1.3     ! misho     402:                sqlite3_free(psStmt);
1.2       misho     403:                return -1;
1.3     ! misho     404:        } else
        !           405:                sqlite3_free(psStmt);
1.2       misho     406:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    407:                ret = sqlite3_changes(sql);
                    408:        else {
                    409:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    410:                        MQTT_RTLM_LOG(sql);
                    411:                ret = 0;
                    412:        }
                    413:        sqlite3_finalize(stmt);
                    414: 
                    415:        return ret;
                    416: }
                    417: 
                    418: /*
                    419:  * mqtt_rtlm_read_topic() Get topic
                    420:  *
                    421:  * @cfg = loaded config
                    422:  * @sql = SQL handle
1.3     ! misho     423:  * @connid = connection id
1.2       misho     424:  * @topic = topic
                    425:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    426:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    427:  */
                    428: mqtt_subscr_t *
1.3     ! misho     429: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
        !           430:                const char *topic, char retain)
1.2       misho     431: {
                    432:        int rowz = 0;
1.3     ! misho     433:        char *str, szStr[STRSIZ], *psStmt;
1.2       misho     434:        sqlite3_stmt *stmt;
                    435:        register int j;
                    436:        mqtt_subscr_t *s = NULL;
1.3     ! misho     437:        ait_val_t v;
1.2       misho     438: 
                    439:        if (!cfg || !sql || !topic)
                    440:                return NULL;
                    441: 
                    442:        switch (retain) {
                    443:                case -1:
                    444:                        memset(szStr, 0, sizeof szStr);
                    445:                        break;
                    446:                case 0:
                    447:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    448:                        break;
                    449:                default:
                    450:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    451:                        break;
                    452:        }
                    453: 
1.3     ! misho     454:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     455:        if (!str) {
                    456:                mqtt_rtlm_log("Error:: not found topics table name");
                    457:                return NULL;
                    458:        }
1.3     ! misho     459:        psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value  FROM %s WHERE "
        !           460:                        "ConnID LIKE '%q' AND Topic LIKE '%q' %s;", 
        !           461:                        str, connid, topic, szStr);
1.2       misho     462: 
1.3     ! misho     463:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     464:                MQTT_RTLM_LOG(sql);
1.3     ! misho     465:                sqlite3_free(psStmt);
1.2       misho     466:                return NULL;
1.3     ! misho     467:        } else
        !           468:                sqlite3_free(psStmt);
        !           469: 
        !           470:        /* calculate count of rows and allocate subscribe items */
        !           471:        while (sqlite3_step(stmt) == SQLITE_ROW)
        !           472:                rowz++;
        !           473:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
        !           474:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
        !           475:                goto end;
        !           476:        } else
        !           477:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
        !           478:        sqlite3_reset(stmt);
        !           479: 
        !           480:        /* fill with data */
        !           481:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
        !           482:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
        !           483:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
        !           484:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
        !           485:                AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
        !           486:                s[j].sub_value.msg_len = AIT_LEN(&v);
        !           487:                s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
        !           488:                if (s[j].sub_value.msg_base)
        !           489:                        memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
        !           490:                AIT_FREE_VAL(&v);
1.2       misho     491:        }
1.3     ! misho     492: end:
        !           493:        sqlite3_finalize(stmt);
        !           494: 
        !           495:        return s;
        !           496: }
        !           497: 
        !           498: /*
        !           499:  * mqtt_rtlm_write_subscribe() Subscribe topic
        !           500:  *
        !           501:  * @cfg = loaded config
        !           502:  * @sql = SQL handle
        !           503:  * @connid = connection id
        !           504:  * @msgid = MessageID
        !           505:  * @topic = topic
        !           506:  * @user = username
        !           507:  * @host = hostname
        !           508:  * @qos = Subscribe QoS
        !           509:  * return: -1 error, 0 no publish or >0 published ok
        !           510:  */
        !           511: int
        !           512: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
        !           513:                const char *topic, const char *user, const char *host, char qos)
        !           514: {
        !           515:        int ret = 0;
        !           516:        char *str, *psStmt;
        !           517:        sqlite3_stmt *stmt;
        !           518: 
        !           519:        if (!cfg || !sql || !topic)
        !           520:                return -1;
        !           521: 
        !           522:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
        !           523:        if (!str) {
        !           524:                mqtt_rtlm_log("Error:: not found subscribes table name");
        !           525:                return -1;
        !           526:        }
        !           527:        psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
        !           528:                        "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
        !           529:                        "datetime('now', 'localtime'), '%q');", str, 
        !           530:                        connid, msgid, qos, topic, user, host);
        !           531: 
        !           532:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
        !           533:                MQTT_RTLM_LOG(sql);
        !           534:                sqlite3_free(psStmt);
        !           535:                return -1;
        !           536:        } else
        !           537:                sqlite3_free(psStmt);
        !           538:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
        !           539:                ret = sqlite3_changes(sql);
        !           540:        else {
        !           541:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
        !           542:                        MQTT_RTLM_LOG(sql);
        !           543:                ret = 0;
        !           544:        }
        !           545:        sqlite3_finalize(stmt);
        !           546: 
        !           547:        return ret;
        !           548: }
        !           549: 
        !           550: /*
        !           551:  * mqtt_rtlm_delete_subscribe() Delete subscribe
        !           552:  *
        !           553:  * @cfg = loaded config
        !           554:  * @sql = SQL handle
        !           555:  * @connid = connection id
        !           556:  * @topic = topic
        !           557:  * @user = username
        !           558:  * @host = hostname
        !           559:  * return: -1 error, 0 no changes or >0 deleted rows
        !           560:  */
        !           561: int
        !           562: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
        !           563:                const char *topic, const char *user, const char *host)
        !           564: {
        !           565:        int ret = 0;
        !           566:        char *str, *psStmt;
        !           567:        sqlite3_stmt *stmt;
        !           568: 
        !           569:        if (!cfg || !sql || !topic)
        !           570:                return -1;
        !           571: 
        !           572:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
        !           573:        if (!str) {
        !           574:                mqtt_rtlm_log("Error:: not found subscribes table name");
        !           575:                return -1;
        !           576:        }
        !           577:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
        !           578:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
        !           579:                        connid, topic, user, host);
        !           580: 
        !           581:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
        !           582:                MQTT_RTLM_LOG(sql);
        !           583:                sqlite3_free(psStmt);
        !           584:                return -1;
        !           585:        } else
        !           586:                sqlite3_free(psStmt);
        !           587:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
        !           588:                ret = sqlite3_changes(sql);
        !           589:        else {
        !           590:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
        !           591:                        MQTT_RTLM_LOG(sql);
        !           592:                ret = 0;
        !           593:        }
        !           594:        sqlite3_finalize(stmt);
        !           595: 
        !           596:        return ret;
        !           597: }
        !           598: 
        !           599: /*
        !           600:  * mqtt_rtlm_read_subscribe() Get subscribe topic
        !           601:  *
        !           602:  * @cfg = loaded config
        !           603:  * @sql = SQL handle
        !           604:  * @connid = connection id
        !           605:  * @topic = topic
        !           606:  * return: NULL error or not found and !=NULL allocated subscribe topics
        !           607:  */
        !           608: mqtt_subscr_t *
        !           609: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
        !           610: {
        !           611:        int rowz = 0;
        !           612:        char *str, *psStmt;
        !           613:        sqlite3_stmt *stmt;
        !           614:        register int j;
        !           615:        mqtt_subscr_t *s = NULL;
        !           616: 
        !           617:        if (!cfg || !sql || !topic)
        !           618:                return NULL;
        !           619: 
        !           620:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
        !           621:        if (!str) {
        !           622:                mqtt_rtlm_log("Error:: not found subscribes table name");
        !           623:                return NULL;
        !           624:        }
        !           625:        psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
        !           626:                        "Topic LIKE '%q';", str, connid, topic);
        !           627: 
        !           628:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
        !           629:                MQTT_RTLM_LOG(sql);
        !           630:                sqlite3_free(psStmt);
        !           631:                return NULL;
        !           632:        } else
        !           633:                sqlite3_free(psStmt);
1.2       misho     634: 
                    635:        /* calculate count of rows and allocate subscribe items */
                    636:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    637:                rowz++;
                    638:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    639:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    640:                goto end;
                    641:        } else
                    642:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    643:        sqlite3_reset(stmt);
                    644: 
                    645:        /* fill with data */
                    646:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    647:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.3     ! misho     648:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
        !           649:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
        !           650:                s[j].sub_value.msg_base = NULL;
        !           651:                s[j].sub_value.msg_len = 0;
1.2       misho     652:        }
                    653: end:
                    654:        sqlite3_finalize(stmt);
                    655: 
                    656:        return s;
                    657: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>