File:  [ELWIX - Embedded LightWeight unIX -] / libaitmqtt / src / cmds.c
Revision 1.4: download - view: text, annotated - select for diffs - revision graph
Sun Apr 27 16:29:40 2014 UTC (10 years ago) by misho
Branches: MAIN
CVS tags: mqtt1_8, MQTT1_7, HEAD
version 1.7

#include "global.h"


#pragma GCC visibility push(hidden)

/*
 * mqtt_wait4data() - Wait until socket becomes ready for requested events
 *
 * @sock = connected socket, descriptors below 3 are rejected
 * @ka = keep alive timeout in seconds
 * @events = poll(2) events to wait for, e.g. POLLOUT or POLLIN | POLLPRI
 * return: -1 error, 0 socket is ready or 1 timeout
 */
int
mqtt_wait4data(int sock, u_short ka, short events)
{
	int ret;
	struct pollfd pfd;

	if (sock < 3)
		return -1;	/* error */

	pfd.fd = sock;
	pfd.events = events;	/* wait for what the caller asked for */
	pfd.revents = 0;

	/* poll() takes milliseconds, ka is in seconds */
	ret = poll(&pfd, 1, ka * 1000);
	if (ret == -1 || pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
		LOGERR;
		return -1;	/* error */
	}
	if (!ret)
		return 1;	/* timeout */

	return 0;		/* ready */
}

#pragma GCC visibility pop


/*
 * mqtt_KeepAlive() - Keep Alive check routine
 *
 * Sends a ping request and waits for a ping response, making at most
 * @tries attempts of up to @ka seconds each.
 *
 * @sock = connected socket
 * @ka = keep alive timeout in seconds, applied to every wait
 * @tries = tries for receive correct ping response, usually ==1
 * return: -1 error, 0 host is alive, 1 timeout session or 2 broken session
 */
int
mqtt_KeepAlive(int sock, u_short ka, u_char tries)
{
	int ret = 0;
	mqtt_msg_t msg = { NULL, 0 };

	if (sock < 3)
		return -1;	/* error */

	if ((ret = mqtt_wait4data(sock, ka, POLLOUT)))
		return ret;
	/* ping request, ret holds the packet length to send */
	if ((ret = mqtt_msgPINGREQ(&msg)) == -1)
		return -1;	/* error */
	if (send(sock, msg.msg_base, ret, MSG_NOSIGNAL) == -1) {
		LOGERR;
		ret = -1;
		goto end;
	}

	/*
	 * No response seen so far. Keeps the result inside the documented
	 * range when tries is 0, instead of leaking the send() byte count.
	 */
	ret = 1;
	while (tries--) {
		ret = mqtt_wait4data(sock, ka, POLLIN | POLLPRI);
		if (ret == -1)
			break;
		if (ret)
			continue;	/* timeout, next try */

		/* receive & decode packet */
		ret = recv(sock, msg.msg_base, msg.msg_len, 0);
		if (ret == -1) {
			LOGERR;
			break;
		}
		if (!ret) {
			/* peer closed the connection, nothing to decode */
			ret = 2;
			break;
		}
		if (!mqtt_readPINGRESP(&msg)) {
			ret = 0;	/* Host is alive */
			break;
		}
		ret = 2;	/* Session is broken ... must be disconnect! */
	}
end:
	free(msg.msg_base);
	return ret;
}

/*
 * mqtt_WillMessage() - Publish WILL message
 *
 * Publishes @data on @topic and waits up to @ka seconds for the
 * acknowledge from the peer.
 *
 * @sock = connected socket
 * @ka = keep alive timeout in seconds, applied to every wait
 * @topic = will topic, must not be NULL
 * @data = will message, NULL is published with zero length
 * return: -1 error, 1 timeout, 2 not ack or 0 ok
 */
int
mqtt_WillMessage(int sock, u_short ka, const char *topic, const char *data)
{
	int ret = 0;
	mqtt_msg_t msg = { NULL, 0 };

	if (!topic)
		return -1;	/* error */

	/* will message */
	if ((ret = mqtt_wait4data(sock, ka, POLLOUT)))
		return ret;
	/* ret holds the packet length to send */
	ret = mqtt_msgPUBLISH(&msg, topic, 0xDEAD, 0, 1, 0, data, data ? strlen(data) : 0);
	if (ret == -1)
		return -1;	/* error */
	if (send(sock, msg.msg_base, ret, MSG_NOSIGNAL) == -1) {
		LOGERR;
		ret = -1;	/* error */
		goto end;
	}
	/* clear the buffer before it is reused for the reply */
	memset(msg.msg_base, 0, msg.msg_len);

	/* will ack */
	if ((ret = mqtt_wait4data(sock, ka, POLLIN | POLLPRI)))
		goto end;
	/* receive & decode packet */
	ret = recv(sock, msg.msg_base, msg.msg_len, 0);
	if (ret == -1) {
		LOGERR;
		goto end;	/* error */
	}
	if (!ret) {
		/* peer closed the connection without sending an ack */
		ret = 2;
		goto end;
	}
	if (mqtt_readPUBACK(&msg))
		ret = 0;	/* ok */
	else
		ret = 2;	/* semi-error */
end:
	free(msg.msg_base);
	return ret;
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>