|
|
| version 1.2.2.4, 2012/04/24 13:54:50 | version 1.2.2.8, 2012/04/27 16:41:56 |
|---|---|
| Line 51 cmdPUBREL(void *srv, void *arg) | Line 51 cmdPUBREL(void *srv, void *arg) |
| int | int |
| cmdSUBSCRIBE(void *srv, void *arg) | cmdSUBSCRIBE(void *srv, void *arg) |
| { | { |
| struct mqtthdr *hdr; | |
| struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; |
| mqtt_subscr_t *subs = NULL; | |
| int siz = 0; | |
| u_short mid = 0; | |
| register int i; | |
| struct tagStore *store; | |
| ioTRACE(2); | ioTRACE(2); |
| if (!sess) | if (!sess) |
| return -1; | return -1; |
| hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | ioDEBUG(5, "Exec SUBSCRIBE session"); |
| siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs); | |
| if (siz == -1) { | |
| ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | |
| return 0; | |
| } | |
| /* add to db */ | |
| for (i = 0; i < siz; i++) { | |
| if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base, | |
| sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) { | |
| store = malloc(sizeof(struct tagStore)); | |
| if (!store) { | |
| ioSYSERR(0); | |
| goto end; | |
| } else { | |
| store->st_msgid = mid; | |
| mqtt_subCopy(&store->st_subscr, &subs[i]); | |
| } | |
| /* add to cache */ | |
| SESS_ELEM_LOCK(sess); | |
| SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node); | |
| SESS_ELEM_UNLOCK(sess); | |
| subs[i].sub_ret = MQTT_QOS_PASS; | |
| } else | |
| subs[i].sub_ret = MQTT_QOS_DENY; | |
| } | |
| /* send acknowledge */ | |
| siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid); | |
| if (siz == -1) { | |
| ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | |
| goto end; | |
| } | |
| if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) | |
| ioSYSERR(0); | |
| else { | |
| ioDEBUG(5, "Sended %d bytes.", siz); | |
| memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | |
| } | |
| end: | |
| mqtt_subFree(&subs); | |
| return 0; | return 0; |
| } | } |
| int | int |
| cmdUNSUBSCRIBE(void *srv, void *arg) | cmdUNSUBSCRIBE(void *srv, void *arg) |
| { | { |
| struct mqtthdr *hdr; | |
| struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; |
| ioTRACE(2); | ioTRACE(2); |
| Line 75 cmdUNSUBSCRIBE(void *srv, void *arg) | Line 120 cmdUNSUBSCRIBE(void *srv, void *arg) |
| if (!sess) | if (!sess) |
| return -1; | return -1; |
| hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | |
| return 0; | return 0; |
| } | } |
| int | int |
| cmdPINGREQ(void *srv, void *arg) | cmdPINGREQ(void *srv, void *arg) |
| { | { |
| struct mqtthdr *hdr; | |
| struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; |
| int siz = 0; | int siz = 0; |
| Line 92 cmdPINGREQ(void *srv, void *arg) | Line 134 cmdPINGREQ(void *srv, void *arg) |
| if (!sess) | if (!sess) |
| return -1; | return -1; |
| hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | ioDEBUG(5, "Exec PINGREQ session"); |
| siz = mqtt_msgPINGRESP(sess->sess_buf); | siz = mqtt_msgPINGRESP(sess->sess_buf); |
| if (siz == -1) { | if (siz == -1) { |
| ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); | ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError()); |
| Line 101 cmdPINGREQ(void *srv, void *arg) | Line 143 cmdPINGREQ(void *srv, void *arg) |
| if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) { | if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) { |
| ioSYSERR(0); | ioSYSERR(0); |
| return 0; | return 0; |
| } else | } else { |
| ioDEBUG(5, "Sended %d bytes.", siz); | ioDEBUG(5, "Sended %d bytes.", siz); |
| memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len); | |
| } | |
| return 0; | return 0; |
| } | } |
| Line 112 cmdCONNECT(void *srv, void *arg) | Line 156 cmdCONNECT(void *srv, void *arg) |
| { | { |
| struct tagStore *store; | struct tagStore *store; |
| struct tagSession *sess = (struct tagSession*) arg; | struct tagSession *sess = (struct tagSession*) arg; |
| register int i; | |
| ioTRACE(2); | ioTRACE(2); |
| if (!sess) | if (!sess) |
| return -1; | return -1; |
| #if 0 | ioDEBUG(5, "Exec CONNECT session"); |
| SESS_LOCK; | SESS_LOCK; |
| TAILQ_REMOVE(&Sessions, sess, sess_node); | TAILQ_REMOVE(&Sessions, sess, sess_node); |
| SESS_UNLOCK; | SESS_UNLOCK; |
| Line 128 cmdCONNECT(void *srv, void *arg) | Line 171 cmdCONNECT(void *srv, void *arg) |
| call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); | call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%"); |
| SESS_ELEM_LOCK(sess); | SESS_ELEM_LOCK(sess); |
| for (i = 0; i < MQTT_QOS_RESERVED; i++) | while ((store = SLIST_FIRST(&sess->sess_subscr))) { |
| while ((store = SLIST_FIRST(&sess->sess_txque[i]))) { | SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node); |
| SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node); | |
| if (store->st_subscr.sub_topic.msg_base) | if (store->st_subscr.sub_topic.msg_base) |
| free(store->st_subscr.sub_topic.msg_base); | free(store->st_subscr.sub_topic.msg_base); |
| if (store->st_subscr.sub_value.msg_base) | if (store->st_subscr.sub_value.msg_base) |
| free(store->st_subscr.sub_value.msg_base); | free(store->st_subscr.sub_value.msg_base); |
| free(store); | free(store); |
| } | } |
| SESS_ELEM_UNLOCK(sess); | SESS_ELEM_UNLOCK(sess); |
| if (sess->sess_will.msg) | if (sess->sess_will.msg) |
| Line 148 cmdCONNECT(void *srv, void *arg) | Line 190 cmdCONNECT(void *srv, void *arg) |
| call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, | call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, |
| sess->sess_addr, sess->sess_user); | sess->sess_addr, sess->sess_user); |
| #endif | |
| return 0; | return 0; |
| } | } |
| Line 162 cmdDISCONNECT(void *srv, void *arg) | Line 203 cmdDISCONNECT(void *srv, void *arg) |
| if (!sess) | if (!sess) |
| return -1; | return -1; |
| ioDEBUG(5, "Exec DISCONNECT session"); | |
| SESS_LOCK; | SESS_LOCK; |
| TAILQ_REMOVE(&Sessions, sess, sess_node); | TAILQ_REMOVE(&Sessions, sess, sess_node); |
| SESS_UNLOCK; | SESS_UNLOCK; |