Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.13
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
7: int
1.2.2.9 misho 8: cmdPUBLISH(void *srv, int len, void *arg)
1.2 misho 9: {
10: struct mqtthdr *hdr;
1.2.2.1 misho 11: struct tagSession *sess = (struct tagSession*) arg;
1.2 misho 12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
1.2.2.1 misho 34:
35: int
1.2.2.9 misho 36: cmdPUBREL(void *srv, int len, void *arg)
1.2.2.1 misho 37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
1.2.2.9 misho 52: cmdSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 53: {
54: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 55: mqtt_subscr_t *subs = NULL;
56: int siz = 0;
57: u_short mid = 0;
58: register int i;
59: struct tagStore *store;
1.2.2.1 misho 60:
61: ioTRACE(2);
62:
63: if (!sess)
64: return -1;
65:
1.2.2.7 misho 66: ioDEBUG(5, "Exec SUBSCRIBE session");
67: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
68: if (siz == -1) {
69: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
70: return 0;
71: }
72:
73: /* add to db */
74: for (i = 0; i < siz; i++) {
1.2.2.8 misho 75: if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base,
76: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7 misho 77: store = malloc(sizeof(struct tagStore));
78: if (!store) {
79: ioSYSERR(0);
80: goto end;
81: } else {
82: store->st_msgid = mid;
1.2.2.8 misho 83: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 84: }
85:
86: /* add to cache */
87: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
88:
89: subs[i].sub_ret = MQTT_QOS_PASS;
90: } else
91: subs[i].sub_ret = MQTT_QOS_DENY;
92: }
1.2.2.1 misho 93:
1.2.2.7 misho 94: /* send acknowledge */
95: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
96: if (siz == -1) {
97: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
98: goto end;
99: }
1.2.2.13! misho 100: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
1.2.2.7 misho 101: ioSYSERR(0);
1.2.2.8 misho 102: else {
1.2.2.7 misho 103: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 misho 104: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
105: }
1.2.2.7 misho 106: end:
107: mqtt_subFree(&subs);
1.2.2.1 misho 108: return 0;
109: }
110:
111: int
1.2.2.9 misho 112: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
1.2.2.1 misho 113: {
114: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.13! misho 115: mqtt_subscr_t *subs = NULL;
! 116: int siz = 0;
! 117: u_short mid = 0;
! 118: register int i;
! 119: struct tagStore *store, *tmp;
1.2.2.1 misho 120:
121: ioTRACE(2);
122:
123: if (!sess)
124: return -1;
125:
1.2.2.13! misho 126: ioDEBUG(5, "Exec UNSUBSCRIBE session");
! 127: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
! 128: if (siz == -1) {
! 129: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 130: return 0;
! 131: }
! 132:
! 133: /* del from db */
! 134: for (i = 0; i < siz; i++) {
! 135: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
! 136: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
! 137: store->st_subscr.sub_topic.msg_base &&
! 138: !strcmp(store->st_subscr.sub_topic.msg_base,
! 139: subs[i].sub_topic.msg_base)) {
! 140: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
! 141: free(store);
! 142: }
! 143: }
! 144:
! 145: call.DeletePUB_subscribe(&cfg, pub, subs[i].sub_topic.msg_base,
! 146: sess->sess_user, sess->sess_addr);
! 147: }
! 148:
! 149: /* send acknowledge */
! 150: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
! 151: if (siz == -1) {
! 152: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 153: goto end;
! 154: }
! 155: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
! 156: ioSYSERR(0);
! 157: else {
! 158: ioDEBUG(5, "Sended %d bytes.", siz);
! 159: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 160: }
! 161: end:
! 162: mqtt_subFree(&subs);
1.2.2.1 misho 163: return 0;
164: }
165:
166: int
1.2.2.9 misho 167: cmdPINGREQ(void *srv, int len, void *arg)
1.2.2.1 misho 168: {
169: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 170: int siz = 0;
1.2.2.1 misho 171:
172: ioTRACE(2);
173:
174: if (!sess)
175: return -1;
176:
1.2.2.7 misho 177: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 178: siz = mqtt_msgPINGRESP(sess->sess_buf);
179: if (siz == -1) {
180: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
181: return 0;
182: }
1.2.2.13! misho 183: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
1.2.2.2 misho 184: ioSYSERR(0);
185: return 0;
1.2.2.8 misho 186: } else {
1.2.2.2 misho 187: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 misho 188: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
189: }
1.2.2.1 misho 190:
191: return 0;
192: }
193:
194: int
1.2.2.9 misho 195: cmdCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 196: {
197: struct tagStore *store;
198: struct tagSession *sess = (struct tagSession*) arg;
199:
200: ioTRACE(2);
201:
202: if (!sess)
203: return -1;
204:
1.2.2.6 misho 205: ioDEBUG(5, "Exec CONNECT session");
1.2.2.1 misho 206: TAILQ_REMOVE(&Sessions, sess, sess_node);
207:
208: if (call.FiniSessPUB)
209: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
210:
1.2.2.6 misho 211: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
212: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 213:
1.2.2.6 misho 214: if (store->st_subscr.sub_topic.msg_base)
215: free(store->st_subscr.sub_topic.msg_base);
216: if (store->st_subscr.sub_value.msg_base)
217: free(store->st_subscr.sub_value.msg_base);
218:
219: free(store);
220: }
1.2.2.1 misho 221:
222: if (sess->sess_will.msg)
223: free(sess->sess_will.msg);
224: if (sess->sess_will.topic)
225: free(sess->sess_will.topic);
226:
227: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
228: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 229:
1.2.2.12 misho 230: return -3; /* reconnect client */
1.2.2.1 misho 231: }
232:
233: int
1.2.2.9 misho 234: cmdDISCONNECT(void *srv, int len, void *arg)
1.2.2.1 misho 235: {
236: struct tagSession *sess = (struct tagSession*) arg;
237:
238: ioTRACE(2);
239:
240: if (!sess)
241: return -1;
242:
1.2.2.5 misho 243: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.10 misho 244:
1.2.2.1 misho 245: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
246: sess->sess_addr, sess->sess_user);
1.2.2.9 misho 247:
1.2.2.10 misho 248: return -2; /* must terminate dispatcher */
1.2.2.1 misho 249: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>