Annotation of mqtt/src/daemon.c, revision 1.1.2.6
1.1.2.1 misho 1: #include "global.h"
2:
3:
1.1.2.5 misho 4: static void *
5: startSession(sched_task_t *task)
1.1.2.4 misho 6: {
1.1.2.6 ! misho 7: mqtt_msg_t *buf;
! 8: mqtt_cb_t cbs[MQTT_TYPE_MAX + 1] = { 0 };
! 9: int ret = 0;
! 10:
! 11: FTRACE(4);
! 12:
! 13: buf = mqtt_msgAlloc(USHRT_MAX);
! 14: if (!buf) {
! 15: syslog(LOG_ERR, "Error:: allocate message buf (%d) #%d - %s", (int) TASK_FD(task),
! 16: mqtt_GetErrno(), mqtt_GetError());
! 17: goto end;
! 18: }
! 19:
! 20: if (recv(TASK_FD(task), buf->msg_base, buf->msg_len, 0) == -1) {
! 21: syslog(LOG_ERR, "Error:: recv(%d) #%d - %s", (int) TASK_FD(task),
! 22: errno, strerror(errno));
! 23: mqtt_msgFree(&buf, 42);
! 24: goto end;
! 25: }
! 26:
! 27: ret = mqttDispatcher(cbs, buf);
! 28:
! 29: mqtt_msgFree(&buf, 42);
! 30:
! 31: switch (ret) {
! 32: case -1:
! 33: syslog(LOG_ERR, "Error:: in MQTT protocol #%d - %s", mqtt_GetErrno(), mqtt_GetError());
! 34: break;
! 35: default:
! 36: return NULL;
! 37: }
! 38: end: /* close client connection */
1.1.2.5 misho 39: close(TASK_FD(task));
40: VERB(1) syslog(LOG_DEBUG, "Close client %s with socket=%d",
41: (char*) TASK_ARG(task), (int) TASK_FD(task));
42: if (TASK_ARG(task))
43: free(TASK_ARG(task));
1.1.2.6 ! misho 44: return NULL;
1.1.2.4 misho 45: }
46:
47: /* ----------------------------------------------------------------------- */
48:
1.1.2.5 misho 49: static void *
50: thrSched(void *arg __unused)
51: {
52: FTRACE(1);
53:
54: schedRun(root, (intptr_t*) &Kill);
55: pthread_exit(NULL);
56: }
57:
1.1.2.2 misho 58: int
1.1.2.3 misho 59: Run(int sock)
1.1.2.2 misho 60: {
1.1.2.4 misho 61: io_sockaddr_t sa;
62: socklen_t sslen = sizeof sa.ss;
63: int cli;
1.1.2.5 misho 64: char *str = NULL, szAddr[STRSIZ] = { 0 };
65: pthread_t tid;
1.1.2.4 misho 66:
1.1.2.2 misho 67: FTRACE(1);
68:
1.1.2.5 misho 69: if (pthread_create(&tid, NULL, thrSched, NULL)) {
70: syslog(LOG_ERR, "Error:: thread scheduler #%d - %s", errno, strerror(errno));
71: return -1;
72: } else
73: pthread_detach(tid);
74: VERB(2) syslog(LOG_DEBUG, "Run scheduler management thread");
75:
1.1.2.3 misho 76: if (listen(sock, SOMAXCONN) == -1) {
1.1.2.4 misho 77: syslog(LOG_ERR, "Error:: listen(%d) #%d - %s\n", sock, errno, strerror(errno));
1.1.2.5 misho 78: pthread_cancel(tid);
1.1.2.3 misho 79: return -1;
80: }
81:
1.1.2.4 misho 82: while (!Kill) {
83: if ((cli = accept(sock, &sa.sa, &sslen)) == -1) {
84: syslog(LOG_ERR, "Error:: accept() #%d - %s", errno, strerror(errno));
85: continue;
86: } else
87: VERB(1) {
88: switch (sa.sa.sa_family) {
89: case AF_INET:
90: inet_ntop(AF_INET, &sa.sin.sin_addr, szAddr, sslen);
91: snprintf(szAddr, sizeof szAddr, "%s:%d",
92: szAddr, ntohs(sa.sin.sin_port));
93: break;
94: case AF_INET6:
95: inet_ntop(AF_INET6, &sa.sin6.sin6_addr, szAddr, sslen);
96: snprintf(szAddr, sizeof szAddr, "%s:%d",
97: szAddr, ntohs(sa.sin6.sin6_port));
98: break;
99: case AF_LOCAL:
100: strlcpy(szAddr, sa.sun.sun_path, sizeof szAddr);
101: break;
102: default:
103: close(cli);
104: syslog(LOG_ERR, "Error:: unsupported address type %d",
105: sa.sa.sa_family);
106: continue;
107: }
1.1.2.5 misho 108: str = strdup(szAddr);
109: syslog(LOG_DEBUG, "Connected client with socket=%d from %s", cli, str);
1.1.2.4 misho 110: }
111:
1.1.2.5 misho 112: if (!schedRead(root, startSession, str, cli)) {
1.1.2.4 misho 113: close(cli);
114: VERB(1) syslog(LOG_DEBUG, "Terminated client with socket=%d", cli);
1.1.2.5 misho 115: if (str)
116: free(str);
1.1.2.4 misho 117: }
118: }
119:
1.1.2.2 misho 120: return 0;
121: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>