Annotation of mqtt/src/pubmqtt.c, revision 1.3.2.1

1.3.2.1 ! misho       1: /*************************************************************************
        !             2: * (C) 2011 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
        !             3: *  by Michael Pounov <misho@openbsd-bg.org>
        !             4: *
        !             5: * $Author: misho $
        !             6: * $Id: global.h,v 1.4 2012/07/03 08:57:04 misho Exp $
        !             7: *
        !             8: **************************************************************************
        !             9: The ELWIX and AITNET software is distributed under the following
        !            10: terms:
        !            11: 
        !            12: All of the documentation and software included in the ELWIX and AITNET
        !            13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
        !            14: 
        !            15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
        !            16:        by Michael Pounov <misho@elwix.org>.  All rights reserved.
        !            17: 
        !            18: Redistribution and use in source and binary forms, with or without
        !            19: modification, are permitted provided that the following conditions
        !            20: are met:
        !            21: 1. Redistributions of source code must retain the above copyright
        !            22:    notice, this list of conditions and the following disclaimer.
        !            23: 2. Redistributions in binary form must reproduce the above copyright
        !            24:    notice, this list of conditions and the following disclaimer in the
        !            25:    documentation and/or other materials provided with the distribution.
        !            26: 3. All advertising materials mentioning features or use of this software
        !            27:    must display the following acknowledgement:
        !            28: This product includes software developed by Michael Pounov <misho@elwix.org>
        !            29: ELWIX - Embedded LightWeight unIX and its contributors.
        !            30: 4. Neither the name of AITNET nor the names of its contributors
        !            31:    may be used to endorse or promote products derived from this software
        !            32:    without specific prior written permission.
        !            33: 
        !            34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
        !            35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
        !            36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
        !            37: ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
        !            38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
        !            39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
        !            40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
        !            41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
        !            42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
        !            43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
        !            44: SUCH DAMAGE.
        !            45: */
1.2       misho      46: #include "global.h"
                     47: 
                     48: 
                     49: extern const char sql_schema[];
                     50: 
                     51: 
                     52: /*
                     53:  * mqtt_db_log() Log database connection message
                     54:  *
                     55:  * @fmt = format string
                     56:  * @... = argument list
                     57:  * return: none
                     58:  */
                     59: static void
                     60: mqtt_rtlm_log(const char *fmt, ...)
                     61: {
                     62:        va_list lst;
                     63: 
                     64:        va_start(lst, fmt);
                     65:        vsyslog(LOG_ERR, fmt, lst);
                     66:        va_end(lst);
                     67: }
/*
 * MQTT_RTLM_LOG() Log the current SQLite error code and message of handle _sql,
 * tagged with the calling function name and source line.
 * The handle must not be NULL (asserted).
 */
#define MQTT_RTLM_LOG(_sql)	(assert((_sql)), \
				mqtt_rtlm_log("Error:: %s(%d) SQL #%d - %s", __func__, __LINE__, \
					sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     71: 
/*
 * _init() Library pre-loaded action: initialize the SQLite library
 *
 * return: none
 */
void
_init(void)
{
	(void) sqlite3_initialize();
}
                     78: 
                     79: void
                     80: _fini()
                     81: {
                     82:        sqlite3_shutdown();
                     83: }
                     84: 
1.2       misho      85: 
                     86: /*
                     87:  * mqtt_rtlm_open() Open database connection
                     88:  *
                     89:  * @cfg = config filename
                     90:  * return: NULL error or SQL handle
                     91:  */
                     92: sqlite3 *
1.3       misho      93: mqtt_rtlm_open(cfg_root_t *cfg)
1.2       misho      94: {
                     95:        sqlite3 *sql = NULL;
                     96:        const char *str = NULL;
                     97: 
                     98:        if (!cfg)
                     99:                return NULL;
                    100: 
1.3       misho     101:        str = cfg_getAttribute(cfg, "mqtt_pub", "name");
1.2       misho     102:        if (!str) {
                    103:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                    104:                return NULL;
                    105:        }
                    106: 
                    107:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                    108:                MQTT_RTLM_LOG(sql);
                    109:                sqlite3_close(sql);
                    110:                return NULL;
                    111:        }
                    112: 
                    113:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                    114:                MQTT_RTLM_LOG(sql);
                    115:                sqlite3_close(sql);
                    116:                return NULL;
                    117:        }
                    118:        return sql;
                    119: }
                    120: 
                    121: /*
                    122:  * mqtt_rtlm_close() Close database connection
                    123:  *
                    124:  * @sql = SQL handle
                    125:  * return: none
                    126:  */
                    127: void
                    128: mqtt_rtlm_close(sqlite3 *sql)
                    129: {
                    130:        sqlite3_close(sql);
                    131: }
                    132: 
                    133: /*
                    134:  * mqtt_rtlm_init_session() Create session
                    135:  *
                    136:  * @cfg = loaded config
                    137:  * @sql = SQL handle
                    138:  * @connid = connection id
                    139:  * @user = username
                    140:  * @host = hostname
                    141:  * @will = will flag if !=0 must fill arguments
                    142:  * @... = will arguments in order topic,msg,qos,retain
                    143:  * return: -1 error, 0 session already appears or >0 row changed
                    144:  */
                    145: int
