Annotation of mqtt/src/pub.c, revision 1.1.2.8

1.1.2.1   misho       1: #include "global.h"
                      2: 
                      3: 
                      4: /* ------------------------------------------------------------------- */
                      5: 
1.1.2.2   misho       6: /*
                      7:  * mqtt_msgPUBLISH() Create PUBLISH message
                      8:  *
                      9:  * @buf = Message buffer
                     10:  * @csTopic = Publish topic
                     11:  * @msgID = MessageID >0, if QOS != MQTT_QOS_ONCE
                     12:  * @Dup = Duplicate message
                     13:  * @QOS = QoS
1.1.2.4   misho      14:  * @Retain = Retain message
1.1.2.5   misho      15:  * @pData = Publish data into topic
                     16:  * @datlen = Publish data length
1.1.2.2   misho      17:  * return: -1 error or >-1 message size for send
                     18:  */
                     19: int
                     20: mqtt_msgPUBLISH(mqtt_msg_t * __restrict buf, const char *csTopic, u_short msgID, 
1.1.2.8 ! misho      21:                u_char Dup, u_char QOS, u_char Retain, const void *pData, int datlen)
1.1.2.2   misho      22: {
                     23:        int siz = 0;
                     24:        struct mqtthdr *hdr;
                     25:        mqtthdr_var_t *topic;
                     26:        mqtt_v_t *mid;
1.1.2.5   misho      27:        void *data;
1.1.2.2   misho      28: 
                     29:        if (!buf || !csTopic)
                     30:                return -1;
                     31:        if (QOS > MQTT_QOS_EXACTLY) {
                     32:                mqtt_SetErr(EINVAL, "Error:: invalid QoS parameter");
                     33:                return -1;
                     34:        }
                     35:        if (!msgID && QOS != MQTT_QOS_ONCE) {
                     36:                mqtt_SetErr(EINVAL, "Error:: invalid MessageID parameter must be >0");
                     37:                return -1;
                     38:        }
                     39: 
                     40:        if (mqtt_msgRealloc(buf, MQTTMSG_MAX) == -1)
                     41:                return -1;
                     42:        else {
                     43:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     44:                siz += sizeof(struct mqtthdr);
                     45:        }
                     46: 
                     47:        /* variable header */
                     48:        topic = (mqtthdr_var_t*) (buf->msg_base + siz);
                     49:        topic->var_sb.val = htons(strlen(csTopic));
                     50:        memcpy(topic->var_data, csTopic, ntohs(topic->var_sb.val));
                     51:        siz += MQTTHDR_VAR_SIZEOF(topic);
                     52: 
                     53:        mid = (mqtt_v_t*) (buf->msg_base + siz);
                     54:        mid->val = htons(msgID);
                     55:        siz += sizeof(mqtt_v_t);
                     56: 
1.1.2.5   misho      57:        /* load with data */
                     58:        if (pData && datlen) {
                     59:                data = buf->msg_base + siz;
                     60:                memcpy(data, pData, datlen);
                     61:                siz += datlen;
                     62:        }
                     63: 
1.1.2.2   misho      64:        /* fixed header */
1.1.2.6   misho      65:        MQTTHDR_MSGINIT(hdr);
1.1.2.2   misho      66:        hdr->mqtt_msg.type = MQTT_TYPE_PUBLISH;
                     67:        hdr->mqtt_msg.qos = QOS;
                     68:        hdr->mqtt_msg.dup = Dup ? 1 : 0;
1.1.2.4   misho      69:        hdr->mqtt_msg.retain = Retain ? 1 : 0;
1.1.2.2   misho      70:        *hdr->mqtt_len = mqtt_encodeLen(siz - sizeof(struct mqtthdr));
                     71: 
                     72:        mqtt_msgRealloc(buf, siz);
                     73:        return siz;
                     74: }
                     75: 
                     76: static int
                     77: _mqtt_msgPUB_(mqtt_msg_t * __restrict buf, u_char cmd, u_short msgID)
                     78: {
                     79:        int siz = 0;
                     80:        struct mqtthdr *hdr;
                     81:        mqtt_v_t *v;
                     82: 
                     83:        if (!buf)
                     84:                return -1;
                     85: 
                     86:        if (mqtt_msgRealloc(buf, sizeof(struct mqtthdr) + sizeof(mqtt_v_t)) == -1)
                     87:                return -1;
                     88:        else {
                     89:                hdr = (struct mqtthdr *) (buf->msg_base + siz);
                     90:                siz += sizeof(struct mqtthdr);
                     91:                v = (mqtt_v_t*) (buf->msg_base + siz);
                     92:                siz += sizeof(mqtt_v_t);
                     93:        }
                     94: 
                     95:        /* fixed header */
1.1.2.6   misho      96:        MQTTHDR_MSGINIT(hdr);
1.1.2.2   misho      97:        hdr->mqtt_msg.type = cmd;
                     98:        *hdr->mqtt_len = sizeof(mqtt_v_t);
                     99: 
                    100:        /* MessageID */
                    101:        v->val = htons(msgID);
                    102: 
                    103:        return siz;
                    104: }
                    105: 
                    106: /*
                    107:  * mqtt_msgPUBACK() Create PUBACK message
                    108:  *
                    109:  * @buf = Message buffer
                    110:  * @msgID = MessageID
                    111:  * return: -1 error or >-1 message size for send
                    112:  */
                    113: inline int
                    114: mqtt_msgPUBACK(mqtt_msg_t * __restrict buf, u_short msgID)
                    115: {
                    116:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBACK, msgID);
                    117: }
                    118: 
                    119: /*
                    120:  * mqtt_msgPUBREC() Create PUBREC message
                    121:  *
                    122:  * @buf = Message buffer
                    123:  * @msgID = MessageID
                    124:  * return: -1 error or >-1 message size for send
                    125:  */
                    126: inline int
                    127: mqtt_msgPUBREC(mqtt_msg_t * __restrict buf, u_short msgID)
                    128: {
                    129:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREC, msgID);
                    130: }
                    131: 
                    132: /*
                    133:  * mqtt_msgPUBREL() Create PUBREL message
                    134:  *
                    135:  * @buf = Message buffer
                    136:  * @msgID = MessageID
                    137:  * return: -1 error or >-1 message size for send
                    138:  */
                    139: inline int
                    140: mqtt_msgPUBREL(mqtt_msg_t * __restrict buf, u_short msgID)
                    141: {
                    142:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBREL, msgID);
                    143: }
                    144: 
                    145: /*
                    146:  * mqtt_msgPUBCOMP() Create PUBCOMP message
                    147:  *
                    148:  * @buf = Message buffer
                    149:  * @msgID = MessageID
                    150:  * return: -1 error or >-1 message size for send
                    151:  */
                    152: inline int
                    153: mqtt_msgPUBCOMP(mqtt_msg_t * __restrict buf, u_short msgID)
                    154: {
                    155:        return _mqtt_msgPUB_(buf, MQTT_TYPE_PUBCOMP, msgID);
                    156: }
