Annotation of mqtt/src/pubmqtt.c, revision 1.1.2.13

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
1.1.2.12  misho       4: extern const char sql_schema[];
                      5: 
                      6: 
1.1.2.2   misho       7: /*
                      8:  * mqtt_db_log() Log database connection message
                      9:  *
                     10:  * @fmt = format string
                     11:  * @... = argument list
                     12:  * return: none
                     13:  */
                     14: static void
                     15: mqtt_rtlm_log(const char *fmt, ...)
                     16: {
                     17:        va_list lst;
                     18: 
                     19:        va_start(lst, fmt);
                     20:        vsyslog(LOG_ERR, fmt, lst);
                     21:        va_end(lst);
                     22: }
1.1.2.3   misho      23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
1.1.2.2   misho      24:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
                     25: 
                     26: 
                     27: /*
                     28:  * mqtt_rtlm_open() Open database connection
                     29:  *
1.1.2.3   misho      30:  * @cfg = config filename
1.1.2.2   misho      31:  * return: NULL error or SQL handle
                     32:  */
                     33: sqlite3 *
                     34: mqtt_rtlm_open(sl_config *cfg)
                     35: {
                     36:        sqlite3 *sql = NULL;
                     37:        const char *str = NULL;
                     38: 
                     39:        if (!cfg)
                     40:                return NULL;
                     41: 
1.1.2.12  misho      42:        sqlite3_config(SQLITE_CONFIG_SERIALIZED);
                     43: 
1.1.2.2   misho      44:        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
                     45:        if (!str) {
                     46:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
                     47:                return NULL;
                     48:        }
                     49: 
1.1.2.13! misho      50:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
1.1.2.3   misho      51:                MQTT_RTLM_LOG(sql);
1.1.2.2   misho      52:                sqlite3_close(sql);
                     53:                return NULL;
                     54:        }
                     55: 
1.1.2.12  misho      56:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
                     57:                MQTT_RTLM_LOG(sql);
                     58:                sqlite3_close(sql);
                     59:                return NULL;
                     60:        }
1.1.2.2   misho      61:        return sql;
                     62: }
                     63: 
                     64: /*
                     65:  * mqtt_rtlm_close() Close database connection
                     66:  *
                     67:  * @sql = SQL handle
                     68:  * return: none
                     69:  */
                     70: void
                     71: mqtt_rtlm_close(sqlite3 *sql)
                     72: {
                     73:        sqlite3_close(sql);
                     74: }
1.1.2.4   misho      75: 
                     76: /*
                     77:  * mqtt_rtlm_init_session() Create session
                     78:  *
                     79:  * @cfg = loaded config
                     80:  * @sql = SQL handle
                     81:  * @user = username
                     82:  * @host = hostname
                     83:  * @port = port
                     84:  * return: -1 error, 0 session already appears or >0 row changed
                     85:  */
                     86: int
                     87: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host, u_short port)
                     88: {
                     89:        int ret = 0;
                     90:        char *str, szStmt[BUFSIZ] = { 0 };
                     91:        sqlite3_stmt *stmt;
                     92: 
                     93:        if (!cfg || !sql)
                     94:                return -1;
                     95: 
                     96:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                     97:        if (!str) {
1.1.2.8   misho      98:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.4   misho      99:                return -1;
                    100:        }
                    101:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Username, RemoteHost, RemotePort) "
                    102:                       "VALUES ('%s', '%s', %d);", str, user, host, port);
                    103: 
                    104:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    105:                MQTT_RTLM_LOG(sql);
                    106:                return -1;
                    107:        }
                    108:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    109:                ret = sqlite3_changes(sql);
                    110:        else {
                    111:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    112:                        MQTT_RTLM_LOG(sql);
                    113:                ret = 0;
                    114:        }
                    115:        sqlite3_finalize(stmt);
                    116: 
                    117:        return ret;
                    118: }
