1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: int
8: cmdPUBLISH(void *srv, void *arg)
9: {
10: struct mqtthdr *hdr;
11: struct tagSession *sess = (struct tagSession*) arg;
12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
34:
35: int
36: cmdPUBREL(void *srv, void *arg)
37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
52: cmdSUBSCRIBE(void *srv, void *arg)
53: {
54: struct mqtthdr *hdr;
55: struct tagSession *sess = (struct tagSession*) arg;
56:
57: ioTRACE(2);
58:
59: if (!sess)
60: return -1;
61:
62: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
63:
64: return 0;
65: }
66:
67: int
68: cmdUNSUBSCRIBE(void *srv, void *arg)
69: {
70: struct mqtthdr *hdr;
71: struct tagSession *sess = (struct tagSession*) arg;
72:
73: ioTRACE(2);
74:
75: if (!sess)
76: return -1;
77:
78: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
79:
80: return 0;
81: }
82:
83: int
84: cmdPINGREQ(void *srv, void *arg)
85: {
86: struct mqtthdr *hdr;
87: struct tagSession *sess = (struct tagSession*) arg;
88:
89: ioTRACE(2);
90:
91: if (!sess)
92: return -1;
93:
94: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
95:
96: return 0;
97: }
98:
99: int
100: cmdCONNECT(void *srv, void *arg)
101: {
102: struct tagStore *store;
103: struct tagSession *sess = (struct tagSession*) arg;
104:
105: ioTRACE(2);
106:
107: if (!sess)
108: return -1;
109:
110: pthread_mutex_lock(&mtx_sess);
111: TAILQ_REMOVE(&Sessions, sess, sess_node);
112: pthread_mutex_unlock(&mtx_sess);
113:
114: if (call.FiniSessPUB)
115: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
116:
117: while ((store = TAILQ_FIRST(&sess->sess_sndqueue))) {
118: TAILQ_REMOVE(&sess->sess_sndqueue, store, st_node);
119:
120: if (store->st_subscr.sub_topic.msg_base)
121: free(store->st_subscr.sub_topic.msg_base);
122: if (store->st_subscr.sub_value.msg_base)
123: free(store->st_subscr.sub_value.msg_base);
124:
125: free(store);
126: }
127:
128: if (sess->sess_will.msg)
129: free(sess->sess_will.msg);
130: if (sess->sess_will.topic)
131: free(sess->sess_will.topic);
132:
133: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
134: sess->sess_addr, sess->sess_user);
135: return 0;
136: }
137:
138: int
139: cmdDISCONNECT(void *srv, void *arg)
140: {
141: struct tagSession *sess = (struct tagSession*) arg;
142:
143: ioTRACE(2);
144:
145: if (!sess)
146: return -1;
147:
148: pthread_mutex_lock(&mtx_sess);
149: TAILQ_REMOVE(&Sessions, sess, sess_node);
150: pthread_mutex_unlock(&mtx_sess);
151:
152: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
153: sess->sess_addr, sess->sess_user);
154: return 0;
155: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>