#include "global.h"


#pragma GCC visibility push(hidden)

/*
 * mqtt_wait4data() - Wait until the socket reports the requested poll events
 *
 * @sock = connected socket; descriptors below 3 are rejected
 * @ka = keep alive timeout in seconds (converted to milliseconds for poll)
 * @events = poll(2) event mask to wait for, e.g. POLLOUT or POLLIN | POLLPRI
 * return: -1 error, 0 ready or 1 timeout
 */
int
mqtt_wait4data(int sock, u_short ka, short events)
{
	int ret = 0;
	struct pollfd pfd;

	if (sock < 3)
		return -1;	/* error */

	pfd.fd = sock;
	/*
	 * Wait for what the caller asked for. Always polling for POLLOUT
	 * would report "ready" on a writable socket even when the caller
	 * needs incoming data, and the following recv() would then block
	 * without any timeout.
	 */
	pfd.events = events;
	pfd.revents = 0;

	ret = poll(&pfd, 1, ka * 1000);
	if (ret == -1 || (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) {
		LOGERR;
		return -1;	/* error */
	}
	if (!ret)
		return 1;	/* timeout */

	return 0;		/* ready */
}

#pragma GCC visibility pop

/*
 * mqtt_KeepAlive() - Keep Alive check routine
 *
 * Sends a PINGREQ and waits up to @tries times for a PINGRESP.
 *
 * @sock = connected socket
 * @ka = keep alive timeout
 * @tries = tries for receive correct ping response, usually ==1
 * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session
 */
int
mqtt_KeepAlive(int sock, u_short ka, u_char tries)
{
	int ret = 0;
	mqtt_msg_t msg = { NULL, 0 };

	if (sock < 3)
		return -1;	/* error */

	/* wait until the socket accepts outgoing data */
	if ((ret = mqtt_wait4data(sock, ka, POLLOUT)))
		return ret;

	/* ping request */
	if ((ret = mqtt_msgPINGREQ(&msg)) == -1)
		return -1;	/* error */
	if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {
		LOGERR;
		goto end;
	}

	/*
	 * Nothing has been received yet. Start from "timeout" so that
	 * tries == 0 does not hand the send() byte count back to the caller
	 * as if it were a status code.
	 */
	ret = 1;
	while (tries--) {
		if ((ret = mqtt_wait4data(sock, ka, POLLIN | POLLPRI))) {
			if (ret == -1)
				break;		/* hard error, give up */
			continue;		/* timeout, spend another try */
		}

		/* receive & decode packet */
		if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) {
			LOGERR;
			break;
		}
		if (!mqtt_readPINGRESP(&msg)) {
			ret = 0;	/* Host is alive */
			break;
		}
		ret = 2;	/* Session is broken ... must be disconnect! */
	}

end:
	free(msg.msg_base);
	return ret;
}

/*
 * mqtt_WillMessage() - Publish WILL message
 *
 * Sends a PUBLISH built from @topic and @data, then waits for the
 * acknowledge packet.
 *
 * @sock = connected socket
 * @ka = keep alive timeout
 * @topic = will topic, must not be NULL
 * @data = will message, may be NULL (published with zero length)
 * return: -1 error, 1 timeout, 2 not ack or 0 ok
 */
int
mqtt_WillMessage(int sock, u_short ka, const char *topic, const char *data)
{
	int ret = 0;
	mqtt_msg_t msg = { NULL, 0 };

	if (!topic)
		return -1;	/* error */

	/* will message */
	if ((ret = mqtt_wait4data(sock, ka, POLLOUT)))
		return ret;
	ret = mqtt_msgPUBLISH(&msg, topic, 0xDEAD, 0, 1, 0,
			data, data ? strlen(data) : 0);
	if (ret == -1)
		return -1;	/* error */

	if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {
		LOGERR;
		goto end;	/* error, ret is already -1 */
	}
	/* wipe the sent packet so the buffer can take the reply */
	memset(msg.msg_base, 0, msg.msg_len);

	/* will ack */
	if ((ret = mqtt_wait4data(sock, ka, POLLIN | POLLPRI)))
		goto end;	/* error or timeout, passed through */

	/* receive & decode packet */
	if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) {
		LOGERR;
		goto end;	/* error */
	}

	if (mqtt_readPUBACK(&msg))
		ret = 0;	/* ok */
	else
		ret = 2;	/* semi-error */

end:
	free(msg.msg_base);
	return ret;
}