1.3       misho     146: mqtt_rtlm_init_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, 
1.2       misho     147:                const char *host, char will, ...)
                    148: {
                    149:        va_list lst;
                    150:        int ret = 0;
1.3       misho     151:        char *str, *psStmt;
1.2       misho     152:        sqlite3_stmt *stmt;
                    153: 
                    154:        if (!cfg || !sql)
                    155:                return -1;
                    156: 
1.3       misho     157:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     158:        if (!str) {
                    159:                mqtt_rtlm_log("Error:: not found online table name");
                    160:                return -1;
                    161:        }
                    162:        if (!will)
1.3       misho     163:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
                    164:                                "WillFlag) VALUES ('%q', '%q', '%q', 0);", str, connid, user, host);
1.2       misho     165:        else {
                    166:                va_start(lst, will);
1.3       misho     167:                psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, Username, RemoteHost, "
1.2       misho     168:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.3       misho     169:                                "VALUES ('%q', '%q', '%q', %d, %d, %d, '%q', '%q');", 
1.2       misho     170:                                str, connid, user, host, will, 
                    171:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    172:                va_end(lst);
                    173:        }
                    174: 
1.3       misho     175:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     176:                MQTT_RTLM_LOG(sql);
1.3       misho     177:                sqlite3_free(psStmt);
1.2       misho     178:                return -1;
1.3       misho     179:        } else
                    180:                sqlite3_free(psStmt);
1.2       misho     181:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    182:                ret = sqlite3_changes(sql);
                    183:        else {
                    184:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    185:                        MQTT_RTLM_LOG(sql);
                    186:                ret = 0;
                    187:        }
                    188:        sqlite3_finalize(stmt);
                    189: 
                    190:        return ret;
                    191: }
                    192: 
                    193: /*
                    194:  * mqtt_rtlm_fini_session() Delete session(s)
                    195:  *
                    196:  * @cfg = loaded config
                    197:  * @sql = SQL handle
                    198:  * @connid = connection id
                    199:  * @user = username
                    200:  * @host = hostname
                    201:  * return: -1 error, 0 session already appears or >0 row changed
                    202:  */
                    203: int
1.3       misho     204: mqtt_rtlm_fini_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     205: {
                    206:        int ret = 0;
1.3       misho     207:        char *str, *psStmt;
1.2       misho     208:        sqlite3_stmt *stmt;
                    209: 
                    210:        if (!cfg || !sql)
                    211:                return -1;
                    212: 
1.3       misho     213:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     214:        if (!str) {
                    215:                mqtt_rtlm_log("Error:: not found online table name");
                    216:                return -1;
                    217:        }
1.3       misho     218:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND Username = '%q' "
                    219:                        "AND RemoteHost LIKE '%q';", str, connid, user, host);
1.2       misho     220: 
1.3       misho     221:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     222:                MQTT_RTLM_LOG(sql);
1.3       misho     223:                sqlite3_free(psStmt);
1.2       misho     224:                return -1;
1.3       misho     225:        } else
                    226:                sqlite3_free(psStmt);
1.2       misho     227:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    228:                ret = sqlite3_changes(sql);
                    229:        else {
                    230:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    231:                        MQTT_RTLM_LOG(sql);
                    232:                ret = 0;
                    233:        }
                    234:        sqlite3_finalize(stmt);
                    235: 
                    236:        return ret;
                    237: }
                    238: 
                    239: /*
                    240:  * mqtt_rtlm_chk_session() Check session(s)
                    241:  *
                    242:  * @cfg = loaded config
                    243:  * @sql = SQL handle
                    244:  * @connid = connection id
                    245:  * @user = username
                    246:  * @host = hostname
                    247:  * return: -1 error, 0 not logged or >0 logged found rows
                    248:  */
                    249: int
