/*
 * File:  [ELWIX - Embedded LightWeight unIX -] / libaitrpc / src / srv.c
 * Revision 1.9.2.13: download - view: text, annotated - select for diffs - revision graph
 * Wed May 16 09:09:42 2012 UTC (12 years, 1 month ago) by misho
 * Branches: rpc3_3
 * Diff to: branchpoint 1.9: preferred, colored
 * add acceptBLOBs
 */

/*************************************************************************
* (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
*  by Michael Pounov <misho@openbsd-bg.org>
*
* $Author: misho $
* $Id: srv.c,v 1.9.2.13 2012/05/16 09:09:42 misho Exp $
*
**************************************************************************
The ELWIX and AITNET software is distributed under the following
terms:

All of the documentation and software included in the ELWIX and AITNET
Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>

Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
	by Michael Pounov <misho@elwix.org>.  All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
   notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
   notice, this list of conditions and the following disclaimer in the
   documentation and/or other materials provided with the distribution.
3. All advertising materials mentioning features or use of this software
   must display the following acknowledgement:
This product includes software developed by Michael Pounov <misho@elwix.org>
ELWIX - Embedded LightWeight unIX and its contributors.
4. Neither the name of AITNET nor the names of its contributors
   may be used to endorse or promote products derived from this software
   without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
*/
#include "global.h"


/*
 * closeClient() - Scheduler task that tears down one RPC client connection
 *
 * @task = scheduler task; its argument is the client (rpc_cli_t*), and a
 *         non-zero task value requests shutdown() before the socket is closed
 * return: always NULL
 */
static void *
closeClient(sched_task_t *task)
{
	rpc_cli_t *cli = TASK_ARG(task);
	rpc_srv_t *srv = cli->cli_parent;

	/* cancel the scheduler tasks registered with this client as argument */
	schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, cli, NULL);

	/* the task value is read only after the cancel, as before */
	if (TASK_VAL(task) != 0)
		shutdown(cli->cli_sock, SHUT_RDWR);
	close(cli->cli_sock);

	/* release the receive buffer, then give the slot back to the pool */
	AIT_FREE_VAL(&cli->cli_buf);
	io_arrayDel(srv->srv_clients, cli->cli_id, 42);

	return NULL;
}

/*
 * txPacket() - Scheduler write task that builds and sends one RPC reply
 *
 * @task = scheduler task; argument is the client (rpc_cli_t*), task data is
 *         the RPC header to answer and the task fd is the client socket
 * return: always NULL; on a failed or short send a closeClient event is queued
 */
static void *
txPacket(sched_task_t *task)
{
	rpc_cli_t *cli = TASK_ARG(task);
	rpc_srv_t *srv = cli->cli_parent;
	rpc_func_t *call = NULL;
	u_char pkt[USHRT_MAX] = { 0 };
	struct tagRPCCall *hdr = (struct tagRPCCall*) pkt;
	int n, pktlen = sizeof(struct tagRPCCall);

	/* start the reply from a private copy of the RPC header */
	memcpy(pkt, TASK_DATA(task), pktlen);

	if (hdr->call_argc) {
		call = rpc_srv_getCall(srv, ntohs(hdr->call_tag));
		if (call) {
			hdr->call_argc = htons(io_arraySize(call->func_vars));
			/* serialize the return variables right behind the header */
			n = io_vars2buffer(pkt + pktlen, sizeof pkt - pktlen, call->func_vars);
			/* the return values are cleared whether or not encoding worked */
			io_clrVars(call->func_vars);
			if (n != -1)
				pktlen += n;
			else {
				rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
				hdr->call_argc = 0;
				hdr->call_rep.ret = RPC_ERROR(-1);
				hdr->call_rep.eno = RPC_ERROR(rpc_Errno);
			}
		} else {
			/* call lookup failed: answer with an error and no variables */
			hdr->call_argc = 0;
			hdr->call_rep.ret = RPC_ERROR(-1);
			hdr->call_rep.eno = RPC_ERROR(rpc_Errno);
		}
	}

	hdr->call_len = htons(pktlen);

	/* the checksum is computed with the crc field itself zeroed */
	hdr->call_crc = 0;
	hdr->call_crc = htons(crcFletcher16((u_short*) pkt, pktlen / 2));

	n = send(TASK_FD(task), pkt, pktlen, MSG_NOSIGNAL);
	if (n == -1 || n != pktlen)
		schedEvent(TASK_ROOT(task), closeClient, cli, 42, NULL, 0);

	return NULL;
}