1.1.2.5   misho     119: 
                    120: /*
                    121:  * mqtt_rtlm_fini_session() Delete session(s)
                    122:  *
                    123:  * @cfg = loaded config
                    124:  * @sql = SQL handle
                    125:  * @user = username
                    126:  * @host = hostname
                    127:  * return: -1 error, 0 session already appears or >0 row changed
                    128:  */
                    129: int
                    130: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
                    131: {
                    132:        int ret = 0;
                    133:        char *str, szStmt[BUFSIZ] = { 0 };
                    134:        sqlite3_stmt *stmt;
                    135: 
                    136:        if (!cfg || !sql)
                    137:                return -1;
                    138: 
                    139:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    140:        if (!str) {
1.1.2.8   misho     141:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.5   misho     142:                return -1;
                    143:        }
                    144:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Username = '%s' AND RemoteHost LIKE '%s';", 
                    145:                        str, user, host);
                    146: 
                    147:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    148:                MQTT_RTLM_LOG(sql);
                    149:                return -1;
                    150:        }
                    151:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    152:                ret = sqlite3_changes(sql);
                    153:        else {
                    154:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    155:                        MQTT_RTLM_LOG(sql);
                    156:                ret = 0;
                    157:        }
                    158:        sqlite3_finalize(stmt);
                    159: 
                    160:        return ret;
                    161: }
1.1.2.6   misho     162: 
                    163: /*
                    164:  * mqtt_rtlm_chk_session() Check session(s)
                    165:  *
                    166:  * @cfg = loaded config
                    167:  * @sql = SQL handle
                    168:  * @user = username
                    169:  * @host = hostname
                    170:  * return: -1 error, 0 not logged or >0 logged found rows
                    171:  */
                    172: int
                    173: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *user, const char *host)
                    174: {
                    175:        int ret = 0;
                    176:        char *str, szStmt[BUFSIZ] = { 0 };
                    177:        sqlite3_stmt *stmt;
                    178: 
                    179:        if (!cfg || !sql)
                    180:                return -1;
                    181: 
                    182:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
                    183:        if (!str) {
1.1.2.8   misho     184:                mqtt_rtlm_log("Error:: not found online table name");
1.1.2.6   misho     185:                return -1;
                    186:        }
                    187:        snprintf(szStmt, sizeof szStmt, "SELECT RemoteHost, RemotePort FROM %s WHERE "
                    188:                        "Username = '%s' AND RemoteHost LIKE '%s';", str, user, host);
                    189: 
                    190:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    191:                MQTT_RTLM_LOG(sql);
                    192:                return -1;
                    193:        }
1.1.2.7   misho     194:        if (sqlite3_step(stmt) == SQLITE_ROW)
                    195:                ret = sqlite3_changes(sql);
                    196:        else
                    197:                ret = 0;
1.1.2.6   misho     198:        sqlite3_finalize(stmt);
                    199: 
                    200:        return ret;
                    201: }
1.1.2.8   misho     202: 
                    203: /*
                    204:  * mqtt_rtlm_write_topic() Publish topic
                    205:  *
                    206:  * @cfg = loaded config
                    207:  * @sql = SQL handle
                    208:  * @topic = topic
                    209:  * @txt = text
                    210:  * @user = username
                    211:  * @host = hostname
                    212:  * @retain = !=0 retain message to database
                    213:  * return: -1 error, 0 no publish or >0 published ok
                    214:  */
                    215: int
                    216: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, const char *topic, const char *txt, 
                    217:                const char *user, const char *host, char retain)
                    218: {
                    219:        int ret = 0;
                    220:        char *str, szStmt[BUFSIZ] = { 0 };
                    221:        sqlite3_stmt *stmt;
                    222: 
                    223:        if (!cfg || !sql || !topic)
                    224:                return -1;
                    225: 
                    226:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    227:        if (!str) {
                    228:                mqtt_rtlm_log("Error:: not found topics table name");
                    229:                return -1;
                    230:        }
                    231:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, Topic, Value, PubUser, PubDate, PubHost) "
                    232:                        "VALUES (%d, '%s', '%s', '%s', datetime('now', 'localtime'), '%s');", str, 
                    233:                        retain, topic, txt, user, host);
                    234: 
                    235:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    236:                MQTT_RTLM_LOG(sql);
                    237:                return -1;
                    238:        }
                    239:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    240:                ret = sqlite3_changes(sql);
                    241:        else {
                    242:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    243:                        MQTT_RTLM_LOG(sql);
                    244:                ret = 0;
                    245:        }
                    246:        sqlite3_finalize(stmt);
                    247: 
                    248:        return ret;
                    249: }
                    250: 
                    251: /*
                    252:  * mqtt_rtlm_delete_topic() Delete topic
                    253:  *
                    254:  * @cfg = loaded config
                    255:  * @sql = SQL handle
                    256:  * @topic = topic
                    257:  * @user = username
                    258:  * @host = hostname
                    259:  * @retain = -1 no matter
                    260:  * return: -1 error, 0 no changes or >0 deleted rows
                    261:  */
                    262: int
                    263: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, const char *topic, 
                    264:                const char *user, const char *host, char retain)
                    265: {
                    266:        int ret = 0;
                    267:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
                    268:        sqlite3_stmt *stmt;
                    269: 
                    270:        if (!cfg || !sql || !topic)
                    271:                return -1;
                    272: 
                    273:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    274:        if (!str) {
                    275:                mqtt_rtlm_log("Error:: not found topics table name");
                    276:                return -1;
                    277:        }
                    278:        switch (retain) {
                    279:                case -1:
                    280:                        rtn = "";
                    281:                        break;
                    282:                case 0:
                    283:                        rtn = "AND Retain = 0";
                    284:                        break;
                    285:                default:
                    286:                        rtn = "AND Retain != 0";
                    287:                        break;
                    288:        }
                    289:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE Topic LIKE '%s' AND "
                    290:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
                    291:                        topic, user, host, rtn);
                    292: 
                    293:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    294:                MQTT_RTLM_LOG(sql);
                    295:                return -1;
                    296:        }
                    297:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
                    298:                ret = sqlite3_changes(sql);
                    299:        else {
                    300:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
                    301:                        MQTT_RTLM_LOG(sql);
                    302:                ret = 0;
                    303:        }
                    304:        sqlite3_finalize(stmt);
                    305: 
                    306:        return ret;
                    307: }