1.3       misho     250: mqtt_rtlm_chk_session(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.2       misho     251: {
                    252:        int ret = 0;
1.3       misho     253:        char *str, *psStmt;
1.2       misho     254:        sqlite3_stmt *stmt;
                    255: 
                    256:        if (!cfg || !sql)
                    257:                return -1;
                    258: 
1.3       misho     259:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_online");
1.2       misho     260:        if (!str) {
                    261:                mqtt_rtlm_log("Error:: not found online table name");
                    262:                return -1;
                    263:        }
1.3       misho     264:        psStmt = sqlite3_mprintf("SELECT ConnID, RemoteHost FROM %s WHERE "
                    265:                        "ConnID = '%q' AND Username LIKE '%q' AND RemoteHost LIKE '%q';", 
1.2       misho     266:                        str, connid, user, host);
                    267: 
1.3       misho     268:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     269:                MQTT_RTLM_LOG(sql);
1.3       misho     270:                sqlite3_free(psStmt);
1.2       misho     271:                return -1;
1.3       misho     272:        } else
                    273:                sqlite3_free(psStmt);
1.2       misho     274:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    275:                ret = sqlite3_changes(sql);
                    276:        else
                    277:                ret = 0;
                    278:        sqlite3_finalize(stmt);
                    279: 
                    280:        return ret;
                    281: }
                    282: 
                    283: /*
                    284:  * mqtt_rtlm_write_topic() Publish topic
                    285:  *
                    286:  * @cfg = loaded config
                    287:  * @sql = SQL handle
1.3       misho     288:  * @connid = connection id
1.2       misho     289:  * @msgid = MessageID
                    290:  * @topic = topic
                    291:  * @txt = text
1.3       misho     292:  * @txtlen = text length
1.2       misho     293:  * @user = username
                    294:  * @host = hostname
1.3       misho     295:  * @qos = QoS
1.2       misho     296:  * @retain = !=0 retain message to database
                    297:  * return: -1 error, 0 no publish or >0 published ok
                    298:  */
                    299: int
1.3       misho     300: mqtt_rtlm_write_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    301:                const char *topic, void *txt, int txtlen, const char *user, 
                    302:                const char *host, char qos, char retain)
