#include "global.h"
#pragma GCC visibility push(hidden)
/*
 * mqtt_wait4data() - Wait until a socket reports the requested poll events
 *
 * @sock = socket descriptor; values below 3 are rejected
 * @ka = timeout in seconds (converted to milliseconds for poll())
 * @events = poll(2) event mask to wait for, e.g. POLLOUT or POLLIN | POLLPRI
 * return: -1 error, 0 ready or 1 timeout
 */
int
mqtt_wait4data(int sock, u_short ka, short events)
{
	int ret = 0;
	struct pollfd pfd;

	if (sock < 3)
		return -1;	/* error */

	pfd.fd = sock;
	/*
	 * Wait for what the caller asked for.  Always polling for POLLOUT
	 * made read-side waits return "ready" as soon as the socket was
	 * writable, so the following recv() could block without a timeout.
	 */
	pfd.events = events;

	/* POLLERR/POLLHUP/POLLNVAL are reported by poll() even if not requested */
	if ((ret = poll(&pfd, 1, ka * 1000)) == -1 ||
	    pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
		LOGERR;
		return -1;	/* error */
	} else if (!ret)
		return 1;	/* timeout */

	return 0;		/* ready */
}
#pragma GCC visibility pop
/*
 * mqtt_KeepAlive() - Keep Alive check routine
 *
 * Waits until the socket is writable, sends a PINGREQ and then waits up to
 * @tries times for a valid PINGRESP.
 *
 * @sock = connected socket; values below 3 are rejected
 * @ka = keep alive timeout, used for every wait on the socket
 * @tries = tries for receive correct ping response, usually ==1;
 *          with 0 no response is awaited and a timeout (1) is reported
 * return: -1 error, 0 host is alive, 1 session timed out or
 *         2 session is broken (invalid response or connection closed by peer)
 */
int
mqtt_KeepAlive(int sock, u_short ka, u_char tries)
{
	int ret = 0;
	mqtt_msg_t msg = { NULL, 0 };

	if (sock < 3)
		return -1;	/* error */

	if ((ret = mqtt_wait4data(sock, ka, POLLOUT)))
		return ret;
	/* ping request */
	if ((ret = mqtt_msgPINGREQ(&msg)) == -1)
		return -1;	/* error */
	if ((ret = send(sock, msg.msg_base, ret, MSG_NOSIGNAL)) == -1) {
		LOGERR;
		goto end;
	}

	/*
	 * Until a response arrives the result is "timeout".  Without this the
	 * byte count returned by send() leaked out as the status code when
	 * tries was 0.
	 */
	ret = 1;

	while (tries--) {
		if ((ret = mqtt_wait4data(sock, ka, POLLIN | POLLPRI))) {
			if (ret == -1)
				break;
			else
				continue;	/* timeout, use the next try */
		}
		/* receive & decode packet */
		if ((ret = recv(sock, msg.msg_base, msg.msg_len, 0)) == -1) {
			LOGERR;
			break;
		}
		if (!ret) {
			/*
			 * recv() returned 0: nothing was read, so the buffer still
			 * holds stale data and must not be decoded.
			 * NOTE(review): assumes msg.msg_len is non-zero here, so 0
			 * means the peer closed the connection - confirm.
			 */
			ret = 2;	/* Session is broken */
			break;
		}
		if (!mqtt_readPINGRESP(&msg)) {
			ret = 0;	/* Host is alive */
			break;
		} else
			ret = 2;	/* Session is broken ... must be disconnect! */
	}
end:
	free(msg.msg_base);
	return ret;
}
/*
 * mqtt_WillMessage() - Publish WILL message
 *
 * Builds a PUBLISH packet for @topic, sends it once the socket is writable
 * and then waits for the acknowledgement from the other side.
 *
 * @sock = connected socket
 * @ka = keep alive timeout, used for both the send and the receive wait
 * @topic = will topic, must not be NULL
 * @data = will message, may be NULL (published with zero length)
 * return: -1 error, 1 timeout, 2 not ack or 0 ok
 */
int
mqtt_WillMessage(int sock, u_short ka, const char *topic, const char *data)
{
	mqtt_msg_t pkt = { NULL, 0 };
	size_t datlen;
	int rc;

	if (topic == NULL)
		return -1;	/* error */

	/* the socket has to accept output before anything is built */
	rc = mqtt_wait4data(sock, ka, POLLOUT);
	if (rc != 0)
		return rc;

	/* will message */
	datlen = (data != NULL) ? strlen(data) : 0;
	rc = mqtt_msgPUBLISH(&pkt, topic, 0xDEAD, 0, 1, 0, data, datlen);
	if (rc == -1)
		return -1;	/* error */

	rc = send(sock, pkt.msg_base, rc, MSG_NOSIGNAL);
	if (rc == -1) {
		LOGERR;
		goto done;	/* error */
	}
	/* wipe the request, the same buffer receives the answer */
	memset(pkt.msg_base, 0, pkt.msg_len);

	/* will ack */
	rc = mqtt_wait4data(sock, ka, POLLIN | POLLPRI);
	if (rc != 0)
		goto done;	/* error or timeout */

	/* receive & decode packet */
	rc = recv(sock, pkt.msg_base, pkt.msg_len, 0);
	if (rc == -1) {
		LOGERR;
		goto done;	/* error */
	}

	/* non-zero result from the PUBACK decoder is taken as ok, zero as not ack */
	rc = mqtt_readPUBACK(&pkt) ? 0 : 2;
done:
	free(pkt.msg_base);
	return rc;
}
/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */