--- mqtt/src/mqttd_calls.c 2012/05/27 10:04:05 1.2.2.18 +++ mqtt/src/mqttd_calls.c 2012/06/01 10:39:48 1.2.2.19 @@ -4,6 +4,67 @@ #include "mqttd_calls.h" +static inline struct tagPkt * +mkPkt(void * __restrict data, int dlen) +{ + struct tagPkt *p = NULL; + + p = io_malloc(sizeof(struct tagPkt)); + if (!p) { + ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError()); + return NULL; + } else + memset(p, 0, sizeof(struct tagPkt)); + + p->pkt_data = io_allocVar(); + if (!p->pkt_data) { + ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError()); + io_free(p); + return NULL; + } + + if (data && dlen > 0) + AIT_SET_BUF(p->pkt_data, data, dlen); + + return p; +} + +static inline void +freePkt(struct tagPkt ** __restrict p) +{ + if (!p || !*p) + return; + + io_freeVar(&(*p)->pkt_data); + io_free(*p); + *p = NULL; +} + +static void * +sendPacket(sched_task_t *task) +{ + struct tagPkt *p = TASK_ARG(task); + register int n, slen; + u_char *pos; + + if (!p || !p->pkt_data || AIT_ISEMPTY(p->pkt_data)) { + ioDEBUG(9, "Error:: invalid packet or found empty content ..."); + return NULL; + } + + for (slen = AIT_LEN(p->pkt_data), pos = AIT_GET_BUF(p->pkt_data); slen > 0; + slen -= n, pos += n) { + n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL); + if (n == -1) { + ioSYSERR(0); + break; + } + } + + return NULL; +} + + static int pubOnce(struct tagSession *sess, u_short mid, char * __restrict psTopic, int topicLen, char * __restrict data, int datlen) @@ -275,6 +336,7 @@ cmdPINGREQ(void *srv, int len, void *arg) { struct tagSession *sess = (struct tagSession*) arg; int siz = 0; + struct tagPkt *p = NULL; ioTRACE(2); @@ -286,15 +348,12 @@ cmdPINGREQ(void *srv, int len, void *arg) if (siz == -1) { ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); return 0; - } - if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) { - ioSYSERR(0); - return 0; } else { - ioDEBUG(5, "Sended %d bytes.", siz); + p = mkPkt(sess->sess_buf->msg_base, siz); memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); } + schedWrite(root, sendPacket, p, sess->sess_sock, NULL, 0); return 0; } @@ -302,6 +361,7 @@ int cmdCONNECT(void *srv, int len, void *arg) { struct tagStore *store; + struct tagPkt *p; struct tagSession *sess = (struct tagSession*) arg; ioTRACE(2); @@ -330,6 +390,13 @@ cmdCONNECT(void *srv, int len, void *arg) free(store->st_subscr.sub_value.msg_base); io_free(store); + } + + while ((p = SLIST_FIRST(&sess->sess_sndpkt))) { + SLIST_REMOVE_HEAD(&sess->sess_sndpkt, pkt_node); + + io_freeVar(&p->pkt_data); + io_free(p); } if (sess->sess_will.msg)