version 1.3.12.2, 2022/09/15 15:04:44
|
version 1.3.12.3, 2022/09/15 15:48:41
|
Line 157 mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_s
|
Line 157 mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_s
|
return NULL; |
return NULL; |
|
|
/* receive suback */ |
/* receive suback */ |
|
cli->buf = mqtt_msgAlloc(BUFSIZ); |
|
if (!cli->buf) |
|
return NULL; |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
if (siz == -1) { |
if (siz == -1) { |
LOGERR; |
LOGERR; |
|
mqtt_msgFree(&cli->buf, 0); |
return NULL; |
return NULL; |
} |
} |
siz = mqtt_readSUBACK(cli->buf, &mid, &qoses); |
siz = mqtt_readSUBACK(cli->buf, &mid, &qoses); |
if (siz == -1) | if (siz == -1) { |
| mqtt_msgFree(&cli->buf, 0); |
return NULL; |
return NULL; |
|
} |
if (msgID != mid) { |
if (msgID != mid) { |
free(qoses); | if (qoses) |
| e_free(qoses); |
| mqtt_msgFree(&cli->buf, 0); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, mid); |
return NULL; |
return NULL; |
} |
} |
|
|
|
mqtt_msgFree(&cli->buf, 0); |
return qoses; |
return qoses; |
} |
} |
|
|
#if 0 |
|
/* |
/* |
* mqtt_cli_Unsubscribe() - Unsubscribe from broker |
* mqtt_cli_Unsubscribe() - Unsubscribe from broker |
* |
* |
Line 186 mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_s
|
Line 194 mqtt_cli_Subscribe(mqtt_cli_t * __restrict cli, mqtt_s
|
* return: -1 error or 0 ok |
* return: -1 error or 0 ok |
*/ |
*/ |
int |
int |
mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t * __restrict Topics, | mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt_subscr_t ** __restrict Topics, |
u_short msgID, u_char Dup, u_char QoS) |
u_short msgID, u_char Dup, u_char QoS) |
{ |
{ |
int siz = 0; |
int siz = 0; |
Line 195 mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt
|
Line 203 mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt
|
return -1; |
return -1; |
|
|
/* send unsubscribe */ |
/* send unsubscribe */ |
siz = mqtt_msgUNSUBSCRIBE(cli->buf, Topics, msgID, Dup, QoS); | cli->buf = mqtt_msgUNSUBSCRIBE(Topics, msgID, Dup, QoS); |
if (siz == -1) | if (!cli->buf) |
return -1; |
return -1; |
siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL); | siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL); |
if (siz == -1) { |
if (siz == -1) { |
LOGERR; |
LOGERR; |
|
mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
} else |
} else |
memset(cli->buf->msg_base, 0, cli->buf->msg_len); | mqtt_msgFree(&cli->buf, 0); |
|
|
if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) { |
if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) { |
return -1; |
return -1; |
Line 211 mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt
|
Line 220 mqtt_cli_Unsubscribe(mqtt_cli_t * __restrict cli, mqtt
|
return -1; |
return -1; |
|
|
/* receive unsuback */ |
/* receive unsuback */ |
|
cli->buf = mqtt_msgAlloc(BUFSIZ); |
|
if (!cli->buf) |
|
return -1; |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
if (siz == -1) { |
if (siz == -1) { |
LOGERR; |
LOGERR; |
|
mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
} |
} |
siz = mqtt_readUNSUBACK(cli->buf); |
siz = mqtt_readUNSUBACK(cli->buf); |
if (siz == -1) | if (siz == -1) { |
| mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
|
} |
if (msgID != siz) { |
if (msgID != siz) { |
|
mqtt_msgFree(&cli->buf, 0); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
return -1; |
return -1; |
} |
} |
|
|
|
mqtt_msgFree(&cli->buf, 0); |
return 0; |
return 0; |
} |
} |
|
|
Line 250 mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short
|
Line 267 mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short
|
return -1; |
return -1; |
|
|
/* send publish */ |
/* send publish */ |
siz = mqtt_msgPUBLISH(cli->buf, csTopic, msgID, Dup, QoS, Retain, pData, datLen); | cli->buf = mqtt_msgPUBLISH(csTopic, msgID, Dup, QoS, Retain, pData, datLen); |
if (siz == -1) | if (!cli->buf) |
return -1; |
return -1; |
siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL); | wlen = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL); |
if (siz == -1) { | if (wlen == -1) { |
LOGERR; |
LOGERR; |
|
mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
} else { | } else |
wlen = siz; | mqtt_msgFree(&cli->buf, 0); |
memset(cli->buf->msg_base, 0, cli->buf->msg_len); | |
} | |
|
|
if (QoS == MQTT_QOS_ONCE) /* no reply */ |
if (QoS == MQTT_QOS_ONCE) /* no reply */ |
goto end; |
goto end; |
Line 271 mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short
|
Line 287 mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short
|
return -1; |
return -1; |
|
|
/* receive PUBxxx */ |
/* receive PUBxxx */ |
|
cli->buf = mqtt_msgAlloc(BUFSIZ); |
|
if (!cli->buf) |
|
return -1; |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
if (siz == -1) { |
if (siz == -1) { |
LOGERR; |
LOGERR; |
|
mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
} |
} |
|
|
if (QoS == MQTT_QOS_ACK) { /* reply with PUBACK */ |
if (QoS == MQTT_QOS_ACK) { /* reply with PUBACK */ |
siz = mqtt_readPUBACK(cli->buf); |
siz = mqtt_readPUBACK(cli->buf); |
if (siz == -1) | if (siz == -1) { |
| mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
|
} |
if (msgID != siz) { |
if (msgID != siz) { |
|
mqtt_msgFree(&cli->buf, 0); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
return -1; |
return -1; |
} |
} |
|
mqtt_msgFree(&cli->buf, 0); |
goto end; |
goto end; |
} else { /* reply with PUBREC */ |
} else { /* reply with PUBREC */ |
siz = mqtt_readPUBREC(cli->buf); |
siz = mqtt_readPUBREC(cli->buf); |
if (siz == -1) | if (siz == -1) { |
| mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
|
} |
if (msgID != siz) { |
if (msgID != siz) { |
|
mqtt_msgFree(&cli->buf, 0); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
return -1; |
return -1; |
} |
} |
|
mqtt_msgFree(&cli->buf, 0); |
} |
} |
|
|
do { |
do { |
/* send publish release QoS == 2 */ |
/* send publish release QoS == 2 */ |
siz = mqtt_msgPUBREL(cli->buf, msgID); | cli->buf = mqtt_msgPUBREL(msgID); |
if (siz == -1) | if (!cli->buf) |
return -1; |
return -1; |
siz = send(cli->sock, cli->buf->msg_base, siz, MSG_NOSIGNAL); | siz = send(cli->sock, cli->buf->msg_base, cli->buf->msg_len, MSG_NOSIGNAL); |
if (siz == -1) { |
if (siz == -1) { |
LOGERR; |
LOGERR; |
|
mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
} else |
} else |
memset(cli->buf->msg_base, 0, cli->buf->msg_len); | mqtt_msgFree(&cli->buf, 0); |
|
|
if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) { |
if ((siz = mqtt_wait4data(cli->sock, cli->timeout, POLLIN | POLLPRI)) == -1) { |
return -1; |
return -1; |
Line 318 mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short
|
Line 347 mqtt_cli_Publish(mqtt_cli_t * __restrict cli, u_short
|
} |
} |
|
|
/* receive PUBCOMP */ |
/* receive PUBCOMP */ |
|
cli->buf = mqtt_msgAlloc(BUFSIZ); |
|
if (!cli->buf) |
|
return -1; |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
siz = recv(cli->sock, cli->buf->msg_base, cli->buf->msg_len, 0); |
if (siz == -1) { |
if (siz == -1) { |
LOGERR; |
LOGERR; |
|
mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
} |
} |
|
|
siz = mqtt_readPUBCOMP(cli->buf); |
siz = mqtt_readPUBCOMP(cli->buf); |
if (siz == -1) | if (siz == -1) { |
| mqtt_msgFree(&cli->buf, 0); |
return -1; |
return -1; |
|
} |
if (msgID != siz) { |
if (msgID != siz) { |
|
mqtt_msgFree(&cli->buf, 0); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
mqtt_SetErr(ECANCELED, "Receive different message ID %hu != %hu", msgID, siz); |
if (Dup++ > 1) |
if (Dup++ > 1) |
return -1; |
return -1; |
else |
else |
continue; |
continue; |
} |
} |
|
mqtt_msgFree(&cli->buf, 0); |
} while (0); |
} while (0); |
|
|
end: |
end: |
return wlen; |
return wlen; |
} |
} |
#endif |
|