File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / Attic / sub.c
Revision 1.1.2.11: download - view: text, annotated - select for diffs - revision graph
Tue Dec 6 12:36:45 2011 UTC (12 years, 7 months ago) by misho
Branches: mqtt1_0
finish read apis :)

    1: #include "global.h"
    2: 
    3: 
    4: /* ------------------------------------------------------------------- */
    5: 
    6: /*
    7:  * mqtt_msgSUBSCRIBE() Create SUBSCRIBE message
    8:  *
    9:  * @buf = Message buffer
   10:  * @Topics = MQTT subscription topics
   11:  * @msgID = MessageID
   12:  * @Dup = Duplicate message
   13:  * @QOS = QoS
   14:  * return: -1 error or >-1 message size for send
   15:  */
   16: int
   17: mqtt_msgSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
   18: 		u_short msgID, u_char Dup, u_char QOS)
   19: {
   20: 	int siz = 0;
   21: 	struct mqtthdr *hdr;
   22: 	mqtthdr_var_t *topic;
   23: 	mqtt_v_t *mid;
   24: 	mqtt_subscr_t *t;
   25: 	u_char *qos;
   26: 
   27: 	if (!buf || !Topics)
   28: 		return -1;
   29: 	if (QOS > MQTT_QOS_EXACTLY) {
   30: 		mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
   31: 		return -1;
   32: 	}
   33: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
   34: 		mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
   35: 		return -1;
   36: 	}
   37: 
   38: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
   39: 		return -1;
   40: 	else {
   41: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
   42: 		siz += sizeof(struct mqtthdr);
   43: 	}
   44: 
   45: 	/* variable header */
   46: 	mid = (mqtt_v_t*) (buf->msg_base + siz);
   47: 	mid->val = htons(msgID);
   48: 	siz += sizeof(mqtt_v_t);
   49: 
   50: 	/* payload with subscriptions */
   51: 	for (t = Topics; t && t->sub_topic._base; t++) {
   52: 		topic = (mqtthdr_var_t*) (buf->msg_base + siz);
   53: 		topic->var_sb.val = htons(t->sub_topic._size);
   54: 		memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
   55: 		siz += MQTTHDR_VAR_SIZEOF(topic);
   56: 		qos = (buf->msg_base + siz);
   57: 		*qos = t->sub_ret;
   58: 		siz++;
   59: 	}
   60: 
   61: 	/* fixed header */
   62: 	MQTTHDR_MSGINIT(hdr);
   63: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBSCRIBE;
   64: 	hdr->mqtt_msg.qos = QOS;
   65: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
   66: 	hdr->mqtt_msg.retain = 0;
   67: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
   68: 
   69: 	mqtt_msgRealloc(buf, siz);
   70: 	return siz;
   71: }
   72: 
   73: /*
   74:  * mqtt_msgSUBACK() Create SUBACK message
   75:  *
   76:  * @buf = Message buffer
   77:  * @Topics = MQTT subscription topics
   78:  * @msgID = MessageID
   79:  * return: -1 error or >-1 message size for send
   80:  */
   81: int
   82: mqtt_msgSUBACK(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, u_short msgID)
   83: {
   84: 	int siz = 0;
   85: 	struct mqtthdr *hdr;
   86: 	mqtt_v_t *v;
   87: 	mqtt_subscr_t *t;
   88: 	u_char *qos;
   89: 
   90: 	if (!buf || !Topics)
   91: 		return -1;
   92: 
   93: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
   94: 		return -1;
   95: 	else {
   96: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
   97: 		siz += sizeof(struct mqtthdr);
   98: 		v = (mqtt_v_t*) (buf->msg_base + siz);
   99: 		siz += sizeof(mqtt_v_t);
  100: 	}
  101: 
  102: 	/* MessageID */
  103: 	v->val = htons(msgID);
  104: 
  105: 	/* QoS payload from subscriptions */
  106: 	for (t = Topics; t && t->sub_topic._base; t++) {
  107: 		qos = (buf->msg_base + siz);
  108: 		*qos = t->sub_ret;
  109: 		siz++;
  110: 	}
  111: 
  112: 	/* fixed header */
  113: 	MQTTHDR_MSGINIT(hdr);
  114: 	hdr->mqtt_msg.type = MQTT_TYPE_SUBACK;
  115: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
  116: 
  117: 	mqtt_msgRealloc(buf, siz);
  118: 	return siz;
  119: }
  120: 
  121: /*
  122:  * mqtt_msgUNSUBSCRIBE() Create UNSUBSCRIBE message
  123:  *
  124:  * @buf = Message buffer
  125:  * @Topics = MQTT subscription topics
  126:  * @msgID = MessageID
  127:  * @Dup = Duplicate message
  128:  * @QOS = QoS
  129:  * return: -1 error or >-1 message size for send
  130:  */
  131: int
  132: mqtt_msgUNSUBSCRIBE(mqtt_msg_t * __restrict buf, mqtt_subscr_t * __restrict Topics, 
  133: 		u_short msgID, u_char Dup, u_char QOS)
  134: {
  135: 	int siz = 0;
  136: 	struct mqtthdr *hdr;
  137: 	mqtthdr_var_t *topic;
  138: 	mqtt_v_t *mid;
  139: 	mqtt_subscr_t *t;
  140: 
  141: 	if (!buf || !Topics)
  142: 		return -1;
  143: 	if (QOS > MQTT_QOS_EXACTLY) {
  144: 		mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
  145: 		return -1;
  146: 	}
  147: 	if (!msgID && QOS != MQTT_QOS_ONCE) {
  148: 		mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
  149: 		return -1;
  150: 	}
  151: 
  152: 	if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
  153: 		return -1;
  154: 	else {
  155: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  156: 		siz += sizeof(struct mqtthdr);
  157: 	}
  158: 
  159: 	/* variable header */
  160: 	mid = (mqtt_v_t*) (buf->msg_base + siz);
  161: 	mid->val = htons(msgID);
  162: 	siz += sizeof(mqtt_v_t);
  163: 
  164: 	/* payload with subscriptions */
  165: 	for (t = Topics; t && t->sub_topic._base; t++) {
  166: 		topic = (mqtthdr_var_t*) (buf->msg_base + siz);
  167: 		topic->var_sb.val = htons(t->sub_topic._size);
  168: 		memcpy(topic->var_data, t->sub_topic._base, ntohs(topic->var_sb.val));
  169: 		siz += MQTTHDR_VAR_SIZEOF(topic);
  170: 	}
  171: 
  172: 	/* fixed header */
  173: 	MQTTHDR_MSGINIT(hdr);
  174: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBSCRIBE;
  175: 	hdr->mqtt_msg.qos = QOS;
  176: 	hdr->mqtt_msg.dup = Dup ? 1 : 0;
  177: 	hdr->mqtt_msg.retain = 0;
  178: 	*hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
  179: 
  180: 	mqtt_msgRealloc(buf, siz);
  181: 	return siz;
  182: }
  183: 
  184: /*
  185:  * mqtt_msgUNSUBACK() Create UNSUBACK message
  186:  *
  187:  * @buf = Message buffer
  188:  * @msgID = MessageID
  189:  * return: -1 error or >-1 message size for send
  190:  */
  191: int
  192: mqtt_msgUNSUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
  193: {
  194: 	int siz = 0;
  195: 	struct mqtthdr *hdr;
  196: 	mqtt_v_t *v;
  197: 
  198: 	if (!buf)
  199: 		return -1;
  200: 
  201: 	if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
  202: 		return -1;
  203: 	else {
  204: 		hdr = (struct mqtthdr *) (buf->msg_base + siz);
  205: 		siz += sizeof(struct mqtthdr);
  206: 		v = (mqtt_v_t*) (buf->msg_base + siz);
  207: 		siz += sizeof(mqtt_v_t);
  208: 	}
  209: 
  210: 	/* fixed header */
  211: 	MQTTHDR_MSGINIT(hdr);
  212: 	hdr->mqtt_msg.type = MQTT_TYPE_UNSUBACK;
  213: 	*hdr->mqtt_len = sizeof(mqtt_v_t);
  214: 
  215: 	/* MessageID */
  216: 	v->val = htons(msgID);
  217: 
  218: 	return siz;
  219: }
  220: 
  221: 
  222: /* ============= decode ============ */
  223: 
  224: /*
  225:  * mqtt_readSUBSCRIBE() Read SUBSCRIBE message
  226:  *
  227:  * @buf = Message buffer
  228:  * @msgID = MessageID
  229:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  230:  * return: NULL error or !=NULL MQTT fixed header
  231:  */
  232: struct mqtthdr *
  233: mqtt_readSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  234: {
  235: 	register int i;
  236: 	int len, ret;
  237: 	struct mqtthdr *hdr;
  238: 	mqtthdr_var_t *var;
  239: 	mqtt_subscr_t *subs;
  240: 	mqtt_v_t *v;
  241: 	caddr_t pos;
  242: 
  243: 	if (!buf || !msgID || !subscr)
  244: 		return NULL;
  245: 
  246: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBSCRIBE, &ret, &len);
  247: 	if (!hdr)
  248: 		return NULL;
  249: 	pos = buf->msg_base + ret + 1;
  250: 	v = (mqtt_v_t*) pos;
  251: 
  252: 	/* MessageID */
  253: 	len -= sizeof(mqtt_v_t);
  254: 	if (len < 0) {
  255: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  256: 		return NULL;
  257: 	} else {
  258: 		*msgID = ntohs(v->val);
  259: 		pos += sizeof(mqtt_v_t);
  260: 	}
  261: 
  262: 	subs = mqtt_subAlloc(0);
  263: 	if (!subs)
  264: 		return NULL;
  265: 	else
  266: 		*subscr = subs;
  267: 
  268: 	/* Subscribes */
  269: 	for (i = 0; len > 0; i++) {
  270: 		var = (mqtthdr_var_t*) pos;
  271: 		len -= MQTTHDR_VAR_SIZEOF(var) + 1;
  272: 		if (len < 0) {
  273: 			mqtt_subFree(subscr);
  274: 			mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  275: 			return NULL;
  276: 		}
  277: 		subs = mqtt_subRealloc(subs, i + 1);
  278: 		if (!subs) {
  279: 			mqtt_subFree(subscr);
  280: 			return NULL;
  281: 		} else
  282: 			*subscr = subs;
  283: 
  284: 		memset(&subs[i], 0, sizeof subs[i]);
  285: 		subs[i].sub_topic._size = ntohs(var->var_sb.val);
  286: 		subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
  287: 		if (!subs[i].sub_topic._base) {
  288: 			LOGERR;
  289: 			mqtt_subFree(subscr);
  290: 			return NULL;
  291: 		} else
  292: 			memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
  293: 		pos += MQTTHDR_VAR_SIZEOF(var);
  294: 
  295: 		subs[i].sub_ret = *pos;
  296: 		pos++;
  297: 	}
  298: 
  299: 	return hdr;
  300: }
  301: 
  302: /*
  303:  * mqtt_readSUBACK() Read SUBACK message
  304:  *
  305:  * @buf = Message buffer
  306:  * @msgID = MessageID
  307:  * @subqos = Subscribes QoS, must be free after use with free()
  308:  * return: -1 error or >-1 readed subscribes QoS elements
  309:  */
  310: int
  311: mqtt_readSUBACK(mqtt_msg_t * __restrict buf, u_short *msgID, u_char **subqos)
  312: {
  313: 	int len, ret;
  314: 	struct mqtthdr *hdr;
  315: 	mqtt_v_t *v;
  316: 	caddr_t pos;
  317: 
  318: 	if (!buf || !msgID || !subqos)
  319: 		return -1;
  320: 
  321: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_SUBACK, &ret, &len);
  322: 	if (!hdr)
  323: 		return -1;
  324: 	pos = buf->msg_base + ret + 1;
  325: 	v = (mqtt_v_t*) pos;
  326: 
  327: 	/* MessageID */
  328: 	len -= sizeof(mqtt_v_t);
  329: 	if (len < 0) {
  330: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  331: 		return -1;
  332: 	} else {
  333: 		*msgID = ntohs(v->val);
  334: 		pos += sizeof(mqtt_v_t);
  335: 	}
  336: 
  337: 	/* Subscribes */
  338: 	*subqos = malloc(len);
  339: 	if (!*subqos) {
  340: 		LOGERR;
  341: 		return -1;
  342: 	} else
  343: 		memcpy(*subqos, pos, len);
  344: 
  345: 	return len;
  346: }
  347: 
  348: /*
  349:  * mqtt_readUNSUBSCRIBE() Read UNSUBSCRIBE message
  350:  *
  351:  * @buf = Message buffer
  352:  * @msgID = MessageID
  353:  * @subscr = Subscriptions, must be free after use with mqtt_subFree()
  354:  * return: NULL error or !=NULL MQTT fixed header
  355:  */
  356: struct mqtthdr *
  357: mqtt_readUNSUBSCRIBE(mqtt_msg_t * __restrict buf, u_short *msgID, mqtt_subscr_t **subscr)
  358: {
  359: 	register int i;
  360: 	int len, ret;
  361: 	struct mqtthdr *hdr;
  362: 	mqtthdr_var_t *var;
  363: 	mqtt_subscr_t *subs;
  364: 	mqtt_v_t *v;
  365: 	caddr_t pos;
  366: 
  367: 	if (!buf || !msgID || !subscr)
  368: 		return NULL;
  369: 
  370: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBSCRIBE, &ret, &len);
  371: 	if (!hdr)
  372: 		return NULL;
  373: 	pos = buf->msg_base + ret + 1;
  374: 	v = (mqtt_v_t*) pos;
  375: 
  376: 	/* MessageID */
  377: 	len -= sizeof(mqtt_v_t);
  378: 	if (len < 0) {
  379: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  380: 		return NULL;
  381: 	} else {
  382: 		*msgID = ntohs(v->val);
  383: 		pos += sizeof(mqtt_v_t);
  384: 	}
  385: 
  386: 	subs = mqtt_subAlloc(0);
  387: 	if (!subs)
  388: 		return NULL;
  389: 	else
  390: 		*subscr = subs;
  391: 
  392: 	/* Subscribes */
  393: 	for (i = 0; len > 0; i++) {
  394: 		var = (mqtthdr_var_t*) pos;
  395: 		len -= MQTTHDR_VAR_SIZEOF(var);
  396: 		if (len < 0) {
  397: 			mqtt_subFree(subscr);
  398: 			mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  399: 			return NULL;
  400: 		}
  401: 		subs = mqtt_subRealloc(subs, i + 1);
  402: 		if (!subs) {
  403: 			mqtt_subFree(subscr);
  404: 			return NULL;
  405: 		} else
  406: 			*subscr = subs;
  407: 
  408: 		memset(&subs[i], 0, sizeof subs[i]);
  409: 		subs[i].sub_topic._size = ntohs(var->var_sb.val);
  410: 		subs[i].sub_topic._base = malloc(subs[i].sub_topic._size);
  411: 		if (!subs[i].sub_topic._base) {
  412: 			LOGERR;
  413: 			mqtt_subFree(subscr);
  414: 			return NULL;
  415: 		} else
  416: 			memcpy(subs[i].sub_topic._base, var->var_data, subs[i].sub_topic._size);
  417: 		pos += MQTTHDR_VAR_SIZEOF(var);
  418: 	}
  419: 
  420: 	return hdr;
  421: }
  422: 
  423: /*
  424:  * mqtt_readUNSUBACK() Read UNSUBACK message
  425:  *
  426:  * @buf = Message buffer
  427:  * return: -1 error or MessageID
  428:  */
  429: u_short
  430: mqtt_readUNSUBACK(mqtt_msg_t * __restrict buf)
  431: {
  432: 	int len, ret;
  433: 	struct mqtthdr *hdr;
  434: 	mqtt_v_t *v;
  435: 	caddr_t pos;
  436: 
  437: 	hdr = _mqtt_readHEADER(buf, MQTT_TYPE_UNSUBACK, &ret, &len);
  438: 	if (!hdr)
  439: 		return (u_short) -1;
  440: 	if (len < sizeof(mqtt_v_t)) {
  441: 		mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
  442: 		return (u_short) -1;
  443: 	} else {
  444: 		pos = buf->msg_base + ret + 1;
  445: 		v = (mqtt_v_t*) pos;
  446: 	}
  447: 
  448: 	return ntohs(v->val);
  449: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>