/*
 * execCall() - Scheduler event task: decode and execute one received RPC call
 *
 * @task = scheduler task; argument is the client (rpc_cli_t*), task value is
 *         the byte offset of the packet inside the client's receive buffer
 * return: always NULL; the outcome is written back into the packet header
 *         (call_argc, call_rep.ret, call_rep.eno) inside the client's buffer
 */
static void *
execCall(sched_task_t *task)
{
	rpc_cli_t *c = TASK_ARG(task);
	rpc_srv_t *s = c->cli_parent;
	rpc_func_t *f = NULL;
	array_t *arr = NULL;
	/* packet starts at the offset carried in the task value */
	u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
	struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
	int argc = ntohs(rpc->call_argc);

	/* Decode the argument variables that follow the header, if any */
	if (argc) {
		arr = io_buffer2vars(buf + sizeof(struct tagRPCCall), 
				AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall), 
				argc, 1);
		if (!arr) {
			/* decoding failed: mark the header as an error reply and stop */
			rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
			rpc->call_argc ^= rpc->call_argc;
			rpc->call_rep.ret = RPC_ERROR(-1);
			rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
			return NULL;
		}
	} else
		arr = NULL;

	/* look the call up by the tag carried in the header */
	if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
		rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
		rpc->call_argc ^= rpc->call_argc;
		rpc->call_rep.ret = RPC_ERROR(-1);
		rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
	} else {
		/*
		 * if client doesn't want reply; argc is reused as the no-reply flag.
		 * NOTE(review): the flag is read before call_rep is written below;
		 * call_req/call_rep may share storage - confirm before reordering.
		 */
		argc = rpc->call_req.flags & RPC_NOREPLY;
		rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(f, rpc, arr));
		if (rpc->call_rep.ret == htonl(-1)) {
			/* callback failed: report errno and return no variables */
			rpc->call_rep.eno = RPC_ERROR(errno);
			rpc->call_argc ^= rpc->call_argc;
		} else {
			rpc->call_rep.eno ^= rpc->call_rep.eno;
			if (argc) {
				/* without reply: drop the return variables right away */
				io_clrVars(f->func_vars);
				rpc->call_argc ^= rpc->call_argc;
			} else {
				/* reply: advertise how many return variables are pending */
				rpc->call_argc = htons(io_arraySize(f->func_vars));
			}
		}
	}

	/* the decoded argument array is no longer needed */
	io_arrayDestroy(&arr);
	return NULL;
}

/*
 * rxPacket() - Scheduler read task: receive and dispatch RPC packets of one client
 *
 * Reads once from the client socket into the client's buffer and walks every
 * RPC packet found in the received bytes. For each packet the length field and
 * the Fletcher-16 checksum are verified, the session is checked, an execCall
 * event is queued for valid sessions and, unless the client asked for no
 * reply, a txPacket write task is queued.
 *
 * @task = scheduler task; argument is the client (rpc_cli_t*), task fd is the
 *         client socket
 * return: always NULL; on EOF or receive error a closeClient event is queued,
 *         otherwise the task re-arms itself for the next read
 */
