1: #include "global.h"
2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "mqttd_calls.h"
5:
6:
7: int
8: cmdPUBLISH(void *srv, void *arg)
9: {
10: struct mqtthdr *hdr;
11: struct tagSession *sess = (struct tagSession*) arg;
12:
13: ioTRACE(2);
14:
15: if (!sess)
16: return -1;
17:
18: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
19: switch (hdr->mqtt_msg.qos) {
20: case MQTT_QOS_ONCE:
21: break;
22: case MQTT_QOS_ACK:
23: break;
24: case MQTT_QOS_EXACTLY:
25: break;
26: default:
27: ioDEBUG(1, "Error:: Unknown QoS %d - rejected publishing request",
28: hdr->mqtt_msg.qos);
29: return 0;
30: }
31:
32: return 0;
33: }
34:
35: int
36: cmdPUBREL(void *srv, void *arg)
37: {
38: struct mqtthdr *hdr;
39: struct tagSession *sess = (struct tagSession*) arg;
40:
41: ioTRACE(2);
42:
43: if (!sess)
44: return -1;
45:
46: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
47:
48: return 0;
49: }
50:
51: int
52: cmdSUBSCRIBE(void *srv, void *arg)
53: {
54: struct mqtthdr *hdr;
55: struct tagSession *sess = (struct tagSession*) arg;
56:
57: ioTRACE(2);
58:
59: if (!sess)
60: return -1;
61:
62: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
63:
64: return 0;
65: }
66:
67: int
68: cmdUNSUBSCRIBE(void *srv, void *arg)
69: {
70: struct mqtthdr *hdr;
71: struct tagSession *sess = (struct tagSession*) arg;
72:
73: ioTRACE(2);
74:
75: if (!sess)
76: return -1;
77:
78: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
79:
80: return 0;
81: }
82:
83: int
84: cmdPINGREQ(void *srv, void *arg)
85: {
86: struct mqtthdr *hdr;
87: struct tagSession *sess = (struct tagSession*) arg;
88: int siz = 0;
89:
90: ioTRACE(2);
91:
92: if (!sess)
93: return -1;
94:
95: hdr = (struct mqtthdr*) sess->sess_buf->msg_base;
96: siz = mqtt_msgPINGRESP(sess->sess_buf);
97: if (siz == -1) {
98: ioDEBUG(5, "Error:: in msgPINGRESP #%d - %s", mqtt_GetErrno(), mqtt_GetError());
99: return 0;
100: }
101: if ((siz = send(sess->sess_sock, sess->sess_buf->msg_base, siz, 0)) == -1) {
102: ioSYSERR(0);
103: return 0;
104: } else
105: ioDEBUG(5, "Sended %d bytes.", siz);
106:
107: return 0;
108: }
109:
110: int
111: cmdCONNECT(void *srv, void *arg)
112: {
113: struct tagStore *store;
114: struct tagSession *sess = (struct tagSession*) arg;
115: register int i;
116:
117: ioTRACE(2);
118:
119: if (!sess)
120: return -1;
121:
122: #if 0
123: SESS_LOCK;
124: TAILQ_REMOVE(&Sessions, sess, sess_node);
125: SESS_UNLOCK;
126:
127: if (call.FiniSessPUB)
128: call.FiniSessPUB(&cfg, pub, sess->sess_cid, sess->sess_user, "%");
129:
130: SESS_ELEM_LOCK(sess);
131: for (i = 0; i < MQTT_QOS_RESERVED; i++)
132: while ((store = SLIST_FIRST(&sess->sess_txque[i]))) {
133: SLIST_REMOVE_HEAD(&sess->sess_txque[i], st_node);
134:
135: if (store->st_subscr.sub_topic.msg_base)
136: free(store->st_subscr.sub_topic.msg_base);
137: if (store->st_subscr.sub_value.msg_base)
138: free(store->st_subscr.sub_value.msg_base);
139:
140: free(store);
141: }
142: SESS_ELEM_UNLOCK(sess);
143:
144: if (sess->sess_will.msg)
145: free(sess->sess_will.msg);
146: if (sess->sess_will.topic)
147: free(sess->sess_will.topic);
148:
149: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
150: sess->sess_addr, sess->sess_user);
151: #endif
152: return 0;
153: }
154:
155: int
156: cmdDISCONNECT(void *srv, void *arg)
157: {
158: struct tagSession *sess = (struct tagSession*) arg;
159:
160: ioTRACE(2);
161:
162: if (!sess)
163: return -1;
164:
165: ioDEBUG(5, "Exec DISCONNECT session");
166: SESS_LOCK;
167: TAILQ_REMOVE(&Sessions, sess, sess_node);
168: SESS_UNLOCK;
169:
170: call.LOG(logg, "Session %s stopped from %s for user %s.\n", sess->sess_cid,
171: sess->sess_addr, sess->sess_user);
172: return 0;
173: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>