Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.8
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
7: int
1.2.2.1 misho 8: cmdPUBLISH(void *srv, void *arg)
1.2 misho 9: {
10: struct mqtthdr *hdr;
1.2.2.1 misho 11: struct tagSession *sess = (struct tagSession*) arg;
1.2 misho 12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
1.2.2.1 misho 34:
35: int
36: cmdPUBREL(void *srv, void *arg)
37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
52: cmdSUBSCRIBE(void *srv, void *arg)
53: {
54: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 misho 55: mqtt_subscr_t *subs = NULL;
56: int siz = 0;
57: u_short mid = 0;
58: register int i;
59: struct tagStore *store;
1.2.2.1 misho 60:
61: ioTRACE(2);
62:
63: if (!sess)
64: return -1;
65:
1.2.2.7 misho 66: ioDEBUG(5, "Exec SUBSCRIBE session");
67: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
68: if (siz == -1) {
69: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
70: return 0;
71: }
72:
73: /* add to db */
74: for (i = 0; i < siz; i++) {
1.2.2.8 ! misho 75: if (call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base,
! 76: sess->sess_user, sess->sess_addr, subs[i].sub_ret) > 0) {
1.2.2.7 misho 77: store = malloc(sizeof(struct tagStore));
78: if (!store) {
79: ioSYSERR(0);
80: goto end;
81: } else {
82: store->st_msgid = mid;
1.2.2.8 ! misho 83: mqtt_subCopy(&store->st_subscr, &subs[i]);
1.2.2.7 misho 84: }
85:
86: /* add to cache */
87: SESS_ELEM_LOCK(sess);
88: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
89: SESS_ELEM_UNLOCK(sess);
90:
91: subs[i].sub_ret = MQTT_QOS_PASS;
92: } else
93: subs[i].sub_ret = MQTT_QOS_DENY;
94: }
1.2.2.1 misho 95:
1.2.2.7 misho 96: /* send acknowledge */
97: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
98: if (siz == -1) {
99: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
100: goto end;
101: }
102: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1)
103: ioSYSERR(0);
1.2.2.8 ! misho 104: else {
1.2.2.7 misho 105: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 ! misho 106: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 107: }
1.2.2.7 misho 108: end:
109: mqtt_subFree(&subs);
1.2.2.1 misho 110: return 0;
111: }
112:
/*
 * cmdUNSUBSCRIBE() - Handler for an UNSUBSCRIBE message
 *
 * @srv = not used by this handler
 * @arg = session (struct tagSession *)
 * return: -1 when no session is supplied, otherwise 0
 *
 * No unsubscribe processing is performed here yet; only the session
 * argument is validated.
 */
int
cmdUNSUBSCRIBE(void *srv, void *arg)
{
	struct tagSession *sess = arg;

	ioTRACE(2);

	return sess ? 0 : -1;
}
125:
126: int
127: cmdPINGREQ(void *srv, void *arg)
128: {
129: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 130: int siz = 0;
1.2.2.1 misho 131:
132: ioTRACE(2);
133:
134: if (!sess)
135: return -1;
136:
1.2.2.7 misho 137: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 138: siz = mqtt_msgPINGRESP(sess->sess_buf);
139: if (siz == -1) {
140: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
141: return 0;
142: }
143: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
144: ioSYSERR(0);
145: return 0;
1.2.2.8 ! misho 146: } else {
1.2.2.2 misho 147: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.8 ! misho 148: memset(sess->sess_buf->msg_base, 0, sess->sess_buf->msg_len);
! 149: }
1.2.2.1 misho 150:
151: return 0;
152: }
153:
154: int
155: cmdCONNECT(void *srv, void *arg)
156: {
157: struct tagStore *store;
158: struct tagSession *sess = (struct tagSession*) arg;
159:
160: ioTRACE(2);
161:
162: if (!sess)
163: return -1;
164:
1.2.2.6 misho 165: ioDEBUG(5, "Exec CONNECT session");
1.2.2.3 misho 166: SESS_LOCK;
1.2.2.1 misho 167: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3 misho 168: SESS_UNLOCK;
1.2.2.1 misho 169:
170: if (call.FiniSessPUB)
171: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
172:
1.2.2.3 misho 173: SESS_ELEM_LOCK(sess);
1.2.2.6 misho 174: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
175: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 176:
1.2.2.6 misho 177: if (store->st_subscr.sub_topic.msg_base)
178: free(store->st_subscr.sub_topic.msg_base);
179: if (store->st_subscr.sub_value.msg_base)
180: free(store->st_subscr.sub_value.msg_base);
181:
182: free(store);
183: }
1.2.2.3 misho 184: SESS_ELEM_UNLOCK(sess);
1.2.2.1 misho 185:
186: if (sess->sess_will.msg)
187: free(sess->sess_will.msg);
188: if (sess->sess_will.topic)
189: free(sess->sess_will.topic);
190:
191: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
192: sess->sess_addr, sess->sess_user);
193: return 0;
194: }
195:
196: int
197: cmdDISCONNECT(void *srv, void *arg)
198: {
199: struct tagSession *sess = (struct tagSession*) arg;
200:
201: ioTRACE(2);
202:
203: if (!sess)
204: return -1;
205:
1.2.2.5 misho 206: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.3 misho 207: SESS_LOCK;
1.2.2.1 misho 208: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3 misho 209: SESS_UNLOCK;
1.2.2.1 misho 210:
211: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
212: sess->sess_addr, sess->sess_user);
213: return 0;
214: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>