Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.11

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
1.1.2.2   misho       4: /*
                      5:  * mqtt_db_log() Log database connection message
                      6:  *
                      7:  * @fmt = format string
                      8:  * @... = argument list
                      9:  * return: none
                     10:  */
                     11: static void
                     12: mqtt_rtlm_log(const char *fmt, ...)
                     13: {
                     14:        va_list lst;
                     15: 
                     16:        va_start(lst, fmt);
                     17:        vsyslog(LOG_ERR, fmt, lst);
                     18:        va_end(lst);
                     19: }
1.1.2.3   misho      20: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
1.1.2.2   misho      21:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     22: 
                     23: 
                     24: /*
                     25:  * mqtt_rtlm_open() Open database connection
                     26:  *
1.1.2.3   misho      27:  * @cfg = config filename
1.1.2.2   misho      28:  * return: NULL error or SQL handle
                     29:  */
                     30: sqlite3 *
                     31: mqtt_rtlm_open(sl_config *cfg)
                     32: {
                     33:        sqlite3 *sql = NULL;
                     34:        const char *str = NULL;
                     35: 
                     36:        if (!cfg)
                     37:                return NULL;
                     38: 
                     39:        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
                     40:        if (!str) {
                     41:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     42:                return NULL;
                     43:        }
                     44: 
                     45:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE, NULL)) {
1.1.2.3   misho      46:                MQTT_RTLM_LOG(sql);
1.1.2.2   misho      47:                sqlite3_close(sql);
                     48:                return NULL;
                     49:        }
                     50: 
                     51:        return sql;
                     52: }
                     53: 
                     54: /*
                     55:  * mqtt_rtlm_close() Close database connection
                     56:  *
                     57:  * @sql = SQL handle
                     58:  * return: none
                     59:  */
                     60: void
                     61: mqtt_rtlm_close(sqlite3 *sql)
                     62: {
                     63:        sqlite3_close(sql);
                     64: }
1.1.2.4   misho      65: 
                     66: /*
                     67:  * mqtt_rtlm_init_session() Create session
                     68:  *
                     69:  * @cfg = loaded config
                     70:  * @sql = SQL handle
                     71:  * @user = username
                     72:  * @host = hostname
                     73:  * @port = port
                     74:  * return: -1 error, 0 session already appears or >0 row changed
                     75:  */
                     76: int
                     77: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host, u_short port)
                     78: {
                     79:        int ret = 0;
                     80:        char *str, szStmt[BUFSIZ] = { 0 };
                     81:        sqlite3_stmt *stmt;
                     82: 
                     83:        if (!cfg || !sql)
                     84:                return -1;
                     85: 
                     86:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                     87:        if (!str) {
1.1.2.8   misho      88:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4   misho      89:                return -1;
                     90:        }
                     91:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, RemoteHost, RemotePort) "
                     92:                       "VALUES ('%s', '%s', %d);", str, user, host, port);
                     93: 
                     94:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                     95:                MQTT_RTLM_LOG(sql);
                     96:                return -1;
                     97:        }
                     98:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                     99:                ret = sqlite3_changes(sql);
                    100:        else {
                    101:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    102:                        MQTT_RTLM_LOG(sql);
                    103:                ret = 0;
                    104:        }
                    105:        sqlite3_finalize(stmt);
                    106: 
                    107:        return ret;
                    108: }
1.1.2.5   misho     109: 
                    110: /*
                    111:  * mqtt_rtlm_fini_session() Delete session(s)
                    112:  *
                    113:  * @cfg = loaded config
                    114:  * @sql = SQL handle
                    115:  * @user = username
                    116:  * @host = hostname
                    117:  * return: -1 error, 0 session already appears or >0 row changed
                    118:  */
                    119: int
                    120: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
                    121: {
                    122:        int ret = 0;
                    123:        char *str, szStmt[BUFSIZ] = { 0 };
                    124:        sqlite3_stmt *stmt;
                    125: 
                    126:        if (!cfg || !sql)
                    127:                return -1;
                    128: 
                    129:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    130:        if (!str) {
1.1.2.8   misho     131:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5   misho     132:                return -1;
                    133:        }
                    134:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND RemoteHost LIKE '%s';", 
                    135:                        str, user, host);
                    136: 
                    137:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    138:                MQTT_RTLM_LOG(sql);
                    139:                return -1;
                    140:        }
                    141:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    142:                ret = sqlite3_changes(sql);
                    143:        else {
                    144:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    145:                        MQTT_RTLM_LOG(sql);
                    146:                ret = 0;
                    147:        }
                    148:        sqlite3_finalize(stmt);
                    149: 
                    150:        return ret;
                    151: }
