--- libaitmqtt/src/aitmqtt.c 2012/04/07 20:48:39 1.1.1.1.2.3 +++ libaitmqtt/src/aitmqtt.c 2012/04/25 16:25:52 1.1.1.1.2.6 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: aitmqtt.c,v 1.1.1.1.2.3 2012/04/07 20:48:39 misho Exp $ +* $Id: aitmqtt.c,v 1.1.1.1.2.6 2012/04/25 16:25:52 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -53,10 +53,6 @@ char mqtt_Error[STRSIZ]; #pragma GCC visibility pop -// -// Error maintenance functions ... -// - // mqtt_GetErrno() Get error code of last operation inline int mqtt_GetErrno() @@ -85,7 +81,7 @@ mqtt_SetErr(int eno, char *estr, ...) } #pragma GCC visibility push(hidden) -// _mqtt_readHEADER() read fixed header from MQTT message +/* _mqtt_readHEADER() read fixed header from MQTT message */ inline struct mqtthdr * _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char cmd, int *bytes, int *len) { @@ -106,7 +102,6 @@ _mqtt_readHEADER(mqtt_msg_t * __restrict buf, u_char c } #pragma GCC visibility pop -// ---------------------------------------------------------- /* * mqtt_msgFree() Free MQTT message @@ -179,7 +174,7 @@ mqtt_msgRealloc(mqtt_msg_t * __restrict msg, u_short l if (!msg) return -1; - if (len == msg->msg_len) + if (len <= msg->msg_len) return len; p = realloc(msg->msg_base, len); @@ -543,5 +538,66 @@ mqtt_sqlTopic(const char *csInput, char * __restrict p *s = *pos; } + return ret; +} + + +/* + * mqtt_KeepAlive() - Keep Alive check routine + * + * @sock = connected socket + * @ka = keep alive timeout + * @tries = tries for receive correct ping response, usually ==1 + * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session + */ +int +mqtt_KeepAlive(int sock, u_short ka, u_char tries) +{ + int ret = 0; + struct pollfd pfd; + mqtt_msg_t msg = { NULL, 0 }; + + if (sock < 3) + return -1; /* error */ + + pfd.fd = sock; + pfd.events = POLLOUT; + if ((ret = poll(&pfd, 1, ka * 1000)) == -1 || + pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { + LOGERR; + return -1; /* error */ + } else if (!ret) + return 1; /* session is abandoned ... must be disconnect! */ + /* ping request */ + if ((ret = mqtt_msgPINGREQ(&msg)) == -1) + return -1; /* error */ + if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) { + LOGERR; + goto end; + } + + pfd.events = POLLIN | POLLPRI; + while (tries--) { + if ((ret = poll(&pfd, 1, ka * 1000)) == -1 || + pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { + LOGERR; + break; + } else if (!ret) { + ret = 1; /* session is abandoned ... must be disconnect! */ + continue; + } + /* receive & decode packet */ + if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) { + LOGERR; + break; + } + if (!mqtt_readPINGRESP(&msg)) { + ret = 0; /* Host is alive */ + break; + } else + ret = 2; /* Session is broken ... must be disconnect! */ + } +end: + free(msg.msg_base); return ret; }