1.2       misho     303: {
                    304:        int ret = 0;
1.3       misho     305:        char *str, *psStmt;
1.2       misho     306:        sqlite3_stmt *stmt;
                    307: 
                    308:        if (!cfg || !sql || !topic)
                    309:                return -1;
                    310: 
1.3       misho     311:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     312:        if (!str) {
                    313:                mqtt_rtlm_log("Error:: not found topics table name");
                    314:                return -1;
                    315:        }
1.3       misho     316:        psStmt = sqlite3_mprintf("INSERT INTO %s (QoS, Retain, ConnID, MsgID, Topic, Value, PubUser, "
                    317:                        "PubDate, PubHost) VALUES (%d, %d, '%q', %u, '%q', ?1, '%q', "
                    318:                        "datetime('now', 'localtime'), '%q');", 
                    319:                        str, qos, retain, connid, msgid, topic, user, host);
1.2       misho     320: 
1.3       misho     321:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL) || !stmt) {
                    322:                MQTT_RTLM_LOG(sql);
                    323:                sqlite3_free(psStmt);
                    324:                return -1;
                    325:        } else
                    326:                sqlite3_free(psStmt);
                    327:        if (sqlite3_bind_blob(stmt, 1, txt, txtlen, SQLITE_TRANSIENT)) {
1.2       misho     328:                MQTT_RTLM_LOG(sql);
1.3       misho     329:                sqlite3_finalize(stmt);
1.2       misho     330:                return -1;
                    331:        }
                    332:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    333:                ret = sqlite3_changes(sql);
                    334:        else {
                    335:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    336:                        MQTT_RTLM_LOG(sql);
                    337:                ret = 0;
                    338:        }
                    339:        sqlite3_finalize(stmt);
                    340: 
                    341:        return ret;
                    342: }
                    343: 
                    344: /*
1.3       misho     345:  * mqtt_rtlm_wipe_topic() Wipe all topics
                    346:  *
                    347:  * @cfg = loaded config
                    348:  * @sql = SQL handle
                    349:  * @connid = connection id
                    350:  * @user = username
                    351:  * @retain = -1 no matter
                    352:  * return: -1 error, 0 no changes or >0 deleted rows
                    353:  */
                    354: int
                    355: mqtt_rtlm_wipe_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *user, char retain)
                    356: {
                    357:        int ret = 0;
                    358:        char *str, *rtn, *psStmt;
                    359:        sqlite3_stmt *stmt;
                    360: 
                    361:        if (!cfg || !sql || !connid)
                    362:                return -1;
                    363: 
                    364:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
                    365:        if (!str) {
                    366:                mqtt_rtlm_log("Error:: not found topics table name");
                    367:                return -1;
                    368:        }
                    369:        switch (retain) {
                    370:                case -1:
                    371:                        rtn = "";
                    372:                        break;
                    373:                case 0:
                    374:                        rtn = "AND Retain = 0";
                    375:                        break;
                    376:                default:
                    377:                        rtn = "AND Retain != 0";
                    378:                        break;
                    379:        }
                    380:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
                    381:                        "PubUser LIKE '%q' %s;", str, connid, user, rtn);
                    382: 
                    383:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                    384:                MQTT_RTLM_LOG(sql);
                    385:                sqlite3_free(psStmt);
                    386:                return -1;
                    387:        } else
                    388:                sqlite3_free(psStmt);
                    389:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    390:                ret = sqlite3_changes(sql);
                    391:        else {
                    392:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    393:                        MQTT_RTLM_LOG(sql);
                    394:                ret = 0;
                    395:        }
                    396:        sqlite3_finalize(stmt);
                    397: 
                    398:        return ret;
                    399: }
                    400: 
                    401: /*
1.2       misho     402:  * mqtt_rtlm_delete_topic() Delete topic
                    403:  *
                    404:  * @cfg = loaded config
                    405:  * @sql = SQL handle
1.3       misho     406:  * @connid = connection id
1.2       misho     407:  * @msgid = MessageID
                    408:  * @topic = topic
                    409:  * @user = username
                    410:  * @host = hostname
                    411:  * @retain = -1 no matter
                    412:  * return: -1 error, 0 no changes or >0 deleted rows
                    413:  */
                    414: int
1.3       misho     415: mqtt_rtlm_delete_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    416:                const char *topic, const char *user, const char *host, char retain)
1.2       misho     417: {
                    418:        int ret = 0;
1.3       misho     419:        char *str, *rtn, *psStmt;
1.2       misho     420:        sqlite3_stmt *stmt;
                    421: 
                    422:        if (!cfg || !sql || !topic)
                    423:                return -1;
                    424: 
1.3       misho     425:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     426:        if (!str) {
                    427:                mqtt_rtlm_log("Error:: not found topics table name");
                    428:                return -1;
                    429:        }
                    430:        switch (retain) {
                    431:                case -1:
                    432:                        rtn = "";
                    433:                        break;
                    434:                case 0:
                    435:                        rtn = "AND Retain = 0";
                    436:                        break;
                    437:                default:
                    438:                        rtn = "AND Retain != 0";
                    439:                        break;
                    440:        }
1.3       misho     441:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND MsgID = %d AND "
                    442:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q' %s;", str, 
                    443:                        connid, msgid, topic, user, host, rtn);
1.2       misho     444: 
1.3       misho     445:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     446:                MQTT_RTLM_LOG(sql);
1.3       misho     447:                sqlite3_free(psStmt);
1.2       misho     448:                return -1;
1.3       misho     449:        } else
                    450:                sqlite3_free(psStmt);
1.2       misho     451:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    452:                ret = sqlite3_changes(sql);
                    453:        else {
                    454:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    455:                        MQTT_RTLM_LOG(sql);
                    456:                ret = 0;
                    457:        }
                    458:        sqlite3_finalize(stmt);
                    459: 
                    460:        return ret;
                    461: }
                    462: 
                    463: /*
                    464:  * mqtt_rtlm_read_topic() Get topic
                    465:  *
                    466:  * @cfg = loaded config
                    467:  * @sql = SQL handle
1.3       misho     468:  * @connid = connection id
1.2       misho     469:  * @topic = topic
                    470:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    471:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    472:  */
                    473: mqtt_subscr_t *