static void *
rxPacket(sched_task_t *task)
{
	rpc_cli_t *c = TASK_ARG(task);
	rpc_srv_t *s = c->cli_parent;
	u_char *buf = AIT_GET_BUF(&c->cli_buf);
	int len, rlen, noreply;
	/* int, not u_short: the offset must not wrap on buffers larger than 64K */
	int off = 0;
	u_short crc;
	struct tagRPCCall *rpc;

	memset(buf, 0, AIT_LEN(&c->cli_buf));
	rlen = recv(TASK_FD(task), buf, AIT_LEN(&c->cli_buf), 0);
	if (rlen < 1) {
		/* close connection */
		schedEvent(TASK_ROOT(task), closeClient, c, 0, NULL, 0);
		return NULL;
	}

	do {
		/* a complete header must be available before it is inspected */
		if (rlen < (int) sizeof(struct tagRPCCall)) {
			rpc_SetErr(ERPCMISMATCH, "Too short RPC packet");

			schedReadSelf(task);
			return NULL;
		}
		rpc = (struct tagRPCCall*) (buf + off);

		/*
		 * The length comes from the peer and cannot be trusted: a value
		 * shorter than the header (e.g. 0) would never advance the loop,
		 * and a value beyond the received bytes would make the checksum
		 * read past the data. Drop the rest of this read in both cases.
		 */
		len = ntohs(rpc->call_len);
		if (len < (int) sizeof(struct tagRPCCall) || len > rlen) {
			rpc_SetErr(ERPCMISMATCH, "Bad length of RPC packet");
			break;
		}
		rlen -= len;

		/* check integrity of packet; crc is computed with the field zeroed */
		crc = ntohs(rpc->call_crc);
		rpc->call_crc ^= rpc->call_crc;
		if (crc != crcFletcher16((u_short*) (buf + off), len / 2)) {
			rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");

			/* skip the damaged packet; the loop condition ends the walk */
			off += len;
			continue;
		}

		/* read the request flag before any reply field is written */
		noreply = rpc->call_req.flags & RPC_NOREPLY;

		/* check RPC packet session info */
		if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
			rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
			rpc->call_argc ^= rpc->call_argc;
			rpc->call_rep.ret = RPC_ERROR(-1);
			/* report the code just set by rpc_SetErr(), not a stale errno */
			rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
		} else {
			/* execute RPC call */
			schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
		}

		/* send RPC reply */
		if (!noreply)
			schedWrite(TASK_ROOT(task), txPacket, TASK_ARG(task), TASK_FD(task), rpc, len);

		off += len;
	} while (rlen > 0);

	/* lets get next packet */
	schedReadSelf(task);
	return NULL;
}

/*
 * acceptClients() - Scheduler read task on the RPC listening socket
 *
 * Looks for the first empty slot in the client pool, allocates a client
 * record with a receive buffer of srv_netbuf bytes, accepts the pending
 * connection in non-blocking mode and schedules rxPacket for it.
 *
 * @task = scheduler task; argument is the server (rpc_srv_t*), task fd is the
 *         listening socket
 * return: always NULL; the task re-arms itself except when the client record
 *         cannot be allocated, in which case srv_kill is raised instead
 */
static void *
acceptClients(sched_task_t *task)
{
	rpc_srv_t *srv = TASK_ARG(task);
	rpc_cli_t *cli = NULL;
	socklen_t addrlen = sizeof(io_sockaddr_t);
	int slot = 0, flags;

	/* walk the pool until an empty (NULL) slot shows up */
	while (slot < io_arraySize(srv->srv_clients)) {
		cli = io_array(srv->srv_clients, slot, rpc_cli_t*);
		if (!cli)
			break;
		slot++;
	}

	/* cli still set means every slot is taken: nothing is accepted */
	if (!cli) {
		cli = malloc(sizeof(rpc_cli_t));
		if (!cli) {
			LOGERR;
			srv->srv_kill = 1;
			return NULL;
		}
		memset(cli, 0, sizeof(rpc_cli_t));
		io_arraySet(srv->srv_clients, slot, cli);
		cli->cli_id = slot;
		cli->cli_parent = srv;

		/* receive buffer for this client */
		AIT_SET_BUF2(&cli->cli_buf, 0, srv->srv_netbuf);

		cli->cli_sock = accept(TASK_FD(task), &cli->cli_sa.sa, &addrlen);
		if (cli->cli_sock != -1) {
			flags = fcntl(cli->cli_sock, F_GETFL);
			fcntl(cli->cli_sock, F_SETFL, flags | O_NONBLOCK);

			schedRead(TASK_ROOT(task), rxPacket, cli, cli->cli_sock, NULL, 0);
		} else {
			/* undo the slot reservation made above */
			LOGERR;
			AIT_FREE_VAL(&cli->cli_buf);
			io_arrayDel(srv->srv_clients, slot, 42);
		}
	}

	schedReadSelf(task);
	return NULL;
}