1.1.2.6   misho     152: 
                    153: /*
                    154:  * mqtt_rtlm_chk_session() Check session(s)
                    155:  *
                    156:  * @cfg = loaded config
                    157:  * @sql = SQL handle
                    158:  * @user = username
                    159:  * @host = hostname
                    160:  * return: -1 error, 0 not logged or >0 logged found rows
                    161:  */
                    162: int
                    163: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
                    164: {
                    165:        int ret = 0;
                    166:        char *str, szStmt[BUFSIZ] = { 0 };
                    167:        sqlite3_stmt *stmt;
                    168: 
                    169:        if (!cfg || !sql)
                    170:                return -1;
                    171: 
                    172:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    173:        if (!str) {
1.1.2.8   misho     174:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6   misho     175:                return -1;
                    176:        }
                    177:        snprintf(szStmt, sizeof szStmt, "SELECT RemoteHost, RemotePort FROM %s WHERE "
                    178:                        "Username = '%s' AND RemoteHost LIKE '%s';", str, user, host);
                    179: 
                    180:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    181:                MQTT_RTLM_LOG(sql);
                    182:                return -1;
                    183:        }
1.1.2.7   misho     184:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    185:                ret = sqlite3_changes(sql);
                    186:        else
                    187:                ret = 0;
1.1.2.6   misho     188:        sqlite3_finalize(stmt);
                    189: 
                    190:        return ret;
                    191: }
1.1.2.8   misho     192: 
                    193: /*
                    194:  * mqtt_rtlm_write_topic() Publish topic
                    195:  *
                    196:  * @cfg = loaded config
                    197:  * @sql = SQL handle
                    198:  * @topic = topic
                    199:  * @txt = text
                    200:  * @user = username
                    201:  * @host = hostname
                    202:  * @retain = !=0 retain message to database
                    203:  * return: -1 error, 0 no publish or >0 published ok
                    204:  */
                    205: int
                    206: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, const char *topic, const char *txt, 
                    207:                const char *user, const char *host, char retain)
                    208: {
                    209:        int ret = 0;
                    210:        char *str, szStmt[BUFSIZ] = { 0 };
                    211:        sqlite3_stmt *stmt;
                    212: 
                    213:        if (!cfg || !sql || !topic)
                    214:                return -1;
                    215: 
                    216:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    217:        if (!str) {
                    218:                mqtt_rtlm_log("Error:: not found topics table name");
                    219:                return -1;
                    220:        }
                    221:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, Topic, Value, PubUser, PubDate, PubHost) "
                    222:                        "VALUES (%d, '%s', '%s', '%s', datetime('now', 'localtime'), '%s');", str, 
                    223:                        retain, topic, txt, user, host);
                    224: 
                    225:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    226:                MQTT_RTLM_LOG(sql);
                    227:                return -1;
                    228:        }
                    229:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    230:                ret = sqlite3_changes(sql);
                    231:        else {
                    232:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    233:                        MQTT_RTLM_LOG(sql);
                    234:                ret = 0;
                    235:        }
                    236:        sqlite3_finalize(stmt);
                    237: 
                    238:        return ret;
                    239: }
                    240: 
                    241: /*
                    242:  * mqtt_rtlm_delete_topic() Delete topic
                    243:  *
                    244:  * @cfg = loaded config
                    245:  * @sql = SQL handle
                    246:  * @topic = topic
                    247:  * @user = username
                    248:  * @host = hostname
                    249:  * @retain = -1 no matter
                    250:  * return: -1 error, 0 no changes or >0 deleted rows
                    251:  */
                    252: int
                    253: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, const char *topic, 
                    254:                const char *user, const char *host, char retain)
                    255: {
                    256:        int ret = 0;
                    257:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    258:        sqlite3_stmt *stmt;
                    259: 
                    260:        if (!cfg || !sql || !topic)
                    261:                return -1;
                    262: 
                    263:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    264:        if (!str) {
                    265:                mqtt_rtlm_log("Error:: not found topics table name");
                    266:                return -1;
                    267:        }
                    268:        switch (retain) {
                    269:                case -1:
                    270:                        rtn = "";
                    271:                        break;
                    272:                case 0:
                    273:                        rtn = "AND Retain = 0";
                    274:                        break;
                    275:                default:
                    276:                        rtn = "AND Retain != 0";
                    277:                        break;
                    278:        }
                    279:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
                    280:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    281:                        topic, user, host, rtn);
                    282: 
                    283:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    284:                MQTT_RTLM_LOG(sql);
                    285:                return -1;
                    286:        }
                    287:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    288:                ret = sqlite3_changes(sql);
                    289:        else {
                    290:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    291:                        MQTT_RTLM_LOG(sql);
                    292:                ret = 0;
                    293:        }
                    294:        sqlite3_finalize(stmt);
                    295: 
                    296:        return ret;
                    297: }
