Diff for /mqtt/src/mqttd_calls.c between versions 1.4 and 1.4.4.2

version 1.4, 2012/07/03 12:46:01 version 1.4.4.2, 2013/07/16 14:34:58
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 55  mkPkt(void * __restrict data, int dlen) Line 55  mkPkt(void * __restrict data, int dlen)
 {  {
         ait_val_t *p = NULL;          ait_val_t *p = NULL;
   
        if (!(p = io_allocVar())) {        if (!(p = ait_allocVar())) {
                ioDEBUG(7, "Error:: in send packet prepare #%d - %s", io_GetErrno(), io_GetError());                EVERBOSE(7, "Error:: in send packet prepare #%d - %s", 
                                 elwix_GetErrno(), elwix_GetError());
                 return NULL;                  return NULL;
         }          }
   
Line 72  freePkt(ait_val_t ** __restrict p) Line 73  freePkt(ait_val_t ** __restrict p)
         if (!p)          if (!p)
                 return;                  return;
   
        io_freeVar(p);        ait_freeVar(p);
 }  }
   
 static void *  static void *
Line 83  sendPacket(sched_task_t *task) Line 84  sendPacket(sched_task_t *task)
         u_char *pos;          u_char *pos;
   
         if (!p || AIT_ISEMPTY(p)) {          if (!p || AIT_ISEMPTY(p)) {
                ioDEBUG(9, "Error:: invalid packet or found empty content ...");                EVERBOSE(9, "Error:: invalid packet or found empty content ...");
                 return NULL;                  return NULL;
         }          }
   
        ioDEBUG(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task));        EVERBOSE(7, "Send packet length %d for socket %d\n", AIT_LEN(p), (u_int) TASK_FD(task));
   
         for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {          for (slen = AIT_LEN(p), pos = AIT_GET_BUF(p); slen > 0; slen -= n, pos += n) {
                 n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);                  n = send(TASK_FD(task), pos, slen, MSG_NOSIGNAL);
                 if (n == -1) {                  if (n == -1) {
                        ioSYSERR(0);                        ESYSERR(0);
                         break;                          break;
                 }                  }
         }          }
Line 121  search4send(struct tagSession * __restrict sess, const Line 122  search4send(struct tagSession * __restrict sess, const
                                 if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {                                  if ((ret = regcomp(&re, st->st_subscr.sub_topic.msg_base, REG_EXTENDED))) {
                                         regerror(ret, &re, szStr, sizeof szStr);                                          regerror(ret, &re, szStr, sizeof szStr);
                                         regfree(&re);                                          regfree(&re);
                                        ioDEBUG(3, "Error:: regcomp(%s) %s\n", (char*)                                        EVERBOSE(3, "Error:: regcomp(%s) %s\n", (char*)
                                                         st->st_subscr.sub_topic.msg_base, szStr);                                                          st->st_subscr.sub_topic.msg_base, szStr);
                                 }                                  }
                                 if (!regexec(&re, topic, 1, &match, 0)) {                                  if (!regexec(&re, topic, 1, &match, 0)) {
Line 147  sendRetain(sched_task_t *task) Line 148  sendRetain(sched_task_t *task)
         struct tagSession *sess;          struct tagSession *sess;
         int siz;          int siz;
   
        ioTRACE(2);        ETRACE();
   
         assert(task);          assert(task);
   
Line 155  sendRetain(sched_task_t *task) Line 156  sendRetain(sched_task_t *task)
         assert(sess);          assert(sess);
   
         if (!sess->sess_buf) {          if (!sess->sess_buf) {
                ioDEBUG(9, "WARNING! No allocated buffer!?!\n");                EVERBOSE(9, "WARNING! No allocated buffer!?!\n");
                 return NULL;                  return NULL;
         }          }
   
Line 167  sendRetain(sched_task_t *task) Line 168  sendRetain(sched_task_t *task)
                 siz = s->sub_value.msg_len;                  siz = s->sub_value.msg_len;
                 memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base,                   memcpy(sess->sess_buf->msg_base, s->sub_value.msg_base, 
                                 MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));                                  MIN(sess->sess_buf->msg_len, s->sub_value.msg_len));
                ioDEBUG(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n",                 EVERBOSE(7, "Sending retain message %d bytes, QoS %hhd topic '%s' data length %d\n", 
                                 siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);                                  siz, s->sub_ret, (char*) s->sub_topic.msg_base, s->sub_value.msg_len);
                 if (siz > 0)                  if (siz > 0)
                         search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);                          search4send(sess, s->sub_topic.msg_base, siz, s->sub_ret);