1.3       misho     474: mqtt_rtlm_read_topic(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                    475:                const char *topic, char retain)
1.2       misho     476: {
                    477:        int rowz = 0;
1.3       misho     478:        char *str, szStr[STRSIZ], *psStmt;
1.2       misho     479:        sqlite3_stmt *stmt;
                    480:        register int j;
                    481:        mqtt_subscr_t *s = NULL;
1.3       misho     482:        ait_val_t v;
1.2       misho     483: 
                    484:        if (!cfg || !sql || !topic)
                    485:                return NULL;
                    486: 
                    487:        switch (retain) {
                    488:                case -1:
                    489:                        memset(szStr, 0, sizeof szStr);
                    490:                        break;
                    491:                case 0:
                    492:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    493:                        break;
                    494:                default:
                    495:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    496:                        break;
                    497:        }
                    498: 
1.3       misho     499:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_topics");
1.2       misho     500:        if (!str) {
                    501:                mqtt_rtlm_log("Error:: not found topics table name");
                    502:                return NULL;
                    503:        }
1.3       misho     504:        psStmt = sqlite3_mprintf("SELECT QoS, Topic, Value  FROM %s WHERE "
                    505:                        "ConnID LIKE '%q' AND Topic LIKE '%q' %s;", 
                    506:                        str, connid, topic, szStr);
1.2       misho     507: 
1.3       misho     508:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
1.2       misho     509:                MQTT_RTLM_LOG(sql);
1.3       misho     510:                sqlite3_free(psStmt);
1.2       misho     511:                return NULL;
1.3       misho     512:        } else
                    513:                sqlite3_free(psStmt);
                    514: 
                    515:        /* calculate count of rows and allocate subscribe items */
                    516:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    517:                rowz++;
                    518:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    519:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    520:                goto end;
                    521:        } else
                    522:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    523:        sqlite3_reset(stmt);
                    524: 
                    525:        /* fill with data */
                    526:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    527:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
                    528:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    529:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    530:                AIT_SET_PTR(&v, (void*) sqlite3_column_blob(stmt, 2), sqlite3_column_bytes(stmt, 2));
                    531:                s[j].sub_value.msg_len = AIT_LEN(&v);
                    532:                s[j].sub_value.msg_base = (u_char*) malloc(s[j].sub_value.msg_len);
                    533:                if (s[j].sub_value.msg_base)
                    534:                        memcpy(s[j].sub_value.msg_base, AIT_GET_PTR(&v), s[j].sub_value.msg_len);
                    535:                AIT_FREE_VAL(&v);