/* ------------------------------------------------------ */

#if 0
/*
 * txBLOB() - Scheduler write task that sends a BLOB header reply
 *
 * Currently compiled out (enclosed in #if 0).
 *
 * @task = scheduler task; task data holds the BLOB header to send and the
 *         task fd is the destination socket
 * return: always NULL; send problems are only logged/recorded
 */
static void *
txBLOB(sched_task_t *task)
{
	u_char *buf = TASK_DATA(task);
	struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
	/* only the header itself is checksummed and sent */
	int wlen = sizeof(struct tagBLOBHdr);

	/* calculate CRC with the crc field zeroed first */
	blob->hdr_crc ^= blob->hdr_crc;
	blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));

	/* send reply; wlen is reused for the number of bytes actually sent */
	wlen = send(TASK_FD(task), buf, wlen, 0);
	if (wlen == -1)
		LOGERR;
	else if (wlen != sizeof(struct tagBLOBHdr))
		/* NOTE(review): sizeof yields size_t but is formatted with %d - confirm/fix before re-enabling */
		rpc_SetErr(EPROCUNAVAIL, "RPC reply, should be send %d bytes, "
				"really sended %d bytes", sizeof(struct tagBLOBHdr), wlen);

	return NULL;
}

/*
 * rxBLOB() - Scheduler read task that receives and processes one BLOB request
 *
 * Currently compiled out (enclosed in #if 0).
 *
 * @task = scheduler task; argument is the client (rpc_cli_t*), task data is
 *         the receive buffer (TASK_DATLEN bytes) and task fd the client socket
 * return: always NULL; on receive error or EOF the client is flagged via
 *         cli_kill and the task is not re-scheduled
 */
static void *
rxBLOB(sched_task_t *task)
{
	rpc_cli_t *c = TASK_ARG(task);
	rpc_srv_t *s = c->cli_parent;
	rpc_blob_t *b;
	u_char *buf = TASK_DATA(task);
	struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
	int rlen;
	u_short crc;
	struct timespec ts;

	/* check for disable service at this moment? if so, wait 100ms and retry */
	if (!s || s->srv_blob.state == disable) {
		usleep(100000);
#ifdef HAVE_PTHREAD_YIELD
		pthread_yield();
#endif
		schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
				TASK_DATA(task), TASK_DATLEN(task));
		return NULL;
	}

	memset(buf, 0, TASK_DATLEN(task));
	rlen = recv(TASK_FD(task), buf, TASK_DATLEN(task), 0);
	if (rlen == -1) {
		LOGERR;
		c->cli_kill = kill;
		return NULL;
	} else if (!rlen) { /* receive EOF */
		c->cli_kill = kill;
		return NULL;
	}

	/* anything shorter than a BLOB header is ignored; wait for the next read */
	if (rlen < sizeof(struct tagBLOBHdr)) {
		rpc_SetErr(ERPCMISMATCH, "Too short BLOB packet");
		schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
				TASK_DATA(task), TASK_DATLEN(task));
		return NULL;
	}

	/* check integrity of packet; crc is computed with the field zeroed */
	crc = ntohs(blob->hdr_crc);
	blob->hdr_crc ^= blob->hdr_crc;
	if (crc != crcFletcher16((u_short*) buf, rlen / 2)) {
		rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
		schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
				TASK_DATA(task), TASK_DATLEN(task));
		return NULL;
	}

	/* check RPC packet session info */
	if ((crc = rpc_chkPktSession(&blob->hdr_session, &s->srv_session))) {
		rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
		blob->hdr_cmd = error;
		goto end;
	} else {
		/* change socket timeout from last packet */
		ts.tv_sec = blob->hdr_session.sess_timeout;
		ts.tv_nsec = 0;
		schedPolling(TASK_ROOT(task), &ts, NULL);
	}

	/* Process the command; results are written back into the header */
	switch (blob->hdr_cmd) {
		case get:
			if (!(b = rpc_srv_getBLOB(s, ntohl(blob->hdr_var)))) {
				rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob->hdr_var));
				blob->hdr_cmd = no;
				blob->hdr_ret = RPC_ERROR(-1);
				break;
			} else
				blob->hdr_len = htonl(b->blob_len);

			if (rpc_srv_blobMap(s, b) != -1) {
				/* deliver BLOB variable to client */
				blob->hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
				rpc_srv_blobUnmap(b);
			} else {
				blob->hdr_cmd = error;
				blob->hdr_ret = RPC_ERROR(-1);
			}
			break;
		case set:
			if ((b = rpc_srv_registerBLOB(s, ntohl(blob->hdr_len)))) {
				/* set new BLOB variable for reply :) */
				blob->hdr_var = htonl(b->blob_var);

				/* receive BLOB from client */
				blob->hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
				rpc_srv_blobUnmap(b);
			} else {
				blob->hdr_cmd = error;
				blob->hdr_ret = RPC_ERROR(-1);
			}
			break;
		case unset:
			/* NOTE(review): hdr_var is passed without ntohl() here, unlike the get case - confirm */
			if (rpc_srv_unregisterBLOB(s, blob->hdr_var) == -1) {
				blob->hdr_cmd = error;
				blob->hdr_ret = RPC_ERROR(-1);
			}
			break;
		default:
			rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob->hdr_cmd);
			blob->hdr_cmd = error;
			blob->hdr_ret = RPC_ERROR(-1);
	}

