File:  [ELWIX - Embedded LightWeight unIX -] / mqtt / src / mqttd.c
Revision 1.2.2.5: download - view: text, annotated - select for diffs - revision graph
Tue Apr 24 08:06:09 2012 UTC (12 years, 8 months ago) by misho
Branches: mqtt1_1
add new Pubs structure
fix error messages
optimize srv_socket

#include "global.h"
#include "mqttd.h"
#include "rtlm.h"
#include "utils.h"
#include "daemon.h"


/* Project macro; presumably instantiates the debug-level state used by ioDEBUG()/io_incDebug - TODO confirm */
io_enableDEBUG;

/* Service configuration; loaded in main() and reloaded by sigHand() on SIGHUP */
cfg_root_t cfg;
/* Tail queue head, initialized empty with TAILQ_INIT() in main() */
sessions_t Sessions;
/* Tail queue head, initialized empty with TAILQ_INIT() in main() */
pubs_t Pubs;
/* Scheduler root returned by schedBegin() and released by schedEnd() in main() */
sched_root_task_t *root;
/* Database handles returned by the call.OpenACC() and call.OpenPUB() plugin hooks */
sqlite3 *acc, *pub;
/* Created/destroyed in main(); presumably guard Sessions and Pubs respectively - verify against users */
pthread_mutex_t mtx_sess, mtx_pub;
/* Log stream returned by the call.OpenLOG() plugin hook */
FILE *logg;
/* Build information defined elsewhere; printed by Usage() */
extern char compiled[], compiledby[], compilehost[];
/* Config file path: DEFAULT_CONFIG unless overridden with -c; reused by the SIGHUP reload */
static char szCfgName[MAXPATHLEN];
/*
 * Incremented by sigHand() on SIGTERM or after a failed config reload.
 * NOTE(review): written from a signal handler; volatile sig_atomic_t is the
 * portable type for that, but changing it needs every extern declaration updated.
 */
intptr_t Kill;


/*
 * Usage() Print the program banner with build information followed by
 * the list of supported command line options on stdout
 *
 * return: none
 */
static void
Usage(void)
{
	static const char options[] =
		"\t-c <config>\tService config\n"
		"\t-b\t\tBatch mode\n"
		"\t-v\t\tVerbose (more -vvv, more verbose)\n"
		"\t-h\t\tHelp! This screen\n\n";

	/* banner first: who built the binary, on which host and when */
	printf(" -= MQTT Broker =- MQTT Service from ELWIX\n"
			"=== %s@%s === Compiled: %s ===\n\n",
			compiledby, compilehost, compiled);
	/* the option table has no conversions, so it is written verbatim */
	fputs(options, stdout);
}

/*
 * sigHand() Signal handler installed by main() for SIGHUP, SIGTERM,
 * SIGCHLD and SIGPIPE
 *
 * @sig = number of the delivered signal
 * return: none
 *
 * SIGHUP  - unload the configuration and load szCfgName again; when the
 *           reload fails the library error is reported and handling
 *           continues as for SIGTERM
 * SIGTERM - increment the global Kill counter
 * SIGCHLD - reap every already exited child without blocking
 * SIGPIPE - nothing to do, the signal is just absorbed
 *
 * NOTE(review): the config and debug calls made here are most likely not
 * async-signal-safe; deferring them to the main loop would be safer.
 */
static void
sigHand(int sig)
{
	int stat;
	/* the handler may interrupt code that is about to inspect errno */
	int saved_errno = errno;

	switch (sig) {
		case SIGHUP:
			cfgUnloadConfig(&cfg);
			if (!cfgLoadConfig(szCfgName, &cfg)) {
				ioDEBUG(1, "Config reload OK!");
				break;
			}

			ioLIBERR(cfg);
			/* FALLTHROUGH - no usable config is left, request termination */
		case SIGTERM:
			ioDEBUG(1, "Terminate MQTT service in progress");
			Kill++;
			break;
		case SIGCHLD:
			/* several children may have exited for a single delivery */
			while (waitpid(-1, &stat, WNOHANG) > 0)
				;
			break;
		case SIGPIPE:
		default:
			break;
	}

	errno = saved_errno;
}


/*
 * main() Entry point of the MQTT broker service
 *
 * Parses the command line, loads the configuration and the 3 RTL modules,
 * opens the ACC/PUB/LOG back-ends, optionally detaches from the terminal,
 * installs the signal handlers, creates the server socket, tries to switch
 * to the configured user and finally hands control to Run().
 *
 * @argc = number of command line arguments
 * @argv = command line arguments (-c <config>, -b, -v, -h)
 * return: 0 ok (also returned by the parent after a successful fork);
 *	1 usage requested, unknown option or config can't be loaded;
 *	2 RTL module can't be loaded; 3 ACC, PUB, LOG or state directory
 *	failure; 4 server socket failure; 5 fork() failure;
 *	6 scheduler can't be started
 */
