Annotation of mqtt/src/pubmqtt.c, revision 1.2

1.2     ! misho       1: #include "global.h"
        !             2: 
        !             3: 
        !             4: extern const char sql_schema[];
        !             5: 
        !             6: 
        !             7: /*
        !             8:  * mqtt_db_log() Log database connection message
        !             9:  *
        !            10:  * @fmt = format string
        !            11:  * @... = argument list
        !            12:  * return: none
        !            13:  */
        !            14: static void
        !            15: mqtt_rtlm_log(const char *fmt, ...)
        !            16: {
        !            17:        va_list lst;
        !            18: 
        !            19:        va_start(lst, fmt);
        !            20:        vsyslog(LOG_ERR, fmt, lst);
        !            21:        va_end(lst);
        !            22: }
        !            23: #define MQTT_RTLM_LOG(_sql)    (assert((_sql)), mqtt_rtlm_log("Error:: SQL #%d - %s", \
        !            24:                                        sqlite3_errcode((_sql)), sqlite3_errmsg((_sql))))
        !            25: 
        !            26: 
        !            27: /*
        !            28:  * mqtt_rtlm_open() Open database connection
        !            29:  *
        !            30:  * @cfg = config filename
        !            31:  * return: NULL error or SQL handle
        !            32:  */
        !            33: sqlite3 *
        !            34: mqtt_rtlm_open(sl_config *cfg)
        !            35: {
        !            36:        sqlite3 *sql = NULL;
        !            37:        const char *str = NULL;
        !            38: 
        !            39:        if (!cfg)
        !            40:                return NULL;
        !            41: 
        !            42:        /*
        !            43:        if (!sqlite3_threadsafe() || sqlite3_config(SQLITE_CONFIG_SERIALIZED))
        !            44:                return NULL;
        !            45:                */
        !            46: 
        !            47:        str = (const char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("name"));
        !            48:        if (!str) {
        !            49:                mqtt_rtlm_log("Error:: Unknown database name ...\n");
        !            50:                return NULL;
        !            51:        }
        !            52: 
        !            53:        if (sqlite3_open_v2(str, &sql, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL)) {
        !            54:                MQTT_RTLM_LOG(sql);
        !            55:                sqlite3_close(sql);
        !            56:                return NULL;
        !            57:        }
        !            58: 
        !            59:        if (sqlite3_exec(sql, sql_schema, NULL, NULL, NULL)) {
        !            60:                MQTT_RTLM_LOG(sql);
        !            61:                sqlite3_close(sql);
        !            62:                return NULL;
        !            63:        }
        !            64:        return sql;
        !            65: }
        !            66: 
        !            67: /*
        !            68:  * mqtt_rtlm_close() Close database connection
        !            69:  *
        !            70:  * @sql = SQL handle
        !            71:  * return: none
        !            72:  */
        !            73: void
        !            74: mqtt_rtlm_close(sqlite3 *sql)
        !            75: {
        !            76:        sqlite3_close(sql);
        !            77: }
        !            78: 
        !            79: /*
        !            80:  * mqtt_rtlm_init_session() Create session
        !            81:  *
        !            82:  * @cfg = loaded config
        !            83:  * @sql = SQL handle
        !            84:  * @connid = connection id
        !            85:  * @user = username
        !            86:  * @host = hostname
        !            87:  * @will = will flag if !=0 must fill arguments
        !            88:  * @... = will arguments in order topic,msg,qos,retain
        !            89:  * return: -1 error, 0 session already appears or >0 row changed
        !            90:  */
        !            91: int
        !            92: mqtt_rtlm_init_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, 
        !            93:                const char *host, char will, ...)
        !            94: {
        !            95:        va_list lst;
        !            96:        int ret = 0;
        !            97:        char *str, szStmt[BUFSIZ] = { 0 };
        !            98:        sqlite3_stmt *stmt;
        !            99: 
        !           100:        if (!cfg || !sql)
        !           101:                return -1;
        !           102: 
        !           103:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
        !           104:        if (!str) {
        !           105:                mqtt_rtlm_log("Error:: not found online table name");
        !           106:                return -1;
        !           107:        }
        !           108:        if (!will)
        !           109:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
        !           110:                                "WillFlag) VALUES ('%s', '%s', '%s', 0);", str, connid, user, host);
        !           111:        else {
        !           112:                va_start(lst, will);
        !           113:                snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (ConnID, Username, RemoteHost, "
        !           114:                                "WillFlag, WillRetain, WillQoS, WillMsg, WillTopic) "
        !           115:                                "VALUES ('%s', '%s', '%s', %d, %d, %d, '%s', '%s');", 
        !           116:                                str, connid, user, host, will, 
        !           117:                                va_arg(lst, int), va_arg(lst, int), va_arg(lst, char*), va_arg(lst, char*));
        !           118:                va_end(lst);
        !           119:        }
        !           120: 
        !           121:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
        !           122:                MQTT_RTLM_LOG(sql);
        !           123:                return -1;
        !           124:        }
        !           125:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
        !           126:                ret = sqlite3_changes(sql);
        !           127:        else {
        !           128:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
        !           129:                        MQTT_RTLM_LOG(sql);
        !           130:                ret = 0;
        !           131:        }
        !           132:        sqlite3_finalize(stmt);
        !           133: 
        !           134:        return ret;
        !           135: }
        !           136: 
        !           137: /*
        !           138:  * mqtt_rtlm_fini_session() Delete session(s)
        !           139:  *
        !           140:  * @cfg = loaded config
        !           141:  * @sql = SQL handle
        !           142:  * @connid = connection id
        !           143:  * @user = username
        !           144:  * @host = hostname
        !           145:  * return: -1 error, 0 session already appears or >0 row changed
        !           146:  */
        !           147: int
        !           148: mqtt_rtlm_fini_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
        !           149: {
        !           150:        int ret = 0;
        !           151:        char *str, szStmt[BUFSIZ] = { 0 };
        !           152:        sqlite3_stmt *stmt;
        !           153: 
        !           154:        if (!cfg || !sql)
        !           155:                return -1;
        !           156: 
        !           157:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
        !           158:        if (!str) {
        !           159:                mqtt_rtlm_log("Error:: not found online table name");
        !           160:                return -1;
        !           161:        }
        !           162:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE ConnID = '%s' AND Username = '%s' "
        !           163:                        "AND RemoteHost LIKE '%s';", str, connid, user, host);
        !           164: 
        !           165:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
        !           166:                MQTT_RTLM_LOG(sql);
        !           167:                return -1;
        !           168:        }
        !           169:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
        !           170:                ret = sqlite3_changes(sql);
        !           171:        else {
        !           172:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
        !           173:                        MQTT_RTLM_LOG(sql);
        !           174:                ret = 0;
        !           175:        }
        !           176:        sqlite3_finalize(stmt);
        !           177: 
        !           178:        return ret;
        !           179: }
        !           180: 
        !           181: /*
        !           182:  * mqtt_rtlm_chk_session() Check session(s)
        !           183:  *
        !           184:  * @cfg = loaded config
        !           185:  * @sql = SQL handle
        !           186:  * @connid = connection id
        !           187:  * @user = username
        !           188:  * @host = hostname
        !           189:  * return: -1 error, 0 not logged or >0 logged found rows
        !           190:  */
        !           191: int
        !           192: mqtt_rtlm_chk_session(sl_config *cfg, sqlite3 *sql, const char *connid, const char *user, const char *host)
        !           193: {
        !           194:        int ret = 0;
        !           195:        char *str, szStmt[BUFSIZ] = { 0 };
        !           196:        sqlite3_stmt *stmt;
        !           197: 
        !           198:        if (!cfg || !sql)
        !           199:                return -1;
        !           200: 
        !           201:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_online"));
        !           202:        if (!str) {
        !           203:                mqtt_rtlm_log("Error:: not found online table name");
        !           204:                return -1;
        !           205:        }
        !           206:        snprintf(szStmt, sizeof szStmt, "SELECT ConnID, RemoteHost FROM %s WHERE "
        !           207:                        "ConnID = '%s' AND Username = '%s' AND RemoteHost LIKE '%s';", 
        !           208:                        str, connid, user, host);
        !           209: 
        !           210:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
        !           211:                MQTT_RTLM_LOG(sql);
        !           212:                return -1;
        !           213:        }
        !           214:        if (sqlite3_step(stmt) == SQLITE_ROW)
        !           215:                ret = sqlite3_changes(sql);
        !           216:        else
        !           217:                ret = 0;
        !           218:        sqlite3_finalize(stmt);
        !           219: 
        !           220:        return ret;
        !           221: }
        !           222: 
        !           223: /*
        !           224:  * mqtt_rtlm_write_topic() Publish topic
        !           225:  *
        !           226:  * @cfg = loaded config
        !           227:  * @sql = SQL handle
        !           228:  * @msgid = MessageID
        !           229:  * @topic = topic
        !           230:  * @txt = text
        !           231:  * @user = username
        !           232:  * @host = hostname
        !           233:  * @retain = !=0 retain message to database
        !           234:  * return: -1 error, 0 no publish or >0 published ok
        !           235:  */
        !           236: int
        !           237: mqtt_rtlm_write_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, const char *txt, 
        !           238:                const char *user, const char *host, char retain)
        !           239: {
        !           240:        int ret = 0;
        !           241:        char *str, szStmt[BUFSIZ] = { 0 };
        !           242:        sqlite3_stmt *stmt;
        !           243: 
        !           244:        if (!cfg || !sql || !topic)
        !           245:                return -1;
        !           246: 
        !           247:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
        !           248:        if (!str) {
        !           249:                mqtt_rtlm_log("Error:: not found topics table name");
        !           250:                return -1;
        !           251:        }
        !           252:        snprintf(szStmt, sizeof szStmt, "INSERT INTO %s (Retain, MsgID, Topic, Value, PubUser, "
        !           253:                        "PubDate, PubHost) VALUES (%d, %d, '%s', '%s', '%s', "
        !           254:                        "datetime('now', 'localtime'), '%s');", 
        !           255:                        str, retain, msgid, topic, txt, user, host);
        !           256: 
        !           257:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
        !           258:                MQTT_RTLM_LOG(sql);
        !           259:                return -1;
        !           260:        }
        !           261:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
        !           262:                ret = sqlite3_changes(sql);
        !           263:        else {
        !           264:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
        !           265:                        MQTT_RTLM_LOG(sql);
        !           266:                ret = 0;
        !           267:        }
        !           268:        sqlite3_finalize(stmt);
        !           269: 
        !           270:        return ret;
        !           271: }
        !           272: 
        !           273: /*
        !           274:  * mqtt_rtlm_delete_topic() Delete topic
        !           275:  *
        !           276:  * @cfg = loaded config
        !           277:  * @sql = SQL handle
        !           278:  * @msgid = MessageID
        !           279:  * @topic = topic
        !           280:  * @user = username
        !           281:  * @host = hostname
        !           282:  * @retain = -1 no matter
        !           283:  * return: -1 error, 0 no changes or >0 deleted rows
        !           284:  */
        !           285: int
        !           286: mqtt_rtlm_delete_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, 
        !           287:                const char *user, const char *host, char retain)
        !           288: {
        !           289:        int ret = 0;
        !           290:        char *str, *rtn, szStmt[BUFSIZ] = { 0 };
        !           291:        sqlite3_stmt *stmt;
        !           292: 
        !           293:        if (!cfg || !sql || !topic)
        !           294:                return -1;
        !           295: 
        !           296:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
        !           297:        if (!str) {
        !           298:                mqtt_rtlm_log("Error:: not found topics table name");
        !           299:                return -1;
        !           300:        }
        !           301:        switch (retain) {
        !           302:                case -1:
        !           303:                        rtn = "";
        !           304:                        break;
        !           305:                case 0:
        !           306:                        rtn = "AND Retain = 0";
        !           307:                        break;
        !           308:                default:
        !           309:                        rtn = "AND Retain != 0";
        !           310:                        break;
        !           311:        }
        !           312:        snprintf(szStmt, sizeof szStmt, "DELETE FROM %s WHERE MsgID = %d AND Topic LIKE '%s' AND "
        !           313:                        "PubUser LIKE '%s' AND PubHost LIKE '%s' %s;", str, 
        !           314:                        msgid, topic, user, host, rtn);
        !           315: 
        !           316:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
        !           317:                MQTT_RTLM_LOG(sql);
        !           318:                return -1;
        !           319:        }
        !           320:        if ((ret = sqlite3_step(stmt)) == SQLITE_DONE)
        !           321:                ret = sqlite3_changes(sql);
        !           322:        else {
        !           323:                if (ret > SQLITE_OK && ret < SQLITE_ROW)
        !           324:                        MQTT_RTLM_LOG(sql);
        !           325:                ret = 0;
        !           326:        }
        !           327:        sqlite3_finalize(stmt);
        !           328: 
        !           329:        return ret;
        !           330: }
        !           331: 
        !           332: /*
        !           333:  * mqtt_rtlm_read_topic() Get topic
        !           334:  *
        !           335:  * @cfg = loaded config
        !           336:  * @sql = SQL handle
        !           337:  * @msgid = MessageID
        !           338:  * @topic = topic
        !           339:  * @retain = retain 0 get only dynamic, >0 get only retained and -1 no matter
        !           340:  * return: NULL error or not found and !=NULL allocated subscribe topics
        !           341:  */
        !           342: mqtt_subscr_t *
        !           343: mqtt_rtlm_read_topic(sl_config *cfg, sqlite3 *sql, u_short msgid, const char *topic, char retain)
        !           344: {
        !           345:        int rowz = 0;
        !           346:        char *str, szStr[STRSIZ], szStmt[BUFSIZ] = { 0 };
        !           347:        sqlite3_stmt *stmt;
        !           348:        register int j;
        !           349:        mqtt_subscr_t *s = NULL;
        !           350: 
        !           351:        if (!cfg || !sql || !topic)
        !           352:                return NULL;
        !           353: 
        !           354:        switch (retain) {
        !           355:                case -1:
        !           356:                        memset(szStr, 0, sizeof szStr);
        !           357:                        break;
        !           358:                case 0:
        !           359:                        snprintf(szStr, sizeof szStr, "AND Retain = 0");
        !           360:                        break;
        !           361:                default:
        !           362:                        snprintf(szStr, sizeof szStr, "AND Retain > 0");
        !           363:                        break;
        !           364:        }
        !           365: 
        !           366:        str = (char*) cfg_GetAttribute(cfg, CFG("mqtt_pub"), CFG("tbl_topics"));
        !           367:        if (!str) {
        !           368:                mqtt_rtlm_log("Error:: not found topics table name");
        !           369:                return NULL;
        !           370:        }
        !           371:        snprintf(szStmt, sizeof szStmt, "SELECT Retain, Topic, Value FROM %s WHERE "
        !           372:                        "MsgID = %d AND Topic LIKE '%s' %s;", 
        !           373:                        str, msgid, topic, szStr);
        !           374: 
        !           375:        if (sqlite3_prepare_v2(sql, szStmt, strlen(szStmt), &stmt, NULL)) {
        !           376:                MQTT_RTLM_LOG(sql);
        !           377:                return NULL;
        !           378:        }
        !           379: 
        !           380:        /* calculate count of rows and allocate subscribe items */
        !           381:        while (sqlite3_step(stmt) == SQLITE_ROW)
        !           382:                rowz++;
        !           383:        if (!(s = malloc((rowz + 1) * sizeof(mqtt_subscr_t)))) {
        !           384:                mqtt_rtlm_log("Error:: System #%d - %s", errno, strerror(errno));
        !           385:                goto end;
        !           386:        } else
        !           387:                memset(s, 0, (rowz + 1) * sizeof(mqtt_subscr_t));
        !           388:        sqlite3_reset(stmt);
        !           389: 
        !           390:        /* fill with data */
        !           391:        for (j = 0; j < rowz && sqlite3_step(stmt) == SQLITE_ROW; j++) {
        !           392:                s[j].sub_ret = (char) sqlite3_column_int(stmt, 0);
        !           393:                s[j].sub_topic._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 1));
        !           394:                s[j].sub_topic._size = strlen((char*) s[j].sub_topic._base);
        !           395:                s[j].sub_value._base = (u_char*) strdup((char*) sqlite3_column_text(stmt, 2));
        !           396:                s[j].sub_value._size = strlen((char*) s[j].sub_value._base);
        !           397:        }
        !           398: end:
        !           399:        sqlite3_finalize(stmt);
        !           400: 
        !           401:        return s;
        !           402: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>