end:
	/* queue the reply and re-arm the reader on the same buffer */
	schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), 
			TASK_DATA(task), TASK_DATLEN(task));
	schedRead(TASK_ROOT(task), rxBLOB, TASK_ARG(task), TASK_FD(task), 
			TASK_DATA(task), TASK_DATLEN(task));
	return NULL;
}
#endif

/*
 * acceptBLOBClients() - Scheduler read task on the BLOB listening socket
 *
 * Reserves the first empty slot of the BLOB client pool, allocates a client
 * record with a buffer of srv_netbuf bytes and accepts the pending connection
 * in non-blocking mode.
 *
 * @task = scheduler task; argument is the server (rpc_srv_t*), task fd is the
 *         BLOB listening socket
 * return: always NULL; the task re-arms itself except when the client record
 *         cannot be allocated, in which case both kill flags are raised
 */
static void *
acceptBLOBClients(sched_task_t *task)
{
	rpc_srv_t *srv = TASK_ARG(task);
	rpc_cli_t *cli = NULL;
	socklen_t addrlen = sizeof(io_sockaddr_t);
	int slot = 0, flags;

	/* stop at the first empty (NULL) slot of the pool */
	while (slot < io_arraySize(srv->srv_blob.clients)) {
		cli = io_array(srv->srv_blob.clients, slot, rpc_cli_t*);
		if (!cli)
			break;
		slot++;
	}

	/* cli still set means the pool is full: nothing is accepted */
	if (!cli) {
		cli = malloc(sizeof(rpc_cli_t));
		if (!cli) {
			LOGERR;
			srv->srv_kill = srv->srv_blob.kill = 1;
			return NULL;
		}
		memset(cli, 0, sizeof(rpc_cli_t));
		io_arraySet(srv->srv_blob.clients, slot, cli);
		cli->cli_id = slot;
		cli->cli_parent = srv;

		/* buffer for this client */
		AIT_SET_BUF2(&cli->cli_buf, 0, srv->srv_netbuf);

		cli->cli_sock = accept(TASK_FD(task), &cli->cli_sa.sa, &addrlen);
		if (cli->cli_sock != -1) {
			flags = fcntl(cli->cli_sock, F_GETFL);
			fcntl(cli->cli_sock, F_SETFL, flags | O_NONBLOCK);
			/*
			 * No read task is scheduled for the new client here: the
			 * rxBLOB handler is compiled out at the moment.
			 */
		} else {
			/* undo the slot reservation made above */
			LOGERR;
			AIT_FREE_VAL(&cli->cli_buf);
			io_arrayDel(srv->srv_blob.clients, slot, 42);
		}
	}

	schedReadSelf(task);
	return NULL;
}

/* ------------------------------------------------------ */

