Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.18

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
1.1.2.12  misho       4: extern const char sql_schema[];
                      5: 
                      6: 
1.1.2.2   misho       7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.1.2.3   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
1.1.2.2   misho      24:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     25: 
                     26: 
                     27: /*
                     28:  * mqtt_rtlm_open() Open database connection
                     29:  *
1.1.2.3   misho      30:  * @cfg = config filename
1.1.2.2   misho      31:  * return: NULL error or SQL handle
                     32:  */
                     33: sqlite3 *
                     34: mqtt_rtlm_open(sl_config *cfg)
                     35: {
                     36:        sqlite3 *sql = NULL;
                     37:        const char *str = NULL;
                     38: 
                     39:        if (!cfg)
                     40:                return NULL;
                     41: 
1.1.2.16  misho      42:        /*
                     43:        if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
                     44:                return NULL;
                     45:                */
1.1.2.12  misho      46: 
1.1.2.2   misho      47:        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
                     48:        if (!str) {
                     49:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     50:                return NULL;
                     51:        }
                     52: 
1.1.2.13  misho      53:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3   misho      54:                MQTT_RTLM_LOG(sql);
1.1.2.2   misho      55:                sqlite3_close(sql);
                     56:                return NULL;
                     57:        }
                     58: 
1.1.2.12  misho      59:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     60:                MQTT_RTLM_LOG(sql);
                     61:                sqlite3_close(sql);
                     62:                return NULL;
                     63:        }
1.1.2.2   misho      64:        return sql;
                     65: }
                     66: 
                     67: /*
                     68:  * mqtt_rtlm_close() Close database connection
                     69:  *
                     70:  * @sql = SQL handle
                     71:  * return: none
                     72:  */
                     73: void
                     74: mqtt_rtlm_close(sqlite3 *sql)
                     75: {
                     76:        sqlite3_close(sql);
                     77: }
1.1.2.4   misho      78: 
                     79: /*
                     80:  * mqtt_rtlm_init_session() Create session
                     81:  *
                     82:  * @cfg = loaded config
                     83:  * @sql = SQL handle
1.1.2.14  misho      84:  * @connid = connection id
1.1.2.17  misho      85:  * @user = username
1.1.2.4   misho      86:  * @host = hostname
1.1.2.14  misho      87:  * @will = will flag if !=0 must fill arguments
                     88:  * @... = will arguments in order topic,msg,qos,retain
1.1.2.4   misho      89:  * return: -1 error, 0 session already appears or >0 row changed
                     90:  */
                     91: int
1.1.2.17  misho      92: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, 
                     93:                const char *host, char will, ...)
1.1.2.4   misho      94: {
1.1.2.14  misho      95:        va_list lst;
1.1.2.4   misho      96:        int ret = 0;
                     97:        char *str, szStmt[BUFSIZ] = { 0 };
                     98:        sqlite3_stmt *stmt;
                     99: 
                    100:        if (!cfg || !sql)
                    101:                return -1;
                    102: 
                    103:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    104:        if (!str) {
1.1.2.8   misho     105:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4   misho     106:                return -1;
                    107:        }
1.1.2.14  misho     108:        if (!will)
1.1.2.17  misho     109:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
                    110:                                "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
1.1.2.14  misho     111:        else {
                    112:                va_start(lst, will);
1.1.2.17  misho     113:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
1.1.2.14  misho     114:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
1.1.2.17  misho     115:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
                    116:                                str, connid, user, host, will, 
1.1.2.14  misho     117:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    118:                va_end(lst);
                    119:        }
1.1.2.4   misho     120: 
                    121:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    122:                MQTT_RTLM_LOG(sql);
                    123:                return -1;
                    124:        }
                    125:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    126:                ret = sqlite3_changes(sql);
                    127:        else {
                    128:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    129:                        MQTT_RTLM_LOG(sql);
                    130:                ret = 0;
                    131:        }
                    132:        sqlite3_finalize(stmt);
                    133: 
                    134:        return ret;
                    135: }
1.1.2.5   misho     136: 
                    137: /*
                    138:  * mqtt_rtlm_fini_session() Delete session(s)
                    139:  *
                    140:  * @cfg = loaded config
                    141:  * @sql = SQL handle
1.1.2.14  misho     142:  * @connid = connection id
1.1.2.17  misho     143:  * @user = username
1.1.2.5   misho     144:  * @host = hostname
                    145:  * return: -1 error, 0 session already appears or >0 row changed
                    146:  */
                    147: int
