1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: int
8: cmdPUBLISH(void *srv, void *arg)
9: {
10: struct mqtthdr *hdr;
11: struct tagSession *sess = (struct tagSession*) arg;
12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
34:
35: int
36: cmdPUBREL(void *srv, void *arg)
37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
52: cmdSUBSCRIBE(void *srv, void *arg)
53: {
54: struct mqtthdr *hdr;
55: struct tagSession *sess = (struct tagSession*) arg;
56:
57: ioTRACE(2);
58:
59: if (!sess)
60: return -1;
61:
62: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
63:
64: return 0;
65: }
66:
67: int
68: cmdUNSUBSCRIBE(void *srv, void *arg)
69: {
70: struct mqtthdr *hdr;
71: struct tagSession *sess = (struct tagSession*) arg;
72:
73: ioTRACE(2);
74:
75: if (!sess)
76: return -1;
77:
78: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
79:
80: return 0;
81: }
82:
83: int
84: cmdPINGREQ(void *srv, void *arg)
85: {
86: struct mqtthdr *hdr;
87: struct tagSession *sess = (struct tagSession*) arg;
88: int siz = 0;
89:
90: ioTRACE(2);
91:
92: if (!sess)
93: return -1;
94:
95: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
96: siz = mqtt_msgPINGRESP(sess->sess_buf);
97: if (siz == -1) {
98: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
99: return 0;
100: }
101: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
102: ioSYSERR(0);
103: return 0;
104: } else
105: ioDEBUG(5, "Sended %d bytes.", siz);
106:
107: return 0;
108: }
109:
110: int
111: cmdCONNECT(void *srv, void *arg)
112: {
113: struct tagStore *store;
114: struct tagSession *sess = (struct tagSession*) arg;
115: register int i;
116:
117: ioTRACE(2);
118:
119: if (!sess)
120: return -1;
121:
122: SESS_LOCK;
123: TAILQ_REMOVE(&Sessions, sess, sess_node);
124: SESS_UNLOCK;
125:
126: if (call.FiniSessPUB)
127: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
128:
129: SESS_ELEM_LOCK(sess);
130: for (i = 0; i < MQTT_QOS_RESERVED; i++)
131: while ((store = SLIST_FIRST(&sess->sess_txque[i]))) {
132: SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node);
133:
134: if (store->st_subscr.sub_topic.msg_base)
135: free(store->st_subscr.sub_topic.msg_base);
136: if (store->st_subscr.sub_value.msg_base)
137: free(store->st_subscr.sub_value.msg_base);
138:
139: free(store);
140: }
141: SESS_ELEM_UNLOCK(sess);
142:
143: if (sess->sess_will.msg)
144: free(sess->sess_will.msg);
145: if (sess->sess_will.topic)
146: free(sess->sess_will.topic);
147:
148: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
149: sess->sess_addr, sess->sess_user);
150: return 0;
151: }
152:
153: int
154: cmdDISCONNECT(void *srv, void *arg)
155: {
156: struct tagSession *sess = (struct tagSession*) arg;
157:
158: ioTRACE(2);
159:
160: if (!sess)
161: return -1;
162:
163: SESS_LOCK;
164: TAILQ_REMOVE(&Sessions, sess, sess_node);
165: SESS_UNLOCK;
166:
167: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
168: sess->sess_addr, sess->sess_user);
169: return 0;
170: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>