Annotation of mqtt/src/pubmqtt.c, revision 1.2.2.1

1.2       misho       1: #include "global.h"
                      2: 
                      3: 
                      4: extern const char sql_schema[];
                      5: 
                      6: 
                      7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
                     23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
                     24:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     25: 
                     26: 
                     27: /*
                     28:  * mqtt_rtlm_open() Open database connection
                     29:  *
                     30:  * @cfg = config filename
                     31:  * return: NULL error or SQL handle
                     32:  */
                     33: sqlite3 *
                     34: mqtt_rtlm_open(sl_config *cfg)
                     35: {
                     36:        sqlite3 *sql = NULL;
                     37:        const char *str = NULL;
                     38: 
                     39:        if (!cfg)
                     40:                return NULL;
                     41: 
                     42:        /*
                     43:        if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
                     44:                return NULL;
                     45:                */
                     46: 
                     47:        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
                     48:        if (!str) {
                     49:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     50:                return NULL;
                     51:        }
                     52: 
                     53:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
                     54:                MQTT_RTLM_LOG(sql);
                     55:                sqlite3_close(sql);
                     56:                return NULL;
                     57:        }
                     58: 
                     59:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     60:                MQTT_RTLM_LOG(sql);
                     61:                sqlite3_close(sql);
                     62:                return NULL;
                     63:        }
                     64:        return sql;
                     65: }
                     66: 
                     67: /*
                     68:  * mqtt_rtlm_close() Close database connection
                     69:  *
                     70:  * @sql = SQL handle
                     71:  * return: none
                     72:  */
                     73: void
                     74: mqtt_rtlm_close(sqlite3 *sql)
                     75: {
                     76:        sqlite3_close(sql);
                     77: }
                     78: 
                     79: /*
                     80:  * mqtt_rtlm_init_session() Create session
                     81:  *
                     82:  * @cfg = loaded config
                     83:  * @sql = SQL handle
                     84:  * @connid = connection id
                     85:  * @user = username
                     86:  * @host = hostname
                     87:  * @will = will flag if !=0 must fill arguments
                     88:  * @... = will arguments in order topic,msg,qos,retain
                     89:  * return: -1 error, 0 session already appears or >0 row changed
                     90:  */
                     91: int
                     92: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, 
                     93:                const char *host, char will, ...)
                     94: {
                     95:        va_list lst;
                     96:        int ret = 0;
                     97:        char *str, szStmt[BUFSIZ] = { 0 };
                     98:        sqlite3_stmt *stmt;
                     99: 
                    100:        if (!cfg || !sql)
                    101:                return -1;
                    102: 
                    103:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    104:        if (!str) {
                    105:                mqtt_rtlm_log("Error:: not found online table name");
                    106:                return -1;
                    107:        }
                    108:        if (!will)
                    109:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    110:                                "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
                    111:        else {
                    112:                va_start(lst, will);
                    113:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    114:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                    115:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
                    116:                                str, connid, user, host, will, 
                    117:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    118:                va_end(lst);
                    119:        }
                    120: 
                    121:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    122:                MQTT_RTLM_LOG(sql);
                    123:                return -1;
                    124:        }
                    125:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    126:                ret = sqlite3_changes(sql);
                    127:        else {
                    128:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    129:                        MQTT_RTLM_LOG(sql);
                    130:                ret = 0;
                    131:        }
                    132:        sqlite3_finalize(stmt);
                    133: 
                    134:        return ret;
                    135: }
                    136: 
                    137: /*
                    138:  * mqtt_rtlm_fini_session() Delete session(s)
                    139:  *
                    140:  * @cfg = loaded config
                    141:  * @sql = SQL handle
                    142:  * @connid = connection id
                    143:  * @user = username
                    144:  * @host = hostname
                    145:  * return: -1 error, 0 session already appears or >0 row changed
                    146:  */
                    147: int
                    148: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
                    149: {
                    150:        int ret = 0;
                    151:        char *str, szStmt[BUFSIZ] = { 0 };
                    152:        sqlite3_stmt *stmt;
                    153: 
                    154:        if (!cfg || !sql)
                    155:                return -1;
                    156: 
                    157:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    158:        if (!str) {
                    159:                mqtt_rtlm_log("Error:: not found online table name");
                    160:                return -1;
                    161:        }
                    162:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
                    163:                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
                    164: 
                    165:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    166:                MQTT_RTLM_LOG(sql);
                    167:                return -1;
                    168:        }
                    169:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    170:                ret = sqlite3_changes(sql);
                    171:        else {
                    172:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    173:                        MQTT_RTLM_LOG(sql);
                    174:                ret = 0;
                    175:        }
                    176:        sqlite3_finalize(stmt);
                    177: 
                    178:        return ret;
                    179: }
                    180: 
                    181: /*
                    182:  * mqtt_rtlm_chk_session() Check session(s)
                    183:  *
                    184:  * @cfg = loaded config
                    185:  * @sql = SQL handle
                    186:  * @connid = connection id
                    187:  * @user = username
                    188:  * @host = hostname
                    189:  * return: -1 error, 0 not logged or >0 logged found rows
                    190:  */
                    191: int
                    192: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
                    193: {
                    194:        int ret = 0;
                    195:        char *str, szStmt[BUFSIZ] = { 0 };
                    196:        sqlite3_stmt *stmt;
                    197: 
                    198:        if (!cfg || !sql)
                    199:                return -1;
                    200: 
                    201:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    202:        if (!str) {
                    203:                mqtt_rtlm_log("Error:: not found online table name");
                    204:                return -1;
                    205:        }
                    206:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
                    207:                        "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
                    208:                        str, connid, user, host);
                    209: 
                    210:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    211:                MQTT_RTLM_LOG(sql);
                    212:                return -1;
                    213:        }
                    214:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    215:                ret = sqlite3_changes(sql);
                    216:        else
                    217:                ret = 0;
                    218:        sqlite3_finalize(stmt);
                    219: 
                    220:        return ret;
                    221: }
                    222: 
                    223: /*
                    224:  * mqtt_rtlm_write_topic() Publish topic
                    225:  *
                    226:  * @cfg = loaded config
                    227:  * @sql = SQL handle
                    228:  * @msgid = MessageID
                    229:  * @topic = topic
                    230:  * @txt = text
                    231:  * @user = username
                    232:  * @host = hostname
                    233:  * @retain = !=0 retain message to database
                    234:  * return: -1 error, 0 no publish or >0 published ok
                    235:  */
                    236: int
                    237: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
                    238:                const char *user, const char *host, char retain)
                    239: {
                    240:        int ret = 0;
                    241:        char *str, szStmt[BUFSIZ] = { 0 };
                    242:        sqlite3_stmt *stmt;
                    243: 
                    244:        if (!cfg || !sql || !topic)
                    245:                return -1;
                    246: 
                    247:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    248:        if (!str) {
                    249:                mqtt_rtlm_log("Error:: not found topics table name");
                    250:                return -1;
                    251:        }
                    252:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
                    253:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
                    254:                        "datetime('now', 'localtime'), '%s');", 
                    255:                        str, retain, msgid, topic, txt, user, host);
                    256: 
                    257:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    258:                MQTT_RTLM_LOG(sql);
                    259:                return -1;
                    260:        }
                    261:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    262:                ret = sqlite3_changes(sql);
                    263:        else {
                    264:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    265:                        MQTT_RTLM_LOG(sql);
                    266:                ret = 0;
                    267:        }
                    268:        sqlite3_finalize(stmt);
                    269: 
                    270:        return ret;
                    271: }
                    272: 
                    273: /*
                    274:  * mqtt_rtlm_delete_topic() Delete topic
                    275:  *
                    276:  * @cfg = loaded config
                    277:  * @sql = SQL handle
                    278:  * @msgid = MessageID
                    279:  * @topic = topic
                    280:  * @user = username
                    281:  * @host = hostname
                    282:  * @retain = -1 no matter
                    283:  * return: -1 error, 0 no changes or >0 deleted rows
                    284:  */
                    285: int
                    286: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
                    287:                const char *user, const char *host, char retain)
                    288: {
                    289:        int ret = 0;
                    290:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    291:        sqlite3_stmt *stmt;
                    292: 
                    293:        if (!cfg || !sql || !topic)
                    294:                return -1;
                    295: 
                    296:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    297:        if (!str) {
                    298:                mqtt_rtlm_log("Error:: not found topics table name");
                    299:                return -1;
                    300:        }
                    301:        switch (retain) {
                    302:                case -1:
                    303:                        rtn = "";
                    304:                        break;
                    305:                case 0:
                    306:                        rtn = "AND Retain = 0";
                    307:                        break;
                    308:                default:
                    309:                        rtn = "AND Retain != 0";
                    310:                        break;
                    311:        }
                    312:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
                    313:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    314:                        msgid, topic, user, host, rtn);
                    315: 
                    316:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    317:                MQTT_RTLM_LOG(sql);
                    318:                return -1;
                    319:        }
                    320:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    321:                ret = sqlite3_changes(sql);
                    322:        else {
                    323:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    324:                        MQTT_RTLM_LOG(sql);
                    325:                ret = 0;
                    326:        }
                    327:        sqlite3_finalize(stmt);
                    328: 
                    329:        return ret;
                    330: }
                    331: 
                    332: /*
                    333:  * mqtt_rtlm_read_topic() Get topic
                    334:  *
                    335:  * @cfg = loaded config
                    336:  * @sql = SQL handle
                    337:  * @msgid = MessageID
                    338:  * @topic = topic
                    339:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    340:  * return: NULL error or not found and !=NULL allocated subscribe topics
                    341:  */
                    342: mqtt_subscr_t *
                    343: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
                    344: {
                    345:        int rowz = 0;
                    346:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
                    347:        sqlite3_stmt *stmt;
                    348:        register int j;
                    349:        mqtt_subscr_t *s = NULL;
                    350: 
                    351:        if (!cfg || !sql || !topic)
                    352:                return NULL;
                    353: 
                    354:        switch (retain) {
                    355:                case -1:
                    356:                        memset(szStr, 0, sizeof szStr);
                    357:                        break;
                    358:                case 0:
                    359:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    360:                        break;
                    361:                default:
                    362:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    363:                        break;
                    364:        }
                    365: 
                    366:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    367:        if (!str) {
                    368:                mqtt_rtlm_log("Error:: not found topics table name");
                    369:                return NULL;
                    370:        }
                    371:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                    372:                        "MsgID = %d AND Topic LIKE '%s' %s;", 
                    373:                        str, msgid, topic, szStr);
                    374: 
                    375:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    376:                MQTT_RTLM_LOG(sql);
                    377:                return NULL;
                    378:        }
                    379: 
                    380:        /* calculate count of rows and allocate subscribe items */
                    381:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    382:                rowz++;
                    383:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    384:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    385:                goto end;
                    386:        } else
                    387:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    388:        sqlite3_reset(stmt);
                    389: 
                    390:        /* fill with data */
                    391:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
                    392:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.2.2.1 ! misho     393:                s[j].sub_topic.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
        !           394:                s[j].sub_topic.msg_len = strlen((char*) s[j].sub_topic.msg_base);
        !           395:                s[j].sub_value.msg_base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
        !           396:                s[j].sub_value.msg_len = strlen((char*) s[j].sub_value.msg_base);
1.2       misho     397:        }
                    398: end:
                    399:        sqlite3_finalize(stmt);
                    400: 
                    401:        return s;
                    402: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>