/*
 * rpc_srv_initBLOBServer() - Init & create BLOB Server
 *
 * The BLOB server address is derived from the RPC server address: for
 * AF_INET/AF_INET6 the given port (or RPC server port + 1 when Port == 0) is
 * used, for AF_LOCAL the suffix ".blob" is appended to the socket path.
 *
 * @srv = RPC server instance
 * @Port = Port to bind the BLOB server to, if Port == 0 the RPC server port + 1 is selected
 * @diskDir = Directory for BLOB file objects, must be readable and writable
 * return: -1 == error or 0 bind and created BLOB server instance
 */
int
rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
{
	int n = 1;
	io_sockaddr_t *bsa;

	if (!srv || srv->srv_kill) {
		rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
		return -1;
	}

	memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
	if (access(diskDir, R_OK | W_OK) == -1) {
		LOGERR;
		return -1;
	}
	AIT_SET_STR(&srv->srv_blob.dir, diskDir);

	/* init blob list */
	TAILQ_INIT(&srv->srv_blob.blobs);

	srv->srv_blob.server.cli_parent = srv;

	/*
	 * Start from the RPC server address and adjust the copy that is really
	 * bound below (previously an uninitialized local was inspected and
	 * modified, so the adjustment never reached the bound address).
	 */
	bsa = &srv->srv_blob.server.cli_sa;
	memcpy(bsa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));
	switch (bsa->sa.sa_family) {
		case AF_INET:
			bsa->sin.sin_port = htons(Port ? Port : ntohs(bsa->sin.sin_port) + 1);
			break;
		case AF_INET6:
			bsa->sin6.sin6_port = htons(Port ? Port : ntohs(bsa->sin6.sin6_port) + 1);
			break;
		case AF_LOCAL:
			strlcat(bsa->sun.sun_path, ".blob", sizeof bsa->sun.sun_path);
			break;
		default:
			/* unsupported address family */
			goto err_dir;
	}

	/* create BLOB server socket */
	srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
	if (srv->srv_blob.server.cli_sock == -1) {
		LOGERR;
		goto err_dir;
	}
	if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
		LOGERR;
		goto err_sock;
	}
	/* socket buffers follow the configured network buffer size */
	n = srv->srv_netbuf;
	if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
		LOGERR;
		goto err_sock;
	}
	if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
		LOGERR;
		goto err_sock;
	}
	if (bind(srv->srv_blob.server.cli_sock, &bsa->sa, bsa->sa.sa_len) == -1) {
		LOGERR;
		goto err_sock;
	}

	/* allocate pool for concurrent blob clients, same size as the RPC pool */
	srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
	if (!srv->srv_blob.clients) {
		rpc_SetErr(io_GetErrno(), "%s", io_GetError());
		goto err_sock;
	}

	/* init blob scheduler */
	srv->srv_blob.root = schedBegin();
	if (!srv->srv_blob.root) {
		rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
		goto err_clients;
	}

	return 0;

	/* error unwinding, newest resource first */
err_clients:
	io_arrayDestroy(&srv->srv_blob.clients);
err_sock:
	close(srv->srv_blob.server.cli_sock);
err_dir:
	AIT_FREE_VAL(&srv->srv_blob.dir);
	return -1;
}

/*
 * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
 *
 * Closes every connected BLOB client and releases its buffer, closes the
 * listening socket, frees all registered BLOBs, ends the BLOB scheduler and
 * finally raises srv_blob.kill.
 *
 * @srv = RPC Server instance
 * return: none
 */
void
rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
{
	rpc_cli_t *c;
	register int i;
	rpc_blob_t *b, *tmp;

	if (!srv)
		return;

	/* close all clients connections & server socket */
	for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {
		c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);
		if (c) {
			shutdown(c->cli_sock, SHUT_RDWR);
			close(c->cli_sock);

			/* the buffer set up at accept time is owned by the client record */
			AIT_FREE_VAL(&c->cli_buf);

			io_arrayDel(srv->srv_blob.clients, i, 42);
		}
	}
	io_arrayDestroy(&srv->srv_blob.clients);

	close(srv->srv_blob.server.cli_sock);

	/* detach blobs */
	TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
		TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);

		rpc_srv_blobFree(srv, b);
		free(b);
	}

	schedEnd(&srv->srv_blob.root);
	AIT_FREE_VAL(&srv->srv_blob.dir);

	srv->srv_blob.kill = 1;
}

