Annotation of mqtt/src/mqttd.c, revision 1.2.2.6
1.1 misho 1: #include "global.h"
1.2 misho 2: #include "mqttd.h"
3: #include "rtlm.h"
4: #include "utils.h"
5: #include "daemon.h"
6:
7:
8: io_enableDEBUG;
9:
1.2.2.1 misho 10: cfg_root_t cfg;
1.2 misho 11: sessions_t Sessions;
1.2.2.5 misho 12: pubs_t Pubs;
1.2 misho 13: sched_root_task_t *root;
14: sqlite3 *acc, *pub;
15: pthread_mutex_t mtx_sess, mtx_pub;
16: FILE *logg;
17: extern char compiled[], compiledby[], compilehost[];
18: static char szCfgName[MAXPATHLEN];
1.2.2.6 ! misho 19: volatile intptr_t Kill;
1.2 misho 20:
21:
22: static void
23: Usage(void)
24: {
25: printf( " -= MQTT Broker =- MQTT Service from ELWIX\n"
26: "=== %s@%s === Compiled: %s ===\n\n"
27: "\t-c <config>\tService config\n"
28: "\t-b\t\tBatch mode\n"
29: "\t-v\t\tVerbose (more -vvv, more verbose)\n"
30: "\t-h\t\tHelp! This screen\n\n",
31: compiledby, compilehost, compiled);
32: }
33:
34: static void
35: sigHand(int sig)
36: {
37: int stat;
1.2.2.6 ! misho 38: struct tagSession *sess;
1.2 misho 39:
40: switch (sig) {
41: case SIGHUP:
1.2.2.1 misho 42: cfgUnloadConfig(&cfg);
43: if (!cfgLoadConfig(szCfgName, &cfg)) {
1.2 misho 44: ioDEBUG(1, "Config reload OK!");
45: break;
46: }
47:
1.2.2.4 misho 48: ioLIBERR(cfg);
1.2 misho 49: case SIGTERM:
50: ioDEBUG(1, "Terminate MQTT service in progress");
51: Kill++;
1.2.2.6 ! misho 52:
! 53: SESS_LOCK;
! 54: TAILQ_FOREACH(sess, &Sessions, sess_node)
! 55: if (sess->sess_tid)
! 56: pthread_cancel(sess->sess_tid);
! 57: SESS_UNLOCK;
1.2 misho 58: break;
59: case SIGCHLD:
60: while (waitpid(-1, &stat, WNOHANG) > 0);
61: break;
62: case SIGPIPE:
63: break;
64: }
65: }
1.1 misho 66:
67:
68: int
69: main(int argc, char **argv)
70: {
1.2.2.1 misho 71: char ch, batch = 0;
1.2 misho 72: register int i;
73: int sock = -1, ret = 0;
74: struct passwd *pass;
75: struct sigaction sa;
1.2.2.1 misho 76: ait_val_t v;
1.2 misho 77:
78: TAILQ_INIT(&Sessions);
1.2.2.5 misho 79: TAILQ_INIT(&Pubs);
1.2 misho 80:
81: strlcpy(szCfgName, DEFAULT_CONFIG, sizeof szCfgName);
82: while ((ch = getopt(argc, argv, "hvbc:")) != -1)
83: switch (ch) {
84: case 'c':
85: strlcpy(szCfgName, optarg, sizeof szCfgName);
86: break;
87: case 'b':
88: batch++;
89: break;
90: case 'v':
91: io_incDebug;
92: break;
93: case 'h':
94: default:
95: Usage();
96: return 1;
97: }
98: argc -= optind;
99: argv += optind;
100:
1.2.2.1 misho 101: if (cfgLoadConfig(szCfgName, &cfg)) {
1.2 misho 102: printf("Error:: can't load #%d - %s\n", cfg_GetErrno(), cfg_GetError());
103: return 1;
104: }
105: pthread_mutex_init(&mtx_sess, NULL);
106: pthread_mutex_init(&mtx_pub, NULL);
107: openlog("mqttd", LOG_PID | LOG_CONS, LOG_DAEMON);
1.2.2.1 misho 108: /* load 3 plugins */
1.2 misho 109: for (i = 0; i < 3; i++)
110: if (!mqttLoadRTLM(&cfg, i)) {
111: printf("Error:: Can't load RTL module\n");
112: while (i--)
113: mqttUnloadRTLM(i);
1.2.2.1 misho 114: cfgUnloadConfig(&cfg);
1.2 misho 115: closelog();
116: pthread_mutex_destroy(&mtx_pub);
117: pthread_mutex_destroy(&mtx_sess);
118: return 2;
119: }
120: acc = call.OpenACC(&cfg);
121: if (!acc) {
122: ret = 3;
123: goto end;
124: }
125: pub = call.OpenPUB(&cfg);
126: if (!pub) {
127: ret = 3;
128: goto end;
129: }
130: logg = call.OpenLOG(&cfg);
131: if (!logg) {
132: ret = 3;
133: goto end;
134: }
135:
136: if (mqttMkDir(&cfg)) {
137: printf("Error:: in statedir #%d - %s\n", errno, strerror(errno));
138: ret = 3;
139: goto end;
140: }
141:
142: if (!batch)
143: switch (fork()) {
144: case -1:
145: printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
146: ret = 5;
147: goto end;
148: case 0:
149: setsid();
150:
151: ret = open("/dev/null", O_RDWR);
152: if (ret != -1) {
153: dup2(ret, STDIN_FILENO);
154: dup2(ret, STDOUT_FILENO);
155: dup2(ret, STDERR_FILENO);
156: close(ret);
157: }
158: ioDEBUG(2, "Welcome MQTT service into shadow land!");
159: break;
160: default:
161: ioDEBUG(2, "MQTT service go to shadow land ...");
162: sleep(1);
163: ret = 0;
164: goto end;
165: }
166: else
1.2.2.3 misho 167: ioDEBUG(1, "Start service in batch mode ...");
1.2 misho 168:
169: memset(&sa, 0, sizeof sa);
170: sigemptyset(&sa.sa_mask);
171: sa.sa_handler = sigHand;
172: sigaction(SIGHUP, &sa, NULL);
173: sigaction(SIGTERM, &sa, NULL);
174: sigaction(SIGCHLD, &sa, NULL);
175: sigaction(SIGPIPE, &sa, NULL);
1.2.2.2 misho 176: ioDEBUG(2, "Service is ready for starting engine ...");
1.2 misho 177:
178: if ((sock = srv_Socket(&cfg)) == -1) {
179: ret = 4;
180: goto end;
181: }
182:
1.2.2.1 misho 183: cfg_loadAttribute(&cfg, "mqttd", "user", &v, MQTT_USER);
184: pass = getpwnam(AIT_GET_STR(&v));
185: AIT_FREE_VAL(&v);
1.2 misho 186: if (pass) {
187: setgid(pass->pw_gid);
188: setuid(pass->pw_uid);
189: ioDEBUG(2, "Try to change group #%d and user #%d", pass->pw_gid, pass->pw_uid);
190: }
191:
192: if (!(root = schedBegin())) {
1.2.2.3 misho 193: ioLIBERR(sched);
1.2 misho 194: ret = 6;
195: goto end;
196: }
197:
1.2.2.3 misho 198: /* go catch the cat ... */
1.2 misho 199: Run(sock);
200:
201: schedEnd(&root);
1.2.2.3 misho 202: end: /* free all resources */
1.2 misho 203: srv_Close(sock);
204: call.CloseLOG(logg);
205: call.ClosePUB(pub);
206: call.CloseACC(acc);
207: for (i = 0; i < 3; i++)
208: mqttUnloadRTLM(i);
209: closelog();
1.2.2.1 misho 210: cfgUnloadConfig(&cfg);
1.2 misho 211: pthread_mutex_destroy(&mtx_pub);
212: pthread_mutex_destroy(&mtx_sess);
213: return ret;
1.1 misho 214: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>