Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.16

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
1.1.2.12  misho       4: extern const char sql_schema[];
                      5: 
                      6: 
1.1.2.2   misho       7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.1.2.3   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
1.1.2.2   misho      24:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     25: 
                     26: 
                     27: /*
                     28:  * mqtt_rtlm_open() Open database connection
                     29:  *
1.1.2.3   misho      30:  * @cfg = config filename
1.1.2.2   misho      31:  * return: NULL error or SQL handle
                     32:  */
                     33: sqlite3 *
                     34: mqtt_rtlm_open(sl_config *cfg)
                     35: {
                     36:        sqlite3 *sql = NULL;
                     37:        const char *str = NULL;
                     38: 
                     39:        if (!cfg)
                     40:                return NULL;
                     41: 
1.1.2.16! misho      42:        /*
        !            43:        if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
        !            44:                return NULL;
        !            45:                */
1.1.2.12  misho      46: 
1.1.2.2   misho      47:        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
                     48:        if (!str) {
                     49:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     50:                return NULL;
                     51:        }
                     52: 
1.1.2.13  misho      53:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3   misho      54:                MQTT_RTLM_LOG(sql);
1.1.2.2   misho      55:                sqlite3_close(sql);
                     56:                return NULL;
                     57:        }
                     58: 
1.1.2.12  misho      59:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     60:                MQTT_RTLM_LOG(sql);
                     61:                sqlite3_close(sql);
                     62:                return NULL;
                     63:        }
1.1.2.2   misho      64:        return sql;
                     65: }
                     66: 
                     67: /*
                     68:  * mqtt_rtlm_close() Close database connection
                     69:  *
                     70:  * @sql = SQL handle
                     71:  * return: none
                     72:  */
                     73: void
                     74: mqtt_rtlm_close(sqlite3 *sql)
                     75: {
                     76:        sqlite3_close(sql);
                     77: }
1.1.2.4   misho      78: 
                     79: /*
                     80:  * mqtt_rtlm_init_session() Create session
                     81:  *
                     82:  * @cfg = loaded config
                     83:  * @sql = SQL handle
                     84:  * @user = username
1.1.2.14  misho      85:  * @connid = connection id
1.1.2.4   misho      86:  * @host = hostname
                     87:  * @port = port
1.1.2.14  misho      88:  * @will = will flag if !=0 must fill arguments
                     89:  * @... = will arguments in order topic,msg,qos,retain
1.1.2.4   misho      90:  * return: -1 error, 0 session already appears or >0 row changed
                     91:  */
                     92: int
1.1.2.14  misho      93: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, 
                     94:                const char *host, u_short port, char will, ...)
1.1.2.4   misho      95: {
1.1.2.14  misho      96:        va_list lst;
1.1.2.4   misho      97:        int ret = 0;
                     98:        char *str, szStmt[BUFSIZ] = { 0 };
                     99:        sqlite3_stmt *stmt;
                    100: 
                    101:        if (!cfg || !sql)
                    102:                return -1;
                    103: 
                    104:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    105:        if (!str) {
1.1.2.8   misho     106:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4   misho     107:                return -1;
                    108:        }
1.1.2.14  misho     109:        if (!will)
                    110:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
                    111:                                "WillFlag) VALUES ('%s', '%s', '%s', %d, 0);", str, user, connid, host, port);
                    112:        else {
                    113:                va_start(lst, will);
                    114:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, ConnID, RemoteHost, RemotePort, "
                    115:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
                    116:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, %d, '%s', '%s');", 
                    117:                                str, user, connid, host, port, will, 
                    118:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
                    119:                va_end(lst);
                    120:        }
1.1.2.4   misho     121: 
                    122:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    123:                MQTT_RTLM_LOG(sql);
                    124:                return -1;
                    125:        }
                    126:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    127:                ret = sqlite3_changes(sql);
                    128:        else {
                    129:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    130:                        MQTT_RTLM_LOG(sql);
                    131:                ret = 0;
                    132:        }
                    133:        sqlite3_finalize(stmt);
                    134: 
                    135:        return ret;
                    136: }
1.1.2.5   misho     137: 
                    138: /*
                    139:  * mqtt_rtlm_fini_session() Delete session(s)
                    140:  *
                    141:  * @cfg = loaded config
                    142:  * @sql = SQL handle
                    143:  * @user = username
1.1.2.14  misho     144:  * @connid = connection id
1.1.2.5   misho     145:  * @host = hostname
                    146:  * return: -1 error, 0 session already appears or >0 row changed
                    147:  */
                    148: int
