| version 1.2.2.22, 2012/05/03 15:05:37 | version 1.2.2.23, 2012/05/05 12:15:25 | 
| Line 107  finiSession(struct tagSession *sess) | Line 107  finiSession(struct tagSession *sess) | 
 | free(sess); | free(sess); | 
 | } | } | 
 |  |  | 
| static void | static void * | 
| stopSession(struct tagSession *sess) | leaveClient(sched_task_t *task) | 
 | { | { | 
| mqtt_msg_t msg = { NULL, 0 }; | struct tagSession *sess; | 
 | int ret; | int ret; | 
 |  |  | 
 | ioTRACE(4); | ioTRACE(4); | 
 |  |  | 
 |  | assert(task); | 
 |  |  | 
 |  | sess = TASK_ARG(task); | 
 | assert(sess); | assert(sess); | 
 |  |  | 
 | SESS_LOCK; | SESS_LOCK; | 
 | TAILQ_REMOVE(&Sessions, sess, sess_node); | TAILQ_REMOVE(&Sessions, sess, sess_node); | 
 | SESS_UNLOCK; | SESS_UNLOCK; | 
 |  |  | 
| ret = mqtt_msgDISCONNECT(&msg); | ret = mqtt_msgDISCONNECT(sess->sess_buf); | 
| send(sess->sess_sock, msg.msg_base, ret, MSG_NOSIGNAL); | send(TASK_FD(task), sess->sess_buf->msg_base, ret, MSG_NOSIGNAL); | 
| free(msg.msg_base); |  | 
 |  |  | 
| ioDEBUG(1, "Close socket=%d", sess->sess_sock); | ioDEBUG(1, "Close socket=%d", TASK_FD(task)); | 
 | call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, | call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, | 
 | sess->sess_addr, sess->sess_user); | sess->sess_addr, sess->sess_user); | 
 |  |  | 
 | finiSession(sess); | finiSession(sess); | 
 |  | return NULL; | 
 | } | } | 
 |  |  | 
 | static void * | static void * | 
| thrSession(struct tagSession *sess) | dispatchSession(sched_task_t *task) | 
 | { | { | 
| int ret, locKill = 42; | int ret; | 
| struct pollfd pfd; |  | 
 | struct mqtthdr *hdr; | struct mqtthdr *hdr; | 
 |  | struct tagSession *sess; | 
 |  |  | 
 | pthread_cleanup_push((void(*)(void*)) stopSession, sess); |  | 
 | ioTRACE(2); | ioTRACE(2); | 
 |  |  | 
| pfd.fd = sess->sess_sock; | assert(task); | 
| pfd.events = POLLIN | POLLPRI; |  | 
| while (!Kill && locKill) { |  | 
| if ((ret = poll(&pfd, 1, sess->sess_ka * 1000)) == -1 || |  | 
| pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { |  | 
| ioDEBUG(3, "Error:: poll(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |  | 
| break; |  | 
| } else if (!ret && (ret = mqtt_KeepAlive(sess->sess_sock, sess->sess_ka, 1))) { |  | 
| call.LOG(logg, "Session %s keep-alive missing from %s for user %s ...\n", |  | 
| sess->sess_cid, sess->sess_addr, sess->sess_user); |  | 
| break; |  | 
| } |  | 
| /* receive & decode packet */ |  | 
| if ((ret = recv(sess->sess_sock, sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { |  | 
| ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |  | 
| break; |  | 
| } else if (!ret) { |  | 
| ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); |  | 
| break; |  | 
| } else |  | 
| hdr = (struct mqtthdr*) sess->sess_buf->msg_base; |  | 
 |  |  | 
| /* dispatch message type */ | sess = TASK_ARG(task); | 
| if (mqtt_srv_Dispatch(sess->sess_srv, sess)) | assert(sess); | 
| ioLIBERR(mqtt); |  | 
| switch (hdr->mqtt_msg.type) { |  | 
| case MQTT_TYPE_CONNECT: |  | 
| schedEvent(root, startSession, NULL, (u_long) sess->sess_sock, sess, ret); |  | 
| locKill ^= locKill; |  | 
| break; |  | 
| case MQTT_TYPE_DISCONNECT: |  | 
| finiSession(sess); |  | 
| locKill ^= locKill; |  | 
| continue; |  | 
| case MQTT_TYPE_SUBSCRIBE: |  | 
| case MQTT_TYPE_PINGREQ: |  | 
| break; |  | 
| case MQTT_TYPE_PUBLISH: |  | 
| ioDEBUG(5, "Exec PUBLISH topic QoS=%d", hdr->mqtt_msg.qos); |  | 
| /* |  | 
| if (cmdPUBLISH(sess)) |  | 
| locKill ^= locKill; |  | 
| */ |  | 
| break; |  | 
| case MQTT_TYPE_PUBREL: |  | 
| break; |  | 
| case MQTT_TYPE_UNSUBSCRIBE: |  | 
| break; |  | 
| default: |  | 
| ioDEBUG(5, "Error:: Session %s, wrong command %d - DISCARDED", |  | 
| sess->sess_cid, hdr->mqtt_msg.type); |  | 
| break; |  | 
| } |  | 
| } |  | 
 |  |  | 
| pthread_cleanup_pop(locKill); | /* receive & decode packet */ | 
| pthread_exit(NULL); | if ((ret = recv(TASK_FD(task), sess->sess_buf->msg_base, sess->sess_buf->msg_len, 0)) == -1) { | 
|  | ioDEBUG(3, "Error:: recv(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); | 
|  | finiSession(sess); | 
|  | return NULL; | 
|  | } else if (!ret) { | 
|  | ioDEBUG(4, "Session %s EOF received.", sess->sess_cid); | 
|  | finiSession(sess); | 
|  | return NULL; | 
|  | } else | 
|  | hdr = (struct mqtthdr*) sess->sess_buf->msg_base; | 
|  |  | 
|  | /* dispatch message type */ | 
|  | if (mqtt_srv_Dispatch(sess->sess_srv, sess)) | 
|  | ioLIBERR(mqtt); | 
|  | //                      schedEvent(root, startSession, NULL, (u_long) TASK_FD(task), sess, ret); | 
|  |  | 
|  | if (!schedRead(root, dispatchSession, TASK_ARG(task), TASK_FD(task), NULL, 0)) | 
|  | ioLIBERR(sched); | 
 | return NULL; | return NULL; | 
 | } | } | 
 |  |  | 
| Line 206  static void * | Line 175  static void * | 
 | startSession(sched_task_t *task) | startSession(sched_task_t *task) | 
 | { | { | 
 | u_char basebuf[USHRT_MAX]; | u_char basebuf[USHRT_MAX]; | 
| mqtt_msg_t msg = { NULL, 0 }, buf = { basebuf, sizeof basebuf }; | mqtt_msg_t buf = { basebuf, sizeof basebuf }; | 
 | mqtthdr_connflgs_t flg; | mqtthdr_connflgs_t flg; | 
 | mqtthdr_connack_t cack; | mqtthdr_connack_t cack; | 
 | struct tagSession *s, *sess = NULL; | struct tagSession *s, *sess = NULL; | 
| int ret; | int ret, wlen; | 
 |  |  | 
 | ioTRACE(4); | ioTRACE(4); | 
 |  |  | 
| Line 233  startSession(sched_task_t *task) | Line 202  startSession(sched_task_t *task) | 
 | } | } | 
 | } else { | } else { | 
 | sess = TASK_DATA(task); | sess = TASK_DATA(task); | 
 |  | assert(sess); | 
 |  |  | 
 | buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task); | buf.msg_len = TASK_DATLEN(task) > sizeof basebuf ? sizeof basebuf : TASK_DATLEN(task); | 
 | memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len); | memcpy(buf.msg_base, sess->sess_buf->msg_base, buf.msg_len); | 
 | } | } | 
| Line 268  startSession(sched_task_t *task) | Line 239  startSession(sched_task_t *task) | 
 | TAILQ_FOREACH(s, &Sessions, sess_node) | TAILQ_FOREACH(s, &Sessions, sess_node) | 
 | if (!strcmp(s->sess_cid, sess->sess_cid)) { | if (!strcmp(s->sess_cid, sess->sess_cid)) { | 
 | /* found stale session & disconnect it! */ | /* found stale session & disconnect it! */ | 
| stopSession(s); | schedWrite(root, leaveClient, sess, TASK_FD(task), NULL, 0); | 
 | break; | break; | 
 | } | } | 
 | } | } | 