int
main(int argc, char **argv)
{
	/*
	 * getopt() returns int. Kept in a plain char, the -1 end marker is
	 * never matched on platforms where char is unsigned and the option
	 * loop would not terminate.
	 */
	int ch, batch = 0;
	register int i;
	/* fd is separate from ret, so the exit status is never a descriptor number */
	int sock = -1, ret = 0, fd;
	struct passwd *pass;
	struct sigaction sa;
	ait_val_t v;

	TAILQ_INIT(&Sessions);
	TAILQ_INIT(&Pubs);

	strlcpy(szCfgName, DEFAULT_CONFIG, sizeof szCfgName);
	while ((ch = getopt(argc, argv, "hvbc:")) != -1)
		switch (ch) {
			case 'c':
				strlcpy(szCfgName, optarg, sizeof szCfgName);
				break;
			case 'b':
				batch++;
				break;
			case 'v':
				io_incDebug;
				break;
			case 'h':
			default:
				Usage();
				return 1;
		}
	argc -= optind;
	argv += optind;

	if (cfgLoadConfig(szCfgName, &cfg)) {
		printf("Error:: can't load #%d - %s\n", cfg_GetErrno(), cfg_GetError());
		return 1;
	}
	pthread_mutex_init(&mtx_sess, NULL);
	pthread_mutex_init(&mtx_pub, NULL);
	openlog("mqttd", LOG_PID | LOG_CONS, LOG_DAEMON);
	/* load 3 plugins */
	for (i = 0; i < 3; i++)
		if (!mqttLoadRTLM(&cfg, i)) {
			printf("Error:: Can't load RTL module\n");
			/* unload only the modules which were loaded before the failure */
			while (i--)
				mqttUnloadRTLM(i);
			cfgUnloadConfig(&cfg);
			closelog();
			pthread_mutex_destroy(&mtx_pub);
			pthread_mutex_destroy(&mtx_sess);
			return 2;
		}
	acc = call.OpenACC(&cfg);
	if (!acc) {
		ret = 3;
		goto end;
	}
	pub = call.OpenPUB(&cfg);
	if (!pub) {
		ret = 3;
		goto end;
	}
	logg = call.OpenLOG(&cfg);
	if (!logg) {
		ret = 3;
		goto end;
	}

	if (mqttMkDir(&cfg)) {
		printf("Error:: in statedir #%d - %s\n", errno, strerror(errno));
		ret = 3;
		goto end;
	}

	if (!batch)
		switch (fork()) {
			case -1:
				printf("Error:: in fork() #%d - %s\n", errno, strerror(errno));
				ret = 5;
				goto end;
			case 0:
				setsid();

				fd = open("/dev/null", O_RDWR);
				if (fd != -1) {
					dup2(fd, STDIN_FILENO);
					dup2(fd, STDOUT_FILENO);
					dup2(fd, STDERR_FILENO);
					/* when fd is itself 0, 1 or 2, closing it would undo the redirect */
					if (fd > STDERR_FILENO)
						close(fd);
				}
				ioDEBUG(2, "Welcome MQTT service into shadow land!");
				break;
			default:
				ioDEBUG(2, "MQTT service go to shadow land ...");
				sleep(1);
				ret = 0;
				goto end;
		}
	else
		ioDEBUG(1, "Start service in batch mode ...");

	memset(&sa, 0, sizeof sa);
	sigemptyset(&sa.sa_mask);
	sa.sa_handler = sigHand;
	sigaction(SIGHUP, &sa, NULL);
	sigaction(SIGTERM, &sa, NULL);
	sigaction(SIGCHLD, &sa, NULL);
	sigaction(SIGPIPE, &sa, NULL);
	ioDEBUG(2, "Service is ready for starting engine ...");

	if ((sock = srv_Socket(&cfg)) == -1) {
		ret = 4;
		goto end;
	}

	cfg_loadAttribute(&cfg, "mqttd", "user", &v, MQTT_USER);
	pass = getpwnam(AIT_GET_STR(&v));
	AIT_FREE_VAL(&v);
	if (pass) {
		/*
		 * Group goes first, it can't be changed any more once the user
		 * id is dropped. A failure is only reported and the service
		 * keeps running, as before.
		 * NOTE(review): decide whether running on with the original
		 * credentials is acceptable for this deployment.
		 */
		if (setgid(pass->pw_gid) == -1)
			syslog(LOG_WARNING, "Can't change group to #%d - %s", 
					(int) pass->pw_gid, strerror(errno));
		if (setuid(pass->pw_uid) == -1)
			syslog(LOG_WARNING, "Can't change user to #%d - %s", 
					(int) pass->pw_uid, strerror(errno));
		ioDEBUG(2, "Try to change group #%d and user #%d", pass->pw_gid, pass->pw_uid);
	}

	if (!(root = schedBegin())) {
		ioLIBERR(sched);
		ret = 6;
		goto end;
	}

	/* go catch the cat ... */
	Run(sock);

	schedEnd(&root);
end:	/* free all resources */
	srv_Close(sock);
	/*
	 * Early failures arrive here before every back-end is opened; the
	 * handles are zero-initialized globals, so the ones which were never
	 * opened are skipped instead of handing NULL to the plugin hooks.
	 */
	if (logg)
		call.CloseLOG(logg);
	if (pub)
		call.ClosePUB(pub);
	if (acc)
		call.CloseACC(acc);
	for (i = 0; i < 3; i++)
		mqttUnloadRTLM(i);
	closelog();
	cfgUnloadConfig(&cfg);
	pthread_mutex_destroy(&mtx_pub);
	pthread_mutex_destroy(&mtx_sess);
	return ret;
}

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>