1.1.2.17  misho     148: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.1.2.5   misho     149: {
                    150:        int ret = 0;
                    151:        char *str, szStmt[BUFSIZ] = { 0 };
                    152:        sqlite3_stmt *stmt;
                    153: 
                    154:        if (!cfg || !sql)
                    155:                return -1;
                    156: 
                    157:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    158:        if (!str) {
1.1.2.8   misho     159:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5   misho     160:                return -1;
                    161:        }
1.1.2.17  misho     162:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
1.1.2.18! misho     163:                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
1.1.2.5   misho     164: 
                    165:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    166:                MQTT_RTLM_LOG(sql);
                    167:                return -1;
                    168:        }
                    169:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    170:                ret = sqlite3_changes(sql);
                    171:        else {
                    172:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    173:                        MQTT_RTLM_LOG(sql);
                    174:                ret = 0;
                    175:        }
                    176:        sqlite3_finalize(stmt);
                    177: 
                    178:        return ret;
                    179: }
1.1.2.6   misho     180: 
                    181: /*
                    182:  * mqtt_rtlm_chk_session() Check session(s)
                    183:  *
                    184:  * @cfg = loaded config
                    185:  * @sql = SQL handle
1.1.2.14  misho     186:  * @connid = connection id
1.1.2.17  misho     187:  * @user = username
1.1.2.6   misho     188:  * @host = hostname
                    189:  * return: -1 error, 0 not logged or >0 logged found rows
                    190:  */
                    191: int
1.1.2.17  misho     192: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
1.1.2.6   misho     193: {
                    194:        int ret = 0;
                    195:        char *str, szStmt[BUFSIZ] = { 0 };
                    196:        sqlite3_stmt *stmt;
                    197: 
                    198:        if (!cfg || !sql)
                    199:                return -1;
                    200: 
                    201:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    202:        if (!str) {
1.1.2.8   misho     203:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6   misho     204:                return -1;
                    205:        }
1.1.2.17  misho     206:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
                    207:                        "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
                    208:                        str, connid, user, host);
1.1.2.6   misho     209: 
                    210:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    211:                MQTT_RTLM_LOG(sql);
                    212:                return -1;
                    213:        }
1.1.2.7   misho     214:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    215:                ret = sqlite3_changes(sql);
                    216:        else
                    217:                ret = 0;
1.1.2.6   misho     218:        sqlite3_finalize(stmt);
                    219: 
                    220:        return ret;
                    221: }
1.1.2.8   misho     222: 
                    223: /*
                    224:  * mqtt_rtlm_write_topic() Publish topic
                    225:  *
                    226:  * @cfg = loaded config
                    227:  * @sql = SQL handle
1.1.2.14  misho     228:  * @msgid = MessageID
1.1.2.8   misho     229:  * @topic = topic
                    230:  * @txt = text
                    231:  * @user = username
                    232:  * @host = hostname
                    233:  * @retain = !=0 retain message to database
                    234:  * return: -1 error, 0 no publish or >0 published ok
                    235:  */
                    236: int
1.1.2.14  misho     237: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
1.1.2.8   misho     238:                const char *user, const char *host, char retain)
                    239: {
                    240:        int ret = 0;
                    241:        char *str, szStmt[BUFSIZ] = { 0 };
                    242:        sqlite3_stmt *stmt;
                    243: 
                    244:        if (!cfg || !sql || !topic)
                    245:                return -1;
                    246: 
                    247:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    248:        if (!str) {
                    249:                mqtt_rtlm_log("Error:: not found topics table name");
                    250:                return -1;
                    251:        }
1.1.2.14  misho     252:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
                    253:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
                    254:                        "datetime('now', 'localtime'), '%s');", 
                    255:                        str, retain, msgid, topic, txt, user, host);
