1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: int
8: cmdPUBLISH(void *srv, void *arg)
9: {
10: struct mqtthdr *hdr;
11: struct tagSession *sess = (struct tagSession*) arg;
12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
34:
35: int
36: cmdPUBREL(void *srv, void *arg)
37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
52: cmdSUBSCRIBE(void *srv, void *arg)
53: {
54: struct mqtthdr *hdr;
55: struct tagSession *sess = (struct tagSession*) arg;
56:
57: ioTRACE(2);
58:
59: if (!sess)
60: return -1;
61:
62: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
63:
64: return 0;
65: }
66:
67: int
68: cmdUNSUBSCRIBE(void *srv, void *arg)
69: {
70: struct mqtthdr *hdr;
71: struct tagSession *sess = (struct tagSession*) arg;
72:
73: ioTRACE(2);
74:
75: if (!sess)
76: return -1;
77:
78: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
79:
80: return 0;
81: }
82:
83: int
84: cmdPINGREQ(void *srv, void *arg)
85: {
86: struct mqtthdr *hdr;
87: struct tagSession *sess = (struct tagSession*) arg;
88: int siz = 0;
89:
90: ioTRACE(2);
91:
92: if (!sess)
93: return -1;
94:
95: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
96: siz = mqtt_msgPINGRESP(sess->sess_buf);
97: if (siz == -1) {
98: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
99: return 0;
100: }
101: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
102: ioSYSERR(0);
103: return 0;
104: } else
105: ioDEBUG(5, "Sended %d bytes.", siz);
106:
107: return 0;
108: }
109:
110: int
111: cmdCONNECT(void *srv, void *arg)
112: {
113: struct tagStore *store;
114: struct tagSession *sess = (struct tagSession*) arg;
115:
116: ioTRACE(2);
117:
118: if (!sess)
119: return -1;
120:
121: ioDEBUG(5, "Exec CONNECT session");
122: SESS_LOCK;
123: TAILQ_REMOVE(&Sessions, sess, sess_node);
124: SESS_UNLOCK;
125:
126: if (call.FiniSessPUB)
127: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
128:
129: SESS_ELEM_LOCK(sess);
130: while ((store = SLIST_FIRST(&sess->sess_subscr))) {
131: SLIST_REMOVE_HEAD(&sess->sess_subscr, st_node);
132:
133: if (store->st_subscr.sub_topic.msg_base)
134: free(store->st_subscr.sub_topic.msg_base);
135: if (store->st_subscr.sub_value.msg_base)
136: free(store->st_subscr.sub_value.msg_base);
137:
138: free(store);
139: }
140: SESS_ELEM_UNLOCK(sess);
141:
142: if (sess->sess_will.msg)
143: free(sess->sess_will.msg);
144: if (sess->sess_will.topic)
145: free(sess->sess_will.topic);
146:
147: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
148: sess->sess_addr, sess->sess_user);
149: return 0;
150: }
151:
152: int
153: cmdDISCONNECT(void *srv, void *arg)
154: {
155: struct tagSession *sess = (struct tagSession*) arg;
156:
157: ioTRACE(2);
158:
159: if (!sess)
160: return -1;
161:
162: ioDEBUG(5, "Exec DISCONNECT session");
163: SESS_LOCK;
164: TAILQ_REMOVE(&Sessions, sess, sess_node);
165: SESS_UNLOCK;
166:
167: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
168: sess->sess_addr, sess->sess_user);
169: return 0;
170: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>