/*
 * rpc_srv_loopBLOBServer() - Run the BLOB server main loop
 *
 * Switches the BLOB listening socket to non-blocking mode, starts listening
 * with a backlog equal to the BLOB client pool size, registers
 * acceptBLOBClients on it and runs the BLOB scheduler until srv_blob.kill
 * is raised.
 *
 * @srv = RPC Server instance
 * return: -1 error or 0 ok, infinite loop ...
 */
int
rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
{
	int lsock, flags;

	if (!srv || srv->srv_kill) {
		rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
		return -1;
	}

	lsock = srv->srv_blob.server.cli_sock;

	/* the accept task must never block the scheduler */
	flags = fcntl(lsock, F_GETFL);
	fcntl(lsock, F_SETFL, flags | O_NONBLOCK);

	if (listen(lsock, io_arraySize(srv->srv_blob.clients)) == -1) {
		LOGERR;
		return -1;
	}

	if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, lsock, NULL, 0)) {
		rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
		return -1;
	}

	/* main blob loop */
	schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
	return 0;
}


/*
 * rpc_srv_initServer() - Init & create RPC Server
 *
 * @regProgID = ProgramID for authentication & recognition, must be non-zero
 * @regProcID = ProcessID for authentication & recognition
 * @concurentClients = Concurrent clients at same time to this server, must be > 0
 * @netBuf = Network buffer length, if =0 == BUFSIZ (also meaning max RPC packet)
 * @csHost = Host name or address for bind server, if NULL any address
 * @Port = Port for bind server, if Port == 0 default port (RPC_DEFPORT) is selected
 * return: NULL == error or !=NULL bind and created RPC server instance
 */
rpc_srv_t *
rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients, 
		int netBuf, const char *csHost, u_short Port)
{
	int n = 1;
	rpc_srv_t *srv = NULL;
	io_sockaddr_t sa;

	if (concurentClients < 1 || !regProgID) {
		rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
		return NULL;
	}
	/*
	 * Pick the default port BEFORE resolving the address, otherwise the
	 * resolved address would still carry port 0 and the default is lost.
	 */
	if (!Port)
		Port = RPC_DEFPORT;
	if (!io_gethostbyname(csHost, Port, &sa))
		return NULL;
	if (!netBuf)
		netBuf = BUFSIZ;
	else
		netBuf = io_align(netBuf, 1);	/* align netBuf length */

	srv = malloc(sizeof(rpc_srv_t));
	if (!srv) {
		LOGERR;
		return NULL;
	}
	memset(srv, 0, sizeof(rpc_srv_t));

	srv->srv_netbuf = netBuf;
	srv->srv_session.sess_version = RPC_VERSION;
	srv->srv_session.sess_program = regProgID;
	srv->srv_session.sess_process = regProcID;

	srv->srv_server.cli_parent = srv;
	memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);

	/* init functions list */
	TAILQ_INIT(&srv->srv_funcs);

	/* init scheduler */
	srv->srv_root = schedBegin();
	if (!srv->srv_root) {
		rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
		goto err_srv;
	}

	/* init pool for clients */
	srv->srv_clients = io_arrayInit(concurentClients);
	if (!srv->srv_clients) {
		rpc_SetErr(io_GetErrno(), "%s", io_GetError());
		goto err_sched;
	}

	/* create server socket */
	srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
	if (srv->srv_server.cli_sock == -1) {
		LOGERR;
		goto err_clients;
	}
	if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
		LOGERR;
		goto err_sock;
	}
	/* socket buffers follow the configured network buffer size */
	n = srv->srv_netbuf;
	if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
		LOGERR;
		goto err_sock;
	}
	if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
		LOGERR;
		goto err_sock;
	}
	if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa, 
				srv->srv_server.cli_sa.sa.sa_len) == -1) {
		LOGERR;
		goto err_sock;
	}

	return srv;

	/* error unwinding, newest resource first */
err_sock:
	close(srv->srv_server.cli_sock);