| Line 285  startSession(sched_task_t *task) | Line 256  startSession(sched_task_t *task) | 
 | ret = MQTT_RETCODE_ACCEPTED; | ret = MQTT_RETCODE_ACCEPTED; | 
 | } | } | 
 |  |  | 
 | ret = mqtt_msgCONNACK(&msg, ret); |  | 
 | if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) { |  | 
 | ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); |  | 
 | finiSession(sess); |  | 
 | return NULL; |  | 
 | } else { |  | 
 | ioDEBUG(5, "Sended %d bytes", ret); |  | 
 | free(msg.msg_base); |  | 
 | memset(&msg, 0, sizeof msg); |  | 
 | } |  | 
 |  |  | 
 | /* Start session thread OK ... */ | /* Start session thread OK ... */ | 
 | SESS_LOCK; | SESS_LOCK; | 
 | TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); | TAILQ_INSERT_TAIL(&Sessions, sess, sess_node); | 
 | pthread_create(&sess->sess_tid, &attr, (void*(*)(void*)) thrSession, sess); |  | 
 | SESS_UNLOCK; | SESS_UNLOCK; | 
 |  |  | 
 |  | if (!schedRead(root, dispatchSession, sess, TASK_FD(task), NULL, 0)) | 
 |  | ioLIBERR(sched); | 
 |  |  | 
 | call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, | call.LOG(logg, "Session %s started from %s for user %s (timeout=%d) OK!\n", sess->sess_cid, | 
 | sess->sess_addr, sess->sess_user, sess->sess_ka); | sess->sess_addr, sess->sess_user, sess->sess_ka); | 
 | return NULL; |  | 
 | end:    /* close client connection */ | end:    /* close client connection */ | 
| ret = mqtt_msgCONNACK(&msg, ret); | wlen = mqtt_msgCONNACK(sess->sess_buf, ret); | 
| if ((ret = send(sess->sess_sock, msg.msg_base, ret, 0)) == -1) { | if ((wlen = send(TASK_FD(task), sess->sess_buf->msg_base, wlen, 0)) == -1) { | 
| ioDEBUG(3, "Error:: send(%d) #%d - %s", sess->sess_sock, errno, strerror(errno)); | ioDEBUG(3, "Error:: send(%d) #%d - %s", TASK_FD(task), errno, strerror(errno)); | 
| } else { | } else | 
| ioDEBUG(5, "Sended %d bytes", ret); | ioDEBUG(5, "Sended %d bytes", wlen); | 
| free(msg.msg_base); |  | 
| memset(&msg, 0, sizeof msg); |  | 
| } |  | 
 |  |  | 
| ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, sess->sess_sock); | if (ret != MQTT_RETCODE_ACCEPTED) { | 
| finiSession(sess); | ioDEBUG(1, "Close client %s with socket=%d", sess->sess_addr, TASK_FD(task)); | 
|  | finiSession(sess); | 
|  | } | 
 | return NULL; | return NULL; | 
 | } | } | 
 |  |  |