1.1.2.14  misho     149: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.5   misho     150: {
                    151:        int ret = 0;
                    152:        char *str, szStmt[BUFSIZ] = { 0 };
                    153:        sqlite3_stmt *stmt;
                    154: 
                    155:        if (!cfg || !sql)
                    156:                return -1;
                    157: 
                    158:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    159:        if (!str) {
1.1.2.8   misho     160:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5   misho     161:                return -1;
                    162:        }
1.1.2.14  misho     163:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND ConnID = '%s' "
                    164:                        "AND RemoteHost LIKE '%s';", str, user, connid, host);
1.1.2.5   misho     165: 
                    166:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    167:                MQTT_RTLM_LOG(sql);
                    168:                return -1;
                    169:        }
                    170:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    171:                ret = sqlite3_changes(sql);
                    172:        else {
                    173:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    174:                        MQTT_RTLM_LOG(sql);
                    175:                ret = 0;
                    176:        }
                    177:        sqlite3_finalize(stmt);
                    178: 
                    179:        return ret;
                    180: }
1.1.2.6   misho     181: 
                    182: /*
                    183:  * mqtt_rtlm_chk_session() Check session(s)
                    184:  *
                    185:  * @cfg = loaded config
                    186:  * @sql = SQL handle
                    187:  * @user = username
1.1.2.14  misho     188:  * @connid = connection id
1.1.2.6   misho     189:  * @host = hostname
                    190:  * return: -1 error, 0 not logged or >0 logged found rows
                    191:  */
                    192: int
1.1.2.14  misho     193: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *connid, const char *host)
1.1.2.6   misho     194: {
                    195:        int ret = 0;
                    196:        char *str, szStmt[BUFSIZ] = { 0 };
                    197:        sqlite3_stmt *stmt;
                    198: 
                    199:        if (!cfg || !sql)
                    200:                return -1;
                    201: 
                    202:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    203:        if (!str) {
1.1.2.8   misho     204:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6   misho     205:                return -1;
                    206:        }
1.1.2.14  misho     207:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost, RemotePort FROM %s WHERE "
                    208:                        "Username = '%s' AND ConnID = '%s' AND RemoteHost LIKE '%s';", 
                    209:                        str, user, connid, host);
1.1.2.6   misho     210: 
                    211:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    212:                MQTT_RTLM_LOG(sql);
                    213:                return -1;
                    214:        }
1.1.2.7   misho     215:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    216:                ret = sqlite3_changes(sql);
                    217:        else
                    218:                ret = 0;
1.1.2.6   misho     219:        sqlite3_finalize(stmt);
                    220: 
                    221:        return ret;
                    222: }
1.1.2.8   misho     223: 
                    224: /*
                    225:  * mqtt_rtlm_write_topic() Publish topic
                    226:  *
                    227:  * @cfg = loaded config
                    228:  * @sql = SQL handle
1.1.2.14  misho     229:  * @msgid = MessageID
1.1.2.8   misho     230:  * @topic = topic
                    231:  * @txt = text
                    232:  * @user = username
                    233:  * @host = hostname
                    234:  * @retain = !=0 retain message to database
                    235:  * return: -1 error, 0 no publish or >0 published ok
                    236:  */
                    237: int
1.1.2.14  misho     238: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
1.1.2.8   misho     239:                const char *user, const char *host, char retain)
                    240: {
                    241:        int ret = 0;
                    242:        char *str, szStmt[BUFSIZ] = { 0 };
                    243:        sqlite3_stmt *stmt;
                    244: 
                    245:        if (!cfg || !sql || !topic)
                    246:                return -1;
                    247: 
                    248:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    249:        if (!str) {
                    250:                mqtt_rtlm_log("Error:: not found topics table name");
                    251:                return -1;
                    252:        }
1.1.2.14  misho     253:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
                    254:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
                    255:                        "datetime('now', 'localtime'), '%s');", 
                    256:                        str, retain, msgid, topic, txt, user, host);