1.1.2.8   misho     256: 
                    257:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    258:                MQTT_RTLM_LOG(sql);
                    259:                return -1;
                    260:        }
                    261:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    262:                ret = sqlite3_changes(sql);
                    263:        else {
                    264:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    265:                        MQTT_RTLM_LOG(sql);
                    266:                ret = 0;
                    267:        }
                    268:        sqlite3_finalize(stmt);
                    269: 
                    270:        return ret;
                    271: }
                    272: 
                    273: /*
                    274:  * mqtt_rtlm_delete_topic() Delete topic
                    275:  *
                    276:  * @cfg = loaded config
                    277:  * @sql = SQL handle
1.1.2.14  misho     278:  * @msgid = MessageID
1.1.2.8   misho     279:  * @topic = topic
                    280:  * @user = username
                    281:  * @host = hostname
                    282:  * @retain = -1 no matter
                    283:  * return: -1 error, 0 no changes or >0 deleted rows
                    284:  */
                    285: int
1.1.2.14  misho     286: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
1.1.2.8   misho     287:                const char *user, const char *host, char retain)
                    288: {
                    289:        int ret = 0;
                    290:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    291:        sqlite3_stmt *stmt;
                    292: 
                    293:        if (!cfg || !sql || !topic)
                    294:                return -1;
                    295: 
                    296:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    297:        if (!str) {
                    298:                mqtt_rtlm_log("Error:: not found topics table name");
                    299:                return -1;
                    300:        }
                    301:        switch (retain) {
                    302:                case -1:
                    303:                        rtn = "";
                    304:                        break;
                    305:                case 0:
                    306:                        rtn = "AND Retain = 0";
                    307:                        break;
                    308:                default:
                    309:                        rtn = "AND Retain != 0";
                    310:                        break;
                    311:        }
1.1.2.14  misho     312:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
1.1.2.8   misho     313:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
1.1.2.14  misho     314:                        msgid, topic, user, host, rtn);
1.1.2.8   misho     315: 
                    316:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    317:                MQTT_RTLM_LOG(sql);
                    318:                return -1;
                    319:        }
                    320:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    321:                ret = sqlite3_changes(sql);
                    322:        else {
                    323:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    324:                        MQTT_RTLM_LOG(sql);
                    325:                ret = 0;
                    326:        }
                    327:        sqlite3_finalize(stmt);
                    328: 
                    329:        return ret;
                    330: }
1.1.2.9   misho     331: 
                    332: /*
                    333:  * mqtt_rtlm_read_topic() Get topic
                    334:  *
                    335:  * @cfg = loaded config
                    336:  * @sql = SQL handle
1.1.2.14  misho     337:  * @msgid = MessageID
1.1.2.9   misho     338:  * @topic = topic
1.1.2.10  misho     339:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    340:  * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9   misho     341:  */
1.1.2.10  misho     342: mqtt_subscr_t *
1.1.2.14  misho     343: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.1.2.9   misho     344: {
1.1.2.10  misho     345:        int rowz = 0;
                    346:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9   misho     347:        sqlite3_stmt *stmt;
1.1.2.10  misho     348:        register int j;
                    349:        mqtt_subscr_t *s = NULL;
1.1.2.9   misho     350: 
1.1.2.10  misho     351:        if (!cfg || !sql || !topic)
                    352:                return NULL;
                    353: 
                    354:        switch (retain) {
                    355:                case -1:
                    356:                        memset(szStr, 0, sizeof szStr);
                    357:                        break;
                    358:                case 0:
                    359:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    360:                        break;
                    361:                default:
                    362:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    363:                        break;
                    364:        }
1.1.2.9   misho     365: 
                    366:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    367:        if (!str) {
                    368:                mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10  misho     369:                return NULL;
1.1.2.9   misho     370:        }
1.1.2.14  misho     371:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                    372:                        "MsgID = %d AND Topic LIKE '%s' %s;", 
                    373:                        str, msgid, topic, szStr);
1.1.2.9   misho     374: 
                    375:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    376:                MQTT_RTLM_LOG(sql);
1.1.2.10  misho     377:                return NULL;
1.1.2.9   misho     378:        }
1.1.2.10  misho     379: 
                    380:        /* calculate count of rows and allocate subscribe items */
                    381:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    382:                rowz++;
                    383:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    384:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    385:                goto end;
                    386:        } else
                    387:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    388:        sqlite3_reset(stmt);
                    389: 
                    390:        /* fill with data */
                    391:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11  misho     392:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.15  misho     393:                s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    394:                s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
                    395:                s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
                    396:                s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
1.1.2.9   misho     397:        }
1.1.2.10  misho     398: end:
1.1.2.9   misho     399:        sqlite3_finalize(stmt);
                    400: 
1.1.2.10  misho     401:        return s;
1.1.2.9   misho     402: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>