1.1.2.9   misho     308: 
                    309: /*
                    310:  * mqtt_rtlm_read_topic() Get topic
                    311:  *
                    312:  * @cfg = loaded config
                    313:  * @sql = SQL handle
                    314:  * @topic = topic
1.1.2.10  misho     315:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
                    316:  * return: NULL error or not found and !=NULL allocated subscribe topics
1.1.2.9   misho     317:  */
1.1.2.10  misho     318: mqtt_subscr_t *
                    319: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, const char *topic, char retain)
1.1.2.9   misho     320: {
1.1.2.10  misho     321:        int rowz = 0;
                    322:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
1.1.2.9   misho     323:        sqlite3_stmt *stmt;
1.1.2.10  misho     324:        register int j;
                    325:        mqtt_subscr_t *s = NULL;
1.1.2.9   misho     326: 
1.1.2.10  misho     327:        if (!cfg || !sql || !topic)
                    328:                return NULL;
                    329: 
                    330:        switch (retain) {
                    331:                case -1:
                    332:                        memset(szStr, 0, sizeof szStr);
                    333:                        break;
                    334:                case 0:
                    335:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
                    336:                        break;
                    337:                default:
                    338:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
                    339:                        break;
                    340:        }
1.1.2.9   misho     341: 
                    342:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
                    343:        if (!str) {
                    344:                mqtt_rtlm_log("Error:: not found topics table name");
1.1.2.10  misho     345:                return NULL;
1.1.2.9   misho     346:        }
1.1.2.10  misho     347:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE Topic LIKE '%s' %s;", 
                    348:                        str, topic, szStr);
1.1.2.9   misho     349: 
                    350:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
                    351:                MQTT_RTLM_LOG(sql);
1.1.2.10  misho     352:                return NULL;
1.1.2.9   misho     353:        }
1.1.2.10  misho     354: 
                    355:        /* calculate count of rows and allocate subscribe items */
                    356:        while (sqlite3_step(stmt) == SQLITE_ROW)
                    357:                rowz++;
                    358:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
                    359:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
                    360:                goto end;
                    361:        } else
                    362:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
                    363:        sqlite3_reset(stmt);
                    364: 
                    365:        /* fill with data */
                    366:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
1.1.2.11  misho     367:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
1.1.2.10  misho     368:                s[j].sub_topic._base = strdup(sqlite3_column_text(stmt, 1));
                    369:                s[j].sub_topic._size = strlen(s[j].sub_topic._base);
                    370:                s[j].sub_value._base = strdup(sqlite3_column_text(stmt, 2));
                    371:                s[j].sub_value._size = strlen(s[j].sub_value._base);
1.1.2.9   misho     372:        }
1.1.2.10  misho     373: end:
1.1.2.9   misho     374:        sqlite3_finalize(stmt);
                    375: 
1.1.2.10  misho     376:        return s;
1.1.2.9   misho     377: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>