Annotation of mqtt/src/mqttd_calls.c, revision 1.2.2.7
1.2 misho 1: #include "global.h"
2: #include "mqttd.h"
1.2.2.1 misho 3: #include "rtlm.h"
1.2 misho 4: #include "mqttd_calls.h"
5:
6:
7: int
1.2.2.1 misho 8: cmdPUBLISH(void *srv, void *arg)
1.2 misho 9: {
10: struct mqtthdr *hdr;
1.2.2.1 misho 11: struct tagSession *sess = (struct tagSession*) arg;
1.2 misho 12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
1.2.2.1 misho 34:
35: int
36: cmdPUBREL(void *srv, void *arg)
37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
52: cmdSUBSCRIBE(void *srv, void *arg)
53: {
54: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.7 ! misho 55: mqtt_subscr_t *subs = NULL;
! 56: int siz = 0;
! 57: u_short mid = 0;
! 58: register int i;
! 59: struct tagStore *store;
1.2.2.1 misho 60:
61: ioTRACE(2);
62:
63: if (!sess)
64: return -1;
65:
1.2.2.7 ! misho 66: ioDEBUG(5, "Exec SUBSCRIBE session");
! 67: siz = mqtt_readSUBSCRIBE(sess->sess_buf, &mid, &subs);
! 68: if (siz == -1) {
! 69: ioDEBUG(5, "Error:: in readSUBSCRIBE #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 70: return 0;
! 71: }
! 72:
! 73: /* add to db */
! 74: for (i = 0; i < siz; i++) {
! 75: if ((siz = call.WritePUB_subscribe(&cfg, pub, mid, subs[i].sub_topic.msg_base,
! 76: sess->sess_user, sess->sess_addr, subs[i].sub_ret)) > 0) {
! 77: store = malloc(sizeof(struct tagStore));
! 78: if (!store) {
! 79: ioSYSERR(0);
! 80: goto end;
! 81: } else {
! 82: store->st_msgid = mid;
! 83: store->st_subscr = subs[i];
! 84: }
! 85:
! 86: /* add to cache */
! 87: SESS_ELEM_LOCK(sess);
! 88: SLIST_INSERT_HEAD(&sess->sess_subscr, store, st_node);
! 89: SESS_ELEM_UNLOCK(sess);
! 90:
! 91: subs[i].sub_ret = MQTT_QOS_PASS;
! 92: } else
! 93: subs[i].sub_ret = MQTT_QOS_DENY;
! 94: }
1.2.2.1 misho 95:
1.2.2.7 ! misho 96: /* send acknowledge */
! 97: siz = mqtt_msgSUBACK(sess->sess_buf, subs, mid);
! 98: if (siz == -1) {
! 99: ioDEBUG(5, "Error:: in msgSUBACK #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 100: goto end;
! 101: }
! 102: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1)
! 103: ioSYSERR(0);
! 104: else
! 105: ioDEBUG(5, "Sended %d bytes.", siz);
! 106: end:
! 107: mqtt_subFree(&subs);
1.2.2.1 misho 108: return 0;
109: }
110:
111: int
112: cmdUNSUBSCRIBE(void *srv, void *arg)
113: {
114: struct tagSession *sess = (struct tagSession*) arg;
115:
116: ioTRACE(2);
117:
118: if (!sess)
119: return -1;
120:
121: return 0;
122: }
123:
124: int
125: cmdPINGREQ(void *srv, void *arg)
126: {
127: struct tagSession *sess = (struct tagSession*) arg;
1.2.2.2 misho 128: int siz = 0;
1.2.2.1 misho 129:
130: ioTRACE(2);
131:
132: if (!sess)
133: return -1;
134:
1.2.2.7 ! misho 135: ioDEBUG(5, "Exec PINGREQ session");
1.2.2.2 misho 136: siz = mqtt_msgPINGRESP(sess->sess_buf);
137: if (siz == -1) {
138: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
139: return 0;
140: }
141: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
142: ioSYSERR(0);
143: return 0;
144: } else
145: ioDEBUG(5, "Sended %d bytes.", siz);
1.2.2.1 misho 146:
147: return 0;
148: }
149:
150: int
151: cmdCONNECT(void *srv, void *arg)
152: {
153: struct tagStore *store;
154: struct tagSession *sess = (struct tagSession*) arg;
155:
156: ioTRACE(2);
157:
158: if (!sess)
159: return -1;
160:
1.2.2.6 misho 161: ioDEBUG(5, "Exec CONNECT session");
1.2.2.3 misho 162: SESS_LOCK;
1.2.2.1 misho 163: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3 misho 164: SESS_UNLOCK;
1.2.2.1 misho 165:
166: if (call.FiniSessPUB)
167: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
168:
1.2.2.3 misho 169: SESS_ELEM_LOCK(sess);
1.2.2.6 misho 170: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
171: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
1.2.2.3 misho 172:
1.2.2.6 misho 173: if (store->st_subscr.sub_topic.msg_base)
174: free(store->st_subscr.sub_topic.msg_base);
175: if (store->st_subscr.sub_value.msg_base)
176: free(store->st_subscr.sub_value.msg_base);
177:
178: free(store);
179: }
1.2.2.3 misho 180: SESS_ELEM_UNLOCK(sess);
1.2.2.1 misho 181:
182: if (sess->sess_will.msg)
183: free(sess->sess_will.msg);
184: if (sess->sess_will.topic)
185: free(sess->sess_will.topic);
186:
187: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
188: sess->sess_addr, sess->sess_user);
189: return 0;
190: }
191:
192: int
193: cmdDISCONNECT(void *srv, void *arg)
194: {
195: struct tagSession *sess = (struct tagSession*) arg;
196:
197: ioTRACE(2);
198:
199: if (!sess)
200: return -1;
201:
1.2.2.5 misho 202: ioDEBUG(5, "Exec DISCONNECT session");
1.2.2.3 misho 203: SESS_LOCK;
1.2.2.1 misho 204: TAILQ_REMOVE(&Sessions, sess, sess_node);
1.2.2.3 misho 205: SESS_UNLOCK;
1.2.2.1 misho 206:
207: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
208: sess->sess_addr, sess->sess_user);
209: return 0;
210: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>