1.2       misho     536:        }
1.3       misho     537: end:
                    538:        sqlite3_finalize(stmt);
                    539: 
                    540:        return s;
                    541: }
                    542: 
                    543: /*
                    544:  * mqtt_rtlm_write_subscribe() Subscribe topic
                    545:  *
                    546:  * @cfg = loaded config
                    547:  * @sql = SQL handle
                    548:  * @connid = connection id
                    549:  * @msgid = MessageID
                    550:  * @topic = topic
                    551:  * @user = username
                    552:  * @host = hostname
                    553:  * @qos = Subscribe QoS
                    554:  * return: -1 error, 0 no publish or >0 published ok
                    555:  */
                    556: int
                    557: mqtt_rtlm_write_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, u_short msgid, 
                    558:                const char *topic, const char *user, const char *host, char qos)
                    559: {
                    560:        int ret = 0;
                    561:        char *str, *psStmt;
                    562:        sqlite3_stmt *stmt;
                    563: 
                    564:        if (!cfg || !sql || !topic)
                    565:                return -1;
                    566: 
                    567:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
                    568:        if (!str) {
                    569:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    570:                return -1;
                    571:        }
                    572:        psStmt = sqlite3_mprintf("INSERT INTO %s (ConnID, MsgID, QoS, Topic, PubUser, "
                    573:                        "PubDate, PubHost) VALUES ('%q', %d, %d, '%q', '%q', "
                    574:                        "datetime('now', 'localtime'), '%q');", str, 
                    575:                        connid, msgid, qos, topic, user, host);
                    576: 
                    577:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                    578:                MQTT_RTLM_LOG(sql);
                    579:                sqlite3_free(psStmt);
                    580:                return -1;
                    581:        } else
                    582:                sqlite3_free(psStmt);
                    583:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    584:                ret = sqlite3_changes(sql);
                    585:        else {
                    586:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    587:                        MQTT_RTLM_LOG(sql);
                    588:                ret = 0;
                    589:        }
                    590:        sqlite3_finalize(stmt);
                    591: 
                    592:        return ret;
                    593: }
                    594: 
                    595: /*
                    596:  * mqtt_rtlm_delete_subscribe() Delete subscribe
                    597:  *
                    598:  * @cfg = loaded config
                    599:  * @sql = SQL handle
                    600:  * @connid = connection id
                    601:  * @topic = topic
                    602:  * @user = username
                    603:  * @host = hostname
                    604:  * return: -1 error, 0 no changes or >0 deleted rows
                    605:  */
                    606: int
                    607: mqtt_rtlm_delete_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, 
                    608:                const char *topic, const char *user, const char *host)
                    609: {
                    610:        int ret = 0;
                    611:        char *str, *psStmt;
                    612:        sqlite3_stmt *stmt;
                    613: 
                    614:        if (!cfg || !sql || !topic)
                    615:                return -1;
                    616: 
                    617:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
                    618:        if (!str) {
                    619:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    620:                return -1;
                    621:        }
                    622:        psStmt = sqlite3_mprintf("DELETE FROM %s WHERE ConnID = '%q' AND "
                    623:                        "Topic LIKE '%q' AND PubUser LIKE '%q' AND PubHost LIKE '%q';", str, 
                    624:                        connid, topic, user, host);
                    625: 
                    626:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                    627:                MQTT_RTLM_LOG(sql);
                    628:                sqlite3_free(psStmt);
                    629:                return -1;
                    630:        } else
                    631:                sqlite3_free(psStmt);
                    632:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    633:                ret = sqlite3_changes(sql);
                    634:        else {
                    635:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    636:                        MQTT_RTLM_LOG(sql);
                    637:                ret = 0;
                    638:        }
                    639:        sqlite3_finalize(stmt);
                    640: 
                    641:        return ret;
                    642: }
                    643: 
                    644: /*
                    645:  * mqtt_rtlm_read_subscribe() Get subscribe topic
                    646:  *
                    647:  * @cfg = loaded config
                    648:  * @sql = SQL handle
                    649:  * @connid = connection id
                    650:  * @topic = topic
                    651:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    652:  */
                    653: mqtt_subscr_t *
                    654: mqtt_rtlm_read_subscribe(cfg_root_t *cfg, sqlite3 *sql, const char *connid, const char *topic)
                    655: {
                    656:        int rowz = 0;
                    657:        char *str, *psStmt;
                    658:        sqlite3_stmt *stmt;
                    659:        register int j;
                    660:        mqtt_subscr_t *s = NULL;
                    661: 
                    662:        if (!cfg || !sql || !topic)
                    663:                return NULL;
                    664: 
                    665:        str = (char*) cfg_getAttribute(cfg, "mqtt_pub", "tbl_subscribes");
                    666:        if (!str) {
                    667:                mqtt_rtlm_log("Error:: not found subscribes table name");
                    668:                return NULL;
                    669:        }
                    670:        psStmt = sqlite3_mprintf("SELECT QoS, Topic FROM %s WHERE ConnID = '%q' AND "
                    671:                        "Topic LIKE '%q';", str, connid, topic);
                    672: 
                    673:        if (sqlite3_prepare_v2(sql, psStmt, strlen(psStmt), &stmt, NULL)) {
                    674:                MQTT_RTLM_LOG(sql);
                    675:                sqlite3_free(psStmt);
                    676:                return NULL;
                    677:        } else
                    678:                sqlite3_free(psStmt);
1.2       misho     679: 
                    680:        /* calculate count of rows and allocate subscribe items */
                    681:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    682:                rowz++;
                    683:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    684:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    685:                goto end;
                    686:        } else
                    687:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    688:        sqlite3_reset(stmt);
                    689: 
                    690:        /* fill with data */
                    691:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    692:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.3       misho     693:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    694:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
                    695:                s[j].sub_value.msg_base = NULL;
                    696:                s[j].sub_value.msg_len = 0;
1.2       misho     697:        }
                    698: end:
                    699:        sqlite3_finalize(stmt);
                    700: 
                    701:        return s;
                    702: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>