--- libaitmqtt/src/cmds.c 2012/05/05 13:10:24 1.1.2.2 +++ libaitmqtt/src/cmds.c 2014/04/27 16:29:40 1.4 @@ -1,8 +1,10 @@ #include "global.h" -static inline int -_wait4data(int sock, u_short ka, short events) +#pragma GCC visibility push(hidden) + +int +mqtt_wait4data(int sock, u_short ka, short events) { int ret = 0; struct pollfd pfd; @@ -22,7 +24,9 @@ _wait4data(int sock, u_short ka, short events) return 0; /* ready */ } +#pragma GCC visibility pop + /* * mqtt_KeepAlive() - Keep Alive check routine * @@ -40,7 +44,7 @@ mqtt_KeepAlive(int sock, u_short ka, u_char tries) if (sock < 3) return -1; /* error */ - if ((ret = _wait4data(sock, ka, POLLOUT))) + if ((ret = mqtt_wait4data(sock, ka, POLLOUT))) return ret; /* ping request */ if ((ret = mqtt_msgPINGREQ(&msg)) == -1) @@ -51,7 +55,7 @@ mqtt_KeepAlive(int sock, u_short ka, u_char tries) } while (tries--) { - if ((ret = _wait4data(sock, ka, POLLIN | POLLPRI))) { + if ((ret = mqtt_wait4data(sock, ka, POLLIN | POLLPRI))) { if (ret == -1) break; else @@ -69,6 +73,57 @@ mqtt_KeepAlive(int sock, u_short ka, u_char tries) ret = 2; /* Session is broken ... must be disconnect! */ } end: + free(msg.msg_base); + return ret; +} + +/* + * mqtt_WillMessage() - Publish WILL message + * + * @sock = connected socket + * @ka = keep alive timeout + * @topic = will topic + * @data = will message + * return: -1 error, 1 timeout, 2 not ack or 0 ok + */ +int +mqtt_WillMessage(int sock, u_short ka, const char *topic, const char *data) +{ + int ret = 0; + mqtt_msg_t msg = { NULL, 0 }; + + if (!topic) + return -1; /* error */ + + /* will message */ + if ((ret = mqtt_wait4data(sock, ka, POLLOUT))) + return ret; + ret = mqtt_msgPUBLISH(&msg, topic, 0xDEAD, 0, 1, 0, data, data ? strlen(data) : 0); + if (ret == -1) + return -1; /* error */ + if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) { + LOGERR; + free(msg.msg_base); + return -1; /* error */ + } else + memset(msg.msg_base, 0, msg.msg_len); + + /* will ack */ + if ((ret = mqtt_wait4data(sock, ka, POLLIN | POLLPRI))) { + free(msg.msg_base); + return ret; + } + /* receive & decode packet */ + if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) { + LOGERR; + free(msg.msg_base); + return -1; /* error */ + } + if (mqtt_readPUBACK(&msg)) + ret = 0; /* ok */ + else + ret = 2; /* semi-error */ + free(msg.msg_base); return ret; }