1.1.2.7   misho     157: 
                    158: 
                    159: /* ============= decode ============ */
                    160: 
                    161: /*
1.1.2.8 ! misho     162:  * mqtt_readPUBLISH() Read PUBLISH message
        !           163:  *
        !           164:  * @buf = Message buffer
        !           165:  * @psTopic = Topic
        !           166:  * @topicLen = Topic length
        !           167:  * @msgID = MessageID
        !           168:  * @pData = Data buffer
        !           169:  * @datLen = Data buffer length, if *datLen == 0 allocate memory for pData
        !           170:  * return: NULL error or !=NULL MQTT fixed header
        !           171:  */
        !           172: struct mqtthdr *
        !           173: mqtt_readPUBLISH(mqtt_msg_t * __restrict buf, char * __restrict psTopic, int topicLen, 
        !           174:                u_short *msgID, void * __restrict pData, int *datLen)
        !           175: {
        !           176:        int len, ret;
        !           177:        struct mqtthdr *hdr;
        !           178:        mqtthdr_var_t *var;
        !           179:        mqtt_v_t *v;
        !           180:        caddr_t pos;
        !           181: 
        !           182:        if (!buf || !psTopic || !msgID || !pData)
        !           183:                return NULL;
        !           184: 
        !           185:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBLISH, &ret, &len);
        !           186:        if (!hdr)
        !           187:                return NULL;
        !           188:        if (len < sizeof(mqtt_v_t)) {
        !           189:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           190:                return NULL;
        !           191:        } else {
        !           192:                pos = buf->msg_base + ret + 1;
        !           193:                var = (mqtthdr_var_t*) pos;
        !           194:        }
        !           195: 
        !           196:        /* topic */
        !           197:        len -= MQTTHDR_VAR_SIZEOF(var);
        !           198:        if (len < 0) {
        !           199:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           200:                return NULL;
        !           201:        } else {
        !           202:                memset(psTopic, 0, topicLen--);
        !           203:                memcpy(psTopic, var->var_data, ntohs(var->var_sb.val) > topicLen ? 
        !           204:                                topicLen : ntohs(var->var_sb.val));
        !           205:                pos += MQTTHDR_VAR_SIZEOF(var);
        !           206:                v = (mqtt_v_t*) pos;
        !           207:        }
        !           208: 
        !           209:        len -= sizeof(mqtt_v_t);
        !           210:        if (len < 0) {
        !           211:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           212:                return NULL;
        !           213:        } else {
        !           214:                *msgID = ntohs(v->val);
        !           215:                pos += sizeof(mqtt_v_t);
        !           216:        }
        !           217: 
        !           218:        /* data */
        !           219:        if (len < 0) {
        !           220:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
        !           221:                return NULL;
        !           222:        } else {
        !           223:                if (!*datLen) {
        !           224:                        if (!(pData = malloc(len))) {
        !           225:                                LOGERR;
        !           226:                                return NULL;
        !           227:                        } else
        !           228:                                *datLen = len;
        !           229:                }
        !           230: 
        !           231:                memset(pData, 0, *datLen);
        !           232:                if (len < *datLen)
        !           233:                        *datLen = len;
        !           234:                memcpy(pData, pos, *datLen);
        !           235:        }
        !           236: 
        !           237:        return hdr;
        !           238: }
        !           239: 
        !           240: /*
1.1.2.7   misho     241:  * mqtt_readPUBACK() Read PUBACK message
                    242:  *
                    243:  * @buf = Message buffer
                    244:  * return: -1 error or MessageID
                    245:  */
                    246: u_short
                    247: mqtt_readPUBACK(mqtt_msg_t * __restrict buf)
                    248: {
                    249:        int len, ret;
                    250:        struct mqtthdr *hdr;
                    251:        mqtt_v_t *v;
                    252:        caddr_t pos;
                    253: 
                    254:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBACK, &ret, &len);
                    255:        if (!hdr)
                    256:                return (u_short) -1;
                    257:        if (len < sizeof(mqtt_v_t)) {
                    258:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    259:                return (u_short) -1;
                    260:        } else {
                    261:                pos = buf->msg_base + ret + 1;
                    262:                v = (mqtt_v_t*) pos;
                    263:        }
                    264: 
                    265:        return ntohs(v->val);
                    266: }
                    267: 
                    268: /*
                    269:  * mqtt_readPUBREC() Read PUBREC message
                    270:  *
                    271:  * @buf = Message buffer
                    272:  * return: -1 error or MessageID
                    273:  */
                    274: u_short
                    275: mqtt_readPUBREC(mqtt_msg_t * __restrict buf)
                    276: {
                    277:        int len, ret;
                    278:        struct mqtthdr *hdr;
                    279:        mqtt_v_t *v;
                    280:        caddr_t pos;
                    281: 
                    282:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREC, &ret, &len);
                    283:        if (!hdr)
                    284:                return (u_short) -1;
                    285:        if (len < sizeof(mqtt_v_t)) {
                    286:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    287:                return (u_short) -1;
                    288:        } else {
                    289:                pos = buf->msg_base + ret + 1;
                    290:                v = (mqtt_v_t*) pos;
                    291:        }
                    292: 
                    293:        return ntohs(v->val);
                    294: }
                    295: 
                    296: /*
                    297:  * mqtt_readPUBREL() Read PUBREL message
                    298:  *
                    299:  * @buf = Message buffer
                    300:  * return: -1 error or MessageID
                    301:  */
                    302: u_short
                    303: mqtt_readPUBREL(mqtt_msg_t * __restrict buf)
                    304: {
                    305:        int len, ret;
                    306:        struct mqtthdr *hdr;
                    307:        mqtt_v_t *v;
                    308:        caddr_t pos;
                    309: 
                    310:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBREL, &ret, &len);
                    311:        if (!hdr)
                    312:                return (u_short) -1;
                    313:        if (len < sizeof(mqtt_v_t)) {
                    314:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    315:                return (u_short) -1;
                    316:        } else {
                    317:                pos = buf->msg_base + ret + 1;
                    318:                v = (mqtt_v_t*) pos;
                    319:        }
                    320: 
                    321:        return ntohs(v->val);
                    322: }
                    323: 
                    324: /*
                    325:  * mqtt_readPUBCOMP() Read PUBCOMP message
                    326:  *
                    327:  * @buf = Message buffer
                    328:  * return: -1 error or MessageID
                    329:  */
                    330: u_short
                    331: mqtt_readPUBCOMP(mqtt_msg_t * __restrict buf)
                    332: {
                    333:        int len, ret;
                    334:        struct mqtthdr *hdr;
                    335:        mqtt_v_t *v;
                    336:        caddr_t pos;
                    337: 
                    338:        hdr = _mqtt_readHEADER(buf, MQTT_TYPE_PUBCOMP, &ret, &len);
                    339:        if (!hdr)
                    340:                return (u_short) -1;
                    341:        if (len < sizeof(mqtt_v_t)) {
                    342:                mqtt_SetErr(EINVAL, "Error:: short message length %d", len);
                    343:                return (u_short) -1;
                    344:        } else {
                    345:                pos = buf->msg_base + ret + 1;
                    346:                v = (mqtt_v_t*) pos;
                    347:        }
                    348: 
                    349:        return ntohs(v->val);
                    350: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>