--- libaitmqtt/src/cmds.c 2012/05/03 08:21:28 1.1.2.1 +++ libaitmqtt/src/cmds.c 2012/05/05 13:10:24 1.1.2.2 @@ -1,3 +1,74 @@ #include "global.h" +static inline int +_wait4data(int sock, u_short ka, short events) +{ + int ret = 0; + struct pollfd pfd; + + if (sock < 3) + return -1; /* error */ + + pfd.fd = sock; + pfd.events = POLLOUT; + if ((ret = poll(&pfd, 1, ka * 1000)) == -1 || + pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { + LOGERR; + return -1; /* error */ + } else if (!ret) + return 1; /* timeout */ + + return 0; /* ready */ +} + + +/* + * mqtt_KeepAlive() - Keep Alive check routine + * + * @sock = connected socket + * @ka = keep alive timeout + * @tries = tries for receive correct ping response, usually ==1 + * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session + */ +int +mqtt_KeepAlive(int sock, u_short ka, u_char tries) +{ + int ret = 0; + mqtt_msg_t msg = { NULL, 0 }; + + if (sock < 3) + return -1; /* error */ + + if ((ret = _wait4data(sock, ka, POLLOUT))) + return ret; + /* ping request */ + if ((ret = mqtt_msgPINGREQ(&msg)) == -1) + return -1; /* error */ + if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) { + LOGERR; + goto end; + } + + while (tries--) { + if ((ret = _wait4data(sock, ka, POLLIN | POLLPRI))) { + if (ret == -1) + break; + else + continue; + } + /* receive & decode packet */ + if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) { + LOGERR; + break; + } + if (!mqtt_readPINGRESP(&msg)) { + ret = 0; /* Host is alive */ + break; + } else + ret = 2; /* Session is broken ... must be disconnect! */ + } +end: + free(msg.msg_base); + return ret; +}