1.1.2.9   misho     298: 
                    299: /*
                    300:  * mqtt_rtlm_read_topic() Get topic
                    301:  *
                    302:  * @cfg = loaded config
                    303:  * @sql = SQL handle
                    304:  * @topic = topic
1.1.2.10  misho     305:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    306:  * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9   misho     307:  */
1.1.2.10  misho     308: mqtt_subscr_t *
                    309: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, const char *topic, char retain)
1.1.2.9   misho     310: {
1.1.2.10  misho     311:        int rowz = 0;
                    312:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9   misho     313:        sqlite3_stmt *stmt;
1.1.2.10  misho     314:        register int j;
                    315:        mqtt_subscr_t *s = NULL;
1.1.2.9   misho     316: 
1.1.2.10  misho     317:        if (!cfg || !sql || !topic)
                    318:                return NULL;
                    319: 
                    320:        switch (retain) {
                    321:                case -1:
                    322:                        memset(szStr, 0, sizeof szStr);
                    323:                        break;
                    324:                case 0:
                    325:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    326:                        break;
                    327:                default:
                    328:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    329:                        break;
                    330:        }
1.1.2.9   misho     331: 
                    332:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    333:        if (!str) {
                    334:                mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10  misho     335:                return NULL;
1.1.2.9   misho     336:        }
1.1.2.10  misho     337:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE Topic LIKE '%s' %s;", 
                    338:                        str, topic, szStr);
1.1.2.9   misho     339: 
                    340:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    341:                MQTT_RTLM_LOG(sql);
1.1.2.10  misho     342:                return NULL;
1.1.2.9   misho     343:        }
1.1.2.10  misho     344: 
                    345:        /* calculate count of rows and allocate subscribe items */
                    346:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    347:                rowz++;
                    348:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    349:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    350:                goto end;
                    351:        } else
                    352:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    353:        sqlite3_reset(stmt);
                    354: 
                    355:        /* fill with data */
                    356:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11! misho     357:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.10  misho     358:                s[j].sub_topic._base = strdup(sqlite3_column_text(stmt, 1));
                    359:                s[j].sub_topic._size = strlen(s[j].sub_topic._base);
                    360:                s[j].sub_value._base = strdup(sqlite3_column_text(stmt, 2));
                    361:                s[j].sub_value._size = strlen(s[j].sub_value._base);
1.1.2.9   misho     362:        }
1.1.2.10  misho     363: end:
1.1.2.9   misho     364:        sqlite3_finalize(stmt);
                    365: 
1.1.2.10  misho     366:        return s;
1.1.2.9   misho     367: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>