1.1.2.8   misho     257: 
                    258:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    259:                MQTT_RTLM_LOG(sql);
                    260:                return -1;
                    261:        }
                    262:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    263:                ret = sqlite3_changes(sql);
                    264:        else {
                    265:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    266:                        MQTT_RTLM_LOG(sql);
                    267:                ret = 0;
                    268:        }
                    269:        sqlite3_finalize(stmt);
                    270: 
                    271:        return ret;
                    272: }
                    273: 
                    274: /*
                    275:  * mqtt_rtlm_delete_topic() Delete topic
                    276:  *
                    277:  * @cfg = loaded config
                    278:  * @sql = SQL handle
1.1.2.14  misho     279:  * @msgid = MessageID
1.1.2.8   misho     280:  * @topic = topic
                    281:  * @user = username
                    282:  * @host = hostname
                    283:  * @retain = -1 no matter
                    284:  * return: -1 error, 0 no changes or >0 deleted rows
                    285:  */
                    286: int
1.1.2.14  misho     287: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
1.1.2.8   misho     288:                const char *user, const char *host, char retain)
                    289: {
                    290:        int ret = 0;
                    291:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    292:        sqlite3_stmt *stmt;
                    293: 
                    294:        if (!cfg || !sql || !topic)
                    295:                return -1;
                    296: 
                    297:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    298:        if (!str) {
                    299:                mqtt_rtlm_log("Error:: not found topics table name");
                    300:                return -1;
                    301:        }
                    302:        switch (retain) {
                    303:                case -1:
                    304:                        rtn = "";
                    305:                        break;
                    306:                case 0:
                    307:                        rtn = "AND Retain = 0";
                    308:                        break;
                    309:                default:
                    310:                        rtn = "AND Retain != 0";
                    311:                        break;
                    312:        }
1.1.2.14  misho     313:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
1.1.2.8   misho     314:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
1.1.2.14  misho     315:                        msgid, topic, user, host, rtn);
1.1.2.8   misho     316: 
                    317:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    318:                MQTT_RTLM_LOG(sql);
                    319:                return -1;
                    320:        }
                    321:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    322:                ret = sqlite3_changes(sql);
                    323:        else {
                    324:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    325:                        MQTT_RTLM_LOG(sql);
                    326:                ret = 0;
                    327:        }
                    328:        sqlite3_finalize(stmt);
                    329: 
                    330:        return ret;
                    331: }
1.1.2.9   misho     332: 
                    333: /*
                    334:  * mqtt_rtlm_read_topic() Get topic
                    335:  *
                    336:  * @cfg = loaded config
                    337:  * @sql = SQL handle
1.1.2.14  misho     338:  * @msgid = MessageID
1.1.2.9   misho     339:  * @topic = topic
1.1.2.10  misho     340:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    341:  * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9   misho     342:  */
1.1.2.10  misho     343: mqtt_subscr_t *
1.1.2.14  misho     344: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
1.1.2.9   misho     345: {
1.1.2.10  misho     346:        int rowz = 0;
                    347:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9   misho     348:        sqlite3_stmt *stmt;
1.1.2.10  misho     349:        register int j;
                    350:        mqtt_subscr_t *s = NULL;
1.1.2.9   misho     351: 
1.1.2.10  misho     352:        if (!cfg || !sql || !topic)
                    353:                return NULL;
                    354: 
                    355:        switch (retain) {
                    356:                case -1:
                    357:                        memset(szStr, 0, sizeof szStr);
                    358:                        break;
                    359:                case 0:
                    360:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    361:                        break;
                    362:                default:
                    363:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    364:                        break;
                    365:        }
1.1.2.9   misho     366: 
                    367:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    368:        if (!str) {
                    369:                mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10  misho     370:                return NULL;
1.1.2.9   misho     371:        }
1.1.2.14  misho     372:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
                    373:                        "MsgID = %d AND Topic LIKE '%s' %s;", 
                    374:                        str, msgid, topic, szStr);
1.1.2.9   misho     375: 
                    376:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    377:                MQTT_RTLM_LOG(sql);
1.1.2.10  misho     378:                return NULL;
1.1.2.9   misho     379:        }
1.1.2.10  misho     380: 
                    381:        /* calculate count of rows and allocate subscribe items */
                    382:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    383:                rowz++;
                    384:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    385:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    386:                goto end;
                    387:        } else
                    388:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    389:        sqlite3_reset(stmt);
                    390: 
                    391:        /* fill with data */
                    392:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11  misho     393:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.15  misho     394:                s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
                    395:                s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
                    396:                s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
                    397:                s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
1.1.2.9   misho     398:        }
1.1.2.10  misho     399: end:
1.1.2.9   misho     400:        sqlite3_finalize(stmt);
                    401: 
1.1.2.10  misho     402:        return s;
1.1.2.9   misho     403: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>