Line 182  pubWill(struct tagSession * __restrict sess) Line 183  pubWill(struct tagSession * __restrict sess)
 {  {
         int datlen;          int datlen;
   
        ioTRACE(2);        ETRACE();
   
         /* prepare will packet */          /* prepare will packet */
         datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0,           datlen = mqtt_msgPUBLISH(sess->sess_buf, sess->sess_will.topic, 0xDEAD, 0, 1, 0, 
Line 245  cmdPUBLISH(void *srv, int len, void *arg) Line 246  cmdPUBLISH(void *srv, int len, void *arg)
         u_short mid = 0;          u_short mid = 0;
         ait_val_t *p = NULL;          ait_val_t *p = NULL;
   
        ioTRACE(2);        ETRACE();
   
         if (!sess)          if (!sess)
                 return -1;                  return -1;
   
        ioDEBUG(5, "Exec PUBLISH session");        EVERBOSE(5, "Exec PUBLISH session");
         siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);          siz = mqtt_readPUBLISH(sess->sess_buf, szTopic, sizeof szTopic, &mid, NULL);
         if (siz == -1) {          if (siz == -1) {
                ioDEBUG(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in readPUBLISH #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 return 0;                  return 0;
         }          }
   
Line 264  cmdPUBLISH(void *srv, int len, void *arg) Line 265  cmdPUBLISH(void *srv, int len, void *arg)
                                 return 0;                                  return 0;
                         siz = mqtt_msgPUBACK(sess->sess_buf, mid);                          siz = mqtt_msgPUBACK(sess->sess_buf, mid);
                         if (siz == -1) {                          if (siz == -1) {
                                ioDEBUG(5, "Error:: in msgPUBACK #%d - %s",                                 EVERBOSE(5, "Error:: in msgPUBACK #%d - %s", 
                                                 mqtt_GetErrno(), mqtt_GetError());                                                  mqtt_GetErrno(), mqtt_GetError());
                                 return 0;                                  return 0;
                         }                          }
Line 274  cmdPUBLISH(void *srv, int len, void *arg) Line 275  cmdPUBLISH(void *srv, int len, void *arg)
                                 return 0;                                  return 0;
                         siz = mqtt_msgPUBREC(sess->sess_buf, mid);                          siz = mqtt_msgPUBREC(sess->sess_buf, mid);
                         if (siz == -1) {                          if (siz == -1) {
                                ioDEBUG(5, "Error:: in msgPUBREC #%d - %s",                                 EVERBOSE(5, "Error:: in msgPUBREC #%d - %s", 
                                                 mqtt_GetErrno(), mqtt_GetError());                                                  mqtt_GetErrno(), mqtt_GetError());
                                 return 0;                                  return 0;
                         }                          }
Line 299  cmdPUBREL(void *srv, int len, void *arg) Line 300  cmdPUBREL(void *srv, int len, void *arg)
         u_short mid = 0;          u_short mid = 0;
         ait_val_t *p = NULL;          ait_val_t *p = NULL;
   
        ioTRACE(2);        ETRACE();
   
         if (!sess)          if (!sess)
                 return -1;                  return -1;
   
        ioDEBUG(5, "Exec PUBREL session");        EVERBOSE(5, "Exec PUBREL session");
         mid = mqtt_readPUBREL(sess->sess_buf);          mid = mqtt_readPUBREL(sess->sess_buf);
         if (mid == (u_short) -1) {          if (mid == (u_short) -1) {
                ioDEBUG(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in readPUBREL #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 return 0;                  return 0;
         }          }
   
Line 317  cmdPUBREL(void *srv, int len, void *arg) Line 318  cmdPUBREL(void *srv, int len, void *arg)
   
         siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);          siz = mqtt_msgPUBCOMP(sess->sess_buf, mid);
         if (siz == -1) {          if (siz == -1) {
                ioDEBUG(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in msgPUBCOMP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 return 0;                  return 0;
         }          }
   
Line 340  cmdSUBSCRIBE(void *srv, int len, void *arg) Line 341  cmdSUBSCRIBE(void *srv, int len, void *arg)
         void *ptr;          void *ptr;
         ait_val_t *p = NULL;          ait_val_t *p = NULL;
   
        ioTRACE(2);        ETRACE();
   
         if (!sess)          if (!sess)
                 return -1;                  return -1;
   
        ioDEBUG(5, "Exec SUBSCRIBE session");        EVERBOSE(5, "Exec SUBSCRIBE session");
         siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);          siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
         if (siz == -1) {          if (siz == -1) {
                ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 return 0;                  return 0;
         }          }
   
         /* add to db */          /* add to db */
         for (i = 0; i < siz; i++) {          for (i = 0; i < siz; i++) {
                store = io_malloc(sizeof(struct tagStore));                store = e_malloc(sizeof(struct tagStore));
                 if (!store) {                  if (!store) {
                        ioSYSERR(0);                        ELIBERR(elwix);
                         continue;                          continue;
                 } else {                  } else {
                         store->st_msgid = mid;                          store->st_msgid = mid;
Line 369  cmdSUBSCRIBE(void *srv, int len, void *arg) Line 370  cmdSUBSCRIBE(void *srv, int len, void *arg)
   
                 /* convert topic to regexp */                  /* convert topic to regexp */
                 if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {                  if (mqtt_expandTopic(subs[i].sub_topic.msg_base, buf, sizeof buf, 1, 1) == -1) {
                        ioDEBUG(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());                        EVERBOSE(5, "Error:: in regexp #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 } else {                  } else {
                         ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);                          ptr = realloc(store->st_subscr.sub_topic.msg_base, strlen(buf) + 1);
                         if (!ptr) {                          if (!ptr) {
                                ioSYSERR(0);                                ESYSERR(0);
                                 continue;                                  continue;
                         } else {                          } else {
                                 store->st_subscr.sub_topic.msg_base = ptr;                                  store->st_subscr.sub_topic.msg_base = ptr;
Line 398  cmdSUBSCRIBE(void *srv, int len, void *arg) Line 399  cmdSUBSCRIBE(void *srv, int len, void *arg)
         /* send acknowledge */          /* send acknowledge */
         siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);          siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
         if (siz == -1) {          if (siz == -1) {
                ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 goto end;                  goto end;
         } else {          } else {
                 p = mkPkt(sess->sess_buf->msg_base, siz);                  p = mkPkt(sess->sess_buf->msg_base, siz);
Line 422  cmdUNSUBSCRIBE(void *srv, int len, void *arg) Line 423  cmdUNSUBSCRIBE(void *srv, int len, void *arg)
         struct tagStore *store, *tmp;          struct tagStore *store, *tmp;
         ait_val_t *p = NULL;          ait_val_t *p = NULL;
   
        ioTRACE(2);        ETRACE();
   
         if (!sess)          if (!sess)
                 return -1;                  return -1;
   
        ioDEBUG(5, "Exec UNSUBSCRIBE session");        EVERBOSE(5, "Exec UNSUBSCRIBE session");
         siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);          siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
         if (siz == -1) {          if (siz == -1) {
                ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 return 0;                  return 0;
         }          }
   
Line 447  cmdUNSUBSCRIBE(void *srv, int len, void *arg) Line 448  cmdUNSUBSCRIBE(void *srv, int len, void *arg)
                                         free(store->st_subscr.sub_topic.msg_base);                                          free(store->st_subscr.sub_topic.msg_base);
                                 if (store->st_subscr.sub_value.msg_base)                                  if (store->st_subscr.sub_value.msg_base)
                                         free(store->st_subscr.sub_value.msg_base);                                          free(store->st_subscr.sub_value.msg_base);
                                io_free(store);                                e_free(store);
                         }                          }
                 }                  }
   
Line 458  cmdUNSUBSCRIBE(void *srv, int len, void *arg) Line 459  cmdUNSUBSCRIBE(void *srv, int len, void *arg)
         /* send acknowledge */          /* send acknowledge */
         siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);          siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
         if (siz == -1) {          if (siz == -1) {
                ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 goto end;                  goto end;
         } else {          } else {
                 p = mkPkt(sess->sess_buf->msg_base, siz);                  p = mkPkt(sess->sess_buf->msg_base, siz);
Line 478  cmdPINGREQ(void *srv, int len, void *arg) Line 479  cmdPINGREQ(void *srv, int len, void *arg)
         int siz = 0;          int siz = 0;
         ait_val_t *p = NULL;          ait_val_t *p = NULL;
   
        ioTRACE(2);        ETRACE();
   
         if (!sess)          if (!sess)
                 return -1;                  return -1;
   
        ioDEBUG(5, "Exec PINGREQ session");        EVERBOSE(5, "Exec PINGREQ session");
         siz = mqtt_msgPINGRESP(sess->sess_buf);          siz = mqtt_msgPINGRESP(sess->sess_buf);
         if (siz == -1) {          if (siz == -1) {
                ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());                EVERBOSE(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
                 return 0;                  return 0;
         } else {          } else {
                 p = mkPkt(sess->sess_buf->msg_base, siz);                  p = mkPkt(sess->sess_buf->msg_base, siz);
Line 503  cmdCONNECT(void *srv, int len, void *arg) Line 504  cmdCONNECT(void *srv, int len, void *arg)
         struct tagStore *store;          struct tagStore *store;
         struct tagSession *sess = (struct tagSession*) arg;          struct tagSession *sess = (struct tagSession*) arg;
   
        ioTRACE(2);        ETRACE();
   
         if (!sess)          if (!sess)
                 return -1;                  return -1;
   
        ioDEBUG(5, "Exec CONNECT session");        EVERBOSE(5, "Exec CONNECT session");
         TAILQ_REMOVE(&Sessions, sess, sess_node);          TAILQ_REMOVE(&Sessions, sess, sess_node);
   
         schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);          schedCancelby(root, taskTIMER, CRITERIA_CALL, sendRetain, NULL);
Line 530  cmdCONNECT(void *srv, int len, void *arg) Line 531  cmdCONNECT(void *srv, int len, void *arg)
                 if (store->st_subscr.sub_value.msg_base)                  if (store->st_subscr.sub_value.msg_base)
                         free(store->st_subscr.sub_value.msg_base);                          free(store->st_subscr.sub_value.msg_base);
   
                io_free(store);                e_free(store);
         }          }
   
         if (sess->sess_will.flag)          if (sess->sess_will.flag)
Line 552  cmdDISCONNECT(void *srv, int len, void *arg) Line 553  cmdDISCONNECT(void *srv, int len, void *arg)
 {  {
         struct tagSession *sess = (struct tagSession*) arg;          struct tagSession *sess = (struct tagSession*) arg;
   
        ioTRACE(2);        ETRACE();
   
         if (!sess)          if (!sess)
                 return -1;                  return -1;
   
        ioDEBUG(5, "Exec DISCONNECT session");        EVERBOSE(5, "Exec DISCONNECT session");
   
         call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,           call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid, 
                         sess->sess_addr, sess->sess_user);                          sess->sess_addr, sess->sess_user);

Removed from v.1.4  
changed lines
  Added in v.1.4.4.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>