1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: int
8: cmdPUBLISH(void *srv, int len, void *arg)
9: {
10: struct mqtthdr *hdr;
11: struct tagSession *sess = (struct tagSession*) arg;
12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
34:
35: int
36: cmdPUBREL(void *srv, int len, void *arg)
37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
52: cmdSUBSCRIBE(void *srv, int len, void *arg)
53: {
54: struct tagSession *sess = (struct tagSession*) arg;
55: mqtt_subscr_t *subs = NULL;
56: int siz = 0;
57: u_short mid = 0;
58: register int i;
59: struct tagStore *store;
60:
61: ioTRACE(2);
62:
63: if (!sess)
64: return -1;
65:
66: ioDEBUG(5, "Exec SUBSCRIBE session");
67: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
68: if (siz == -1) {
69: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
70: return 0;
71: }
72:
73: /* add to db */
74: for (i = 0; i < siz; i++) {
75: if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base,
76: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
77: store = malloc(sizeof(struct tagStore));
78: if (!store) {
79: ioSYSERR(0);
80: goto end;
81: } else {
82: store->st_msgid = mid;
83: mqtt_subCopy(&store->st_subscr, &subs[i]);
84: }
85:
86: /* add to cache */
87: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
88:
89: subs[i].sub_ret = MQTT_QOS_PASS;
90: } else
91: subs[i].sub_ret = MQTT_QOS_DENY;
92: }
93:
94: /* send acknowledge */
95: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
96: if (siz == -1) {
97: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
98: goto end;
99: }
100: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
101: ioSYSERR(0);
102: else {
103: ioDEBUG(5, "Sended %d bytes.", siz);
104: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
105: }
106: end:
107: mqtt_subFree(&subs);
108: return 0;
109: }
110:
111: int
112: cmdUNSUBSCRIBE(void *srv, int len, void *arg)
113: {
114: struct tagSession *sess = (struct tagSession*) arg;
115: mqtt_subscr_t *subs = NULL;
116: int siz = 0;
117: u_short mid = 0;
118: register int i;
119: struct tagStore *store, *tmp;
120:
121: ioTRACE(2);
122:
123: if (!sess)
124: return -1;
125:
126: ioDEBUG(5, "Exec UNSUBSCRIBE session");
127: siz = mqtt_readUNSUBSCRIBE(sess->sess_buf, &mid, &subs);
128: if (siz == -1) {
129: ioDEBUG(5, "Error:: in readUNSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
130: return 0;
131: }
132:
133: /* del from db */
134: for (i = 0; i < siz; i++) {
135: SLIST_FOREACH_SAFE(store, &sess->sess_subscr, st_node, tmp) {
136: if (store->st_subscr.sub_ret == subs[i].sub_ret &&
137: store->st_subscr.sub_topic.msg_base &&
138: !strcmp(store->st_subscr.sub_topic.msg_base,
139: subs[i].sub_topic.msg_base)) {
140: SLIST_REMOVE(&sess->sess_subscr, store, tagStore, st_node);
141:
142: if (store->st_subscr.sub_topic.msg_base)
143: free(store->st_subscr.sub_topic.msg_base);
144: if (store->st_subscr.sub_value.msg_base)
145: free(store->st_subscr.sub_value.msg_base);
146: free(store);
147: }
148: }
149:
150: call.DeletePUB_subscribe(&cfg, pub, subs[i].sub_topic.msg_base,
151: sess->sess_user, sess->sess_addr);
152: }
153:
154: /* send acknowledge */
155: siz = mqtt_msgUNSUBACK(sess->sess_buf, mid);
156: if (siz == -1) {
157: ioDEBUG(5, "Error:: in msgUNSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
158: goto end;
159: }
160: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1)
161: ioSYSERR(0);
162: else {
163: ioDEBUG(5, "Sended %d bytes.", siz);
164: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
165: }
166: end:
167: mqtt_subFree(&subs);
168: return 0;
169: }
170:
171: int
172: cmdPINGREQ(void *srv, int len, void *arg)
173: {
174: struct tagSession *sess = (struct tagSession*) arg;
175: int siz = 0;
176:
177: ioTRACE(2);
178:
179: if (!sess)
180: return -1;
181:
182: ioDEBUG(5, "Exec PINGREQ session");
183: siz = mqtt_msgPINGRESP(sess->sess_buf);
184: if (siz == -1) {
185: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
186: return 0;
187: }
188: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, MSG_NOSIGNAL)) == -1) {
189: ioSYSERR(0);
190: return 0;
191: } else {
192: ioDEBUG(5, "Sended %d bytes.", siz);
193: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
194: }
195:
196: return 0;
197: }
198:
199: int
200: cmdCONNECT(void *srv, int len, void *arg)
201: {
202: struct tagStore *store;
203: struct tagSession *sess = (struct tagSession*) arg;
204:
205: ioTRACE(2);
206:
207: if (!sess)
208: return -1;
209:
210: ioDEBUG(5, "Exec CONNECT session");
211: TAILQ_REMOVE(&Sessions, sess, sess_node);
212:
213: if (call.FiniSessPUB)
214: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
215:
216: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
217: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
218:
219: if (store->st_subscr.sub_topic.msg_base)
220: free(store->st_subscr.sub_topic.msg_base);
221: if (store->st_subscr.sub_value.msg_base)
222: free(store->st_subscr.sub_value.msg_base);
223:
224: free(store);
225: }
226:
227: if (sess->sess_will.msg)
228: free(sess->sess_will.msg);
229: if (sess->sess_will.topic)
230: free(sess->sess_will.topic);
231:
232: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
233: sess->sess_addr, sess->sess_user);
234:
235: return -3; /* reconnect client */
236: }
237:
238: int
239: cmdDISCONNECT(void *srv, int len, void *arg)
240: {
241: struct tagSession *sess = (struct tagSession*) arg;
242:
243: ioTRACE(2);
244:
245: if (!sess)
246: return -1;
247:
248: ioDEBUG(5, "Exec DISCONNECT session");
249:
250: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
251: sess->sess_addr, sess->sess_user);
252:
253: return -2; /* must terminate dispatcher */
254: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>