err_clients:
	io_arrayDestroy(&srv->srv_clients);
err_sched:
	schedEnd(&srv->srv_root);
err_srv:
	free(srv);
	return NULL;
}

/*
 * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
 *
 * Ends the BLOB server first when its kill flag is not raised, then closes
 * every connected client and releases its buffer, closes the listening
 * socket, frees all registered calls, ends the scheduler and frees the
 * server instance.
 *
 * @psrv = RPC Server instance, set to NULL on return
 * return: none
 */
void
rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
{
	rpc_cli_t *c;
	register int i;
	rpc_func_t *f, *tmp;

	if (!psrv || !*psrv) {
		rpc_SetErr(EINVAL, "Can`t destroy server because parameter is null!");
		return;
	}

	/*
	 * NOTE(review): srv_blob.kill is also 0 when the BLOB server was never
	 * initialized; confirm rpc_srv_endBLOBServer() is safe in that case.
	 */
	if (!(*psrv)->srv_blob.kill)
		rpc_srv_endBLOBServer(*psrv);

	/* close all clients connections & server socket */
	for (i = 0; i < io_arraySize((*psrv)->srv_clients); i++) {
		c = io_array((*psrv)->srv_clients, i, rpc_cli_t*);
		if (c) {
			shutdown(c->cli_sock, SHUT_RDWR);
			close(c->cli_sock);

			/* the buffer set up at accept time is owned by the client record */
			AIT_FREE_VAL(&c->cli_buf);

			io_arrayDel((*psrv)->srv_clients, i, 42);
		}
	}
	io_arrayDestroy(&(*psrv)->srv_clients);

	close((*psrv)->srv_server.cli_sock);

	/* detach exported calls */
	TAILQ_FOREACH_SAFE(f, &(*psrv)->srv_funcs, func_node, tmp) {
		TAILQ_REMOVE(&(*psrv)->srv_funcs, f, func_node);

		io_freeVars(&f->func_vars);
		AIT_FREE_VAL(&f->func_name);
		free(f);
	}

	schedEnd(&(*psrv)->srv_root);
	free(*psrv);
	*psrv = NULL;
}

/*
 * rpc_srv_loopServer() - Run the RPC server main loop
 *
 * Switches the listening socket to non-blocking mode, starts listening with
 * a backlog equal to the client pool size, registers acceptClients on it and
 * runs the scheduler until srv_kill is raised.
 *
 * @srv = RPC Server instance
 * return: -1 error or 0 ok, infinite loop ...
 */
int
rpc_srv_loopServer(rpc_srv_t * __restrict srv)
{
	int lsock, flags;

	if (!srv) {
		rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
		return -1;
	}

	lsock = srv->srv_server.cli_sock;

	/* the accept task must never block the scheduler */
	flags = fcntl(lsock, F_GETFL);
	fcntl(lsock, F_SETFL, flags | O_NONBLOCK);

	if (listen(lsock, io_arraySize(srv->srv_clients)) == -1) {
		LOGERR;
		return -1;
	}

	if (!schedRead(srv->srv_root, acceptClients, srv, lsock, NULL, 0)) {
		rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
		return -1;
	}

	/* main rpc loop */
	schedRun(srv->srv_root, &srv->srv_kill);
	return 0;
}


/*
 * rpc_srv_execCall() - Execute a registered call of the RPC server
 *
 * The callback is taken from the call's func_name value and is invoked with
 * the call itself, the argument count from the RPC header (host byte order)
 * and the decoded arguments.
 *
 * @call = Registered RPC call
 * @rpc = IN RPC call structure
 * @args = IN RPC calling arguments from RPC client
 * return: -1 error, !=-1 ok (value returned by the callback)
 */
int
rpc_srv_execCall(rpc_func_t * __restrict call, struct tagRPCCall * __restrict rpc, 
		array_t * __restrict args)
{
	rpc_callback_t handler;

	if (call && rpc && call->func_parent && AIT_ADDR(&call->func_name)) {
		handler = AIT_GET_LIKE(&call->func_name, rpc_callback_t);
		return handler(call, ntohs(rpc->call_argc), args);
	}

	rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
	return -1;
}

/* FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org> */