Annotation of libaitrpc/src/srv.c, revision 1.12.2.2
1.1 misho 1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
1.12.2.2! misho 6: * $Id: srv.c,v 1.12.2.1 2012/11/16 08:33:06 misho Exp $
1.1 misho 7: *
1.2 misho 8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
1.7 misho 15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012
1.2 misho 16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
1.1 misho 46: #include "global.h"
47:
48:
1.12.2.2! misho 49: /* SOCK_STREAM */
1.12.2.1 misho 50: static void *acceptClients(sched_task_t *);
51: static void *closeClient(sched_task_t *);
1.12.2.2! misho 52: static void *rxPacket(sched_task_t *);
! 53: static void *txPacket(sched_task_t *);
! 54:
! 55: /* SOCK_DGRAM */
! 56:
! 57: /* SOCK_RAW */
! 58:
! 59: static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {
! 60: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
! 61: { acceptClients, closeClient, rxPacket, txPacket }, /* SOCK_STREAM */
! 62: { NULL, NULL, NULL, NULL },
! 63: { NULL, NULL, NULL, NULL } /* SOCK_RAW */
1.12.2.1 misho 64: };
65:
66:
1.10 misho 67: static void *
68: closeClient(sched_task_t *task)
69: {
70: rpc_cli_t *c = TASK_ARG(task);
71: rpc_srv_t *s = c->cli_parent;
72:
73: schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
74:
75: /* close client socket */
76: if (TASK_VAL(task))
77: shutdown(c->cli_sock, SHUT_RDWR);
78: close(c->cli_sock);
79:
80: /* free buffer */
81: AIT_FREE_VAL(&c->cli_buf);
82:
83: io_arrayDel(s->srv_clients, c->cli_id, 0);
84: if (c)
1.11 misho 85: io_free(c);
1.10 misho 86: return NULL;
87: }
1.7 misho 88:
89: static void *
90: txPacket(sched_task_t *task)
91: {
92: rpc_cli_t *c = TASK_ARG(task);
93: rpc_srv_t *s = c->cli_parent;
94: rpc_func_t *f = NULL;
1.10 misho 95: u_char buf[USHRT_MAX] = { 0 };
1.7 misho 96: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
97: int ret, wlen = sizeof(struct tagRPCCall);
1.10 misho 98:
99: /* copy RPC header */
100: memcpy(buf, TASK_DATA(task), wlen);
1.7 misho 101:
102: if (rpc->call_argc) {
1.10 misho 103: f = rpc_srv_getCall(s, ntohs(rpc->call_tag));
1.7 misho 104: if (!f) {
1.10 misho 105: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
1.7 misho 106: rpc->call_argc ^= rpc->call_argc;
107: rpc->call_rep.ret = RPC_ERROR(-1);
108: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
109: } else {
1.10 misho 110: rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
1.7 misho 111: /* Go Encapsulate variables */
1.10 misho 112: ret = io_vars2buffer(buf + wlen, sizeof buf - wlen, RPC_RETVARS(c));
113: /* Free return values */
114: io_freeVars(&c->cli_vars);
1.7 misho 115: if (ret == -1) {
116: rpc_SetErr(EBADRPC, "Prepare RPC packet failed");
117: rpc->call_argc ^= rpc->call_argc;
118: rpc->call_rep.ret = RPC_ERROR(-1);
119: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
120: } else
121: wlen += ret;
122: }
123: }
124:
1.8 misho 125: rpc->call_len = htons(wlen);
126:
1.7 misho 127: /* calculate CRC */
128: rpc->call_crc ^= rpc->call_crc;
1.8 misho 129: rpc->call_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 130:
131: /* send reply */
1.10 misho 132: ret = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
133: if (ret == -1 || ret != wlen) {
134: /* close connection */
1.12.2.1 misho 135: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
136: c, 42, NULL, 0);
1.10 misho 137: }
1.7 misho 138:
139: return NULL;
140: }
141:
142: static void *
143: execCall(sched_task_t *task)
144: {
145: rpc_cli_t *c = TASK_ARG(task);
146: rpc_srv_t *s = c->cli_parent;
147: rpc_func_t *f = NULL;
148: array_t *arr = NULL;
1.10 misho 149: u_char *buf = AIT_GET_BUF(&c->cli_buf) + TASK_VAL(task);
1.7 misho 150: struct tagRPCCall *rpc = (struct tagRPCCall*) buf;
151: int argc = ntohs(rpc->call_argc);
152:
153: /* Go decapsulate variables ... */
1.8 misho 154: if (argc) {
1.7 misho 155: arr = io_buffer2vars(buf + sizeof(struct tagRPCCall),
1.10 misho 156: AIT_LEN(&c->cli_buf) - TASK_VAL(task) - sizeof(struct tagRPCCall),
1.12 misho 157: argc, 42);
1.7 misho 158: if (!arr) {
159: rpc_SetErr(ERPCMISMATCH, "#%d - %s", io_GetErrno(), io_GetError());
160: rpc->call_argc ^= rpc->call_argc;
161: rpc->call_rep.ret = RPC_ERROR(-1);
162: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
163: return NULL;
164: }
1.10 misho 165: } else
166: arr = NULL;
1.7 misho 167:
1.10 misho 168: if (!(f = rpc_srv_getCall(s, ntohs(rpc->call_tag)))) {
1.7 misho 169: rpc_SetErr(EPROGUNAVAIL, "Function not found at RPC server");
170: rpc->call_argc ^= rpc->call_argc;
171: rpc->call_rep.ret = RPC_ERROR(-1);
172: rpc->call_rep.eno = RPC_ERROR(rpc_Errno);
173: } else {
1.8 misho 174: /* if client doesn't want reply */
1.10 misho 175: argc = RPC_CHK_NOREPLY(rpc);
176: rpc->call_rep.ret = RPC_ERROR(rpc_srv_execCall(c, rpc, f->func_name, arr));
1.7 misho 177: if (rpc->call_rep.ret == htonl(-1)) {
178: rpc->call_rep.eno = RPC_ERROR(errno);
179: rpc->call_argc ^= rpc->call_argc;
180: } else {
181: rpc->call_rep.eno ^= rpc->call_rep.eno;
1.8 misho 182: if (argc) {
183: /* without reply */
1.10 misho 184: io_freeVars(&c->cli_vars);
1.8 misho 185: rpc->call_argc ^= rpc->call_argc;
1.10 misho 186: } else {
187: /* reply */
188: rpc->call_argc = htons(io_arraySize(RPC_RETVARS(c)));
189: }
1.7 misho 190: }
191: }
192:
1.10 misho 193: io_arrayDestroy(&arr);
1.7 misho 194: return NULL;
195: }
196:
197: static void *
198: rxPacket(sched_task_t *task)
199: {
200: rpc_cli_t *c = TASK_ARG(task);
201: rpc_srv_t *s = c->cli_parent;
1.10 misho 202: int len, rlen, noreply;
203: u_short crc, off = TASK_DATLEN(task);
204: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 205: struct tagRPCCall *rpc;
206:
1.12 misho 207: if (!off)
208: memset(buf, 0, AIT_LEN(&c->cli_buf));
1.12.2.2! misho 209: // else
! 210: // memmove(buf, buf + off, AIT_LEN(&c->cli_buf) - off);
1.10 misho 211: rlen = recv(TASK_FD(task), buf + off, AIT_LEN(&c->cli_buf) - off, 0);
212: if (rlen < 1) {
213: /* close connection */
1.12.2.1 misho 214: schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],
215: c, 42, NULL, 0);
1.7 misho 216: return NULL;
1.10 misho 217: } else {
218: rlen += off; /* add reminded bytes from previous rxPacket, if exists! */
219: off = 0; /* process buffer from start offset == 0 */
1.9 misho 220: }
1.7 misho 221:
1.8 misho 222: do {
1.10 misho 223: /* check RPC packet */
1.8 misho 224: if (rlen < sizeof(struct tagRPCCall)) {
1.10 misho 225: rpc_SetErr(ERPCMISMATCH, "Short RPC packet");
1.8 misho 226:
1.10 misho 227: /* reminder received previous bytes ;) */
228: schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task),
229: TASK_FD(task), TASK_DATA(task), rlen);
1.8 misho 230: return NULL;
231: } else
232: rpc = (struct tagRPCCall*) (buf + off);
233:
1.10 misho 234: len = ntohs(rpc->call_len);
235: rlen -= len;
236:
237: /* check RPC packet lengths */
238: if (rlen < 0 || len < sizeof(struct tagRPCCall)) {
239: rpc_SetErr(ERPCMISMATCH, "Broken RPC packet length");
240: /* skip entire packet */
241: break;
242: }
1.8 misho 243:
244: /* check integrity of packet */
245: crc = ntohs(rpc->call_crc);
246: rpc->call_crc ^= rpc->call_crc;
1.10 misho 247: if (crc != crcFletcher16((u_short*) rpc, len / 2)) {
1.8 misho 248: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
1.7 misho 249:
1.10 misho 250: off += len;
251: /* try next packet remaining into buffer */
252: continue;
1.8 misho 253: }
1.7 misho 254:
1.10 misho 255: noreply = RPC_CHK_NOREPLY(rpc);
1.7 misho 256:
1.8 misho 257: /* check RPC packet session info */
258: if (rpc_chkPktSession(&rpc->call_session, &s->srv_session)) {
259: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
260: rpc->call_argc ^= rpc->call_argc;
261: rpc->call_rep.ret = RPC_ERROR(-1);
262: rpc->call_rep.eno = RPC_ERROR(errno);
263: } else {
264: /* execute RPC call */
1.10 misho 265: schedEvent(TASK_ROOT(task), execCall, TASK_ARG(task), off, NULL, 0);
1.8 misho 266: }
1.7 misho 267:
1.8 misho 268: /* send RPC reply */
269: if (!noreply)
1.12.2.2! misho 270: schedWrite(TASK_ROOT(task), cbProto[s->srv_proto][CB_TXPACKET],
! 271: TASK_ARG(task), TASK_FD(task), rpc, len);
1.7 misho 272:
1.10 misho 273: off += len;
1.8 misho 274: } while (rlen > 0);
1.7 misho 275:
276: /* lets get next packet */
1.10 misho 277: schedRead(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), TASK_FD(task), TASK_DATA(task), 0);
1.7 misho 278: return NULL;
279: }
280:
1.1 misho 281: static void *
1.10 misho 282: acceptClients(sched_task_t *task)
1.1 misho 283: {
1.10 misho 284: rpc_srv_t *srv = TASK_ARG(task);
285: rpc_cli_t *c = NULL;
286: register int i;
287: socklen_t salen = sizeof(io_sockaddr_t);
1.7 misho 288:
1.10 misho 289: /* check free slots for connect */
290: for (i = 0; i < io_arraySize(srv->srv_clients) &&
291: (c = io_array(srv->srv_clients, i, rpc_cli_t*)); i++);
292: if (c) /* no more free slots! */
293: goto end;
1.11 misho 294: c = io_malloc(sizeof(rpc_cli_t));
1.10 misho 295: if (!c) {
296: LOGERR;
297: srv->srv_kill = 1;
1.1 misho 298: return NULL;
1.10 misho 299: } else {
300: memset(c, 0, sizeof(rpc_cli_t));
301: io_arraySet(srv->srv_clients, i, c);
302: c->cli_id = i;
303: c->cli_parent = srv;
304: }
305:
306: /* alloc empty buffer */
307: AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
308:
309: /* accept client */
310: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
311: if (c->cli_sock == -1) {
312: LOGERR;
313: AIT_FREE_VAL(&c->cli_buf);
314: io_arrayDel(srv->srv_clients, i, 42);
315: goto end;
1.1 misho 316: } else
1.10 misho 317: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 318:
1.12.2.2! misho 319: schedRead(TASK_ROOT(task), cbProto[srv->srv_proto][CB_RXPACKET], c,
! 320: c->cli_sock, NULL, 0);
1.10 misho 321: end:
322: schedReadSelf(task);
323: return NULL;
324: }
1.5 misho 325:
1.10 misho 326: /* ------------------------------------------------------ */
1.7 misho 327:
1.10 misho 328: static void *
329: closeBLOBClient(sched_task_t *task)
330: {
331: rpc_cli_t *c = TASK_ARG(task);
332: rpc_srv_t *s = c->cli_parent;
1.7 misho 333:
1.10 misho 334: schedCancelby(TASK_ROOT(task), taskMAX, CRITERIA_ARG, TASK_ARG(task), NULL);
1.7 misho 335:
1.10 misho 336: /* close client socket */
337: if (TASK_VAL(task))
338: shutdown(c->cli_sock, SHUT_RDWR);
1.7 misho 339: close(c->cli_sock);
1.10 misho 340:
341: /* free buffer */
342: AIT_FREE_VAL(&c->cli_buf);
343:
344: io_arrayDel(s->srv_blob.clients, c->cli_id, 0);
345: if (c)
1.11 misho 346: io_free(c);
1.7 misho 347: return NULL;
348: }
349:
350: static void *
351: txBLOB(sched_task_t *task)
352: {
1.10 misho 353: rpc_cli_t *c = TASK_ARG(task);
354: u_char *buf = AIT_GET_BUF(&c->cli_buf);
1.7 misho 355: struct tagBLOBHdr *blob = (struct tagBLOBHdr *) buf;
356: int wlen = sizeof(struct tagBLOBHdr);
357:
358: /* calculate CRC */
359: blob->hdr_crc ^= blob->hdr_crc;
1.8 misho 360: blob->hdr_crc = htons(crcFletcher16((u_short*) buf, wlen / 2));
1.7 misho 361:
362: /* send reply */
1.10 misho 363: wlen = send(TASK_FD(task), buf, wlen, MSG_NOSIGNAL);
364: if (wlen == -1 || wlen != sizeof(struct tagBLOBHdr)) {
365: /* close blob connection */
366: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
367: }
1.7 misho 368:
369: return NULL;
370: }
1.4 misho 371:
1.7 misho 372: static void *
373: rxBLOB(sched_task_t *task)
374: {
375: rpc_cli_t *c = TASK_ARG(task);
376: rpc_srv_t *s = c->cli_parent;
377: rpc_blob_t *b;
1.10 misho 378: struct tagBLOBHdr blob;
1.7 misho 379: int rlen;
380: u_short crc;
381:
1.10 misho 382: memset(&blob, 0, sizeof blob);
383: rlen = recv(TASK_FD(task), &blob, sizeof blob, 0);
384: if (rlen < 1) {
385: /* close blob connection */
386: schedEvent(TASK_ROOT(task), closeBLOBClient, c, 42, NULL, 0);
1.7 misho 387: return NULL;
388: }
1.5 misho 389:
1.10 misho 390: /* check BLOB packet */
391: if (rlen < sizeof(struct tagBLOBHdr)) {
392: rpc_SetErr(ERPCMISMATCH, "Short BLOB packet");
1.6 misho 393:
1.10 misho 394: schedReadSelf(task);
1.7 misho 395: return NULL;
396: }
1.1 misho 397:
1.7 misho 398: /* check integrity of packet */
1.10 misho 399: crc = ntohs(blob.hdr_crc);
400: blob.hdr_crc ^= blob.hdr_crc;
401: if (crc != crcFletcher16((u_short*) &blob, rlen / 2)) {
1.7 misho 402: rpc_SetErr(ERPCMISMATCH, "Bad CRC BLOB packet");
1.10 misho 403:
404: schedReadSelf(task);
1.7 misho 405: return NULL;
406: }
1.5 misho 407:
1.7 misho 408: /* check RPC packet session info */
1.10 misho 409: if ((crc = rpc_chkPktSession(&blob.hdr_session, &s->srv_session))) {
1.7 misho 410: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
1.10 misho 411: blob.hdr_cmd = error;
1.7 misho 412: goto end;
413: }
414:
415: /* Go to proceed packet ... */
1.10 misho 416: switch (blob.hdr_cmd) {
1.7 misho 417: case get:
1.10 misho 418: if (!(b = rpc_srv_getBLOB(s, ntohl(blob.hdr_var)))) {
419: rpc_SetErr(EINVAL, "Var=%x not found", ntohl(blob.hdr_var));
420: blob.hdr_cmd = no;
421: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 422: break;
423: } else
1.10 misho 424: blob.hdr_len = htonl(b->blob_len);
1.5 misho 425:
1.7 misho 426: if (rpc_srv_blobMap(s, b) != -1) {
427: /* deliver BLOB variable to client */
1.10 misho 428: blob.hdr_ret = htonl(rpc_srv_sendBLOB(c, b));
1.7 misho 429: rpc_srv_blobUnmap(b);
430: } else {
1.10 misho 431: blob.hdr_cmd = error;
432: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 433: }
434: break;
435: case set:
1.10 misho 436: if ((b = rpc_srv_registerBLOB(s, ntohl(blob.hdr_len)))) {
1.7 misho 437: /* set new BLOB variable for reply :) */
1.10 misho 438: blob.hdr_var = htonl(b->blob_var);
1.7 misho 439:
440: /* receive BLOB from client */
1.10 misho 441: blob.hdr_ret = htonl(rpc_srv_recvBLOB(c, b));
1.7 misho 442: rpc_srv_blobUnmap(b);
1.5 misho 443: } else {
1.10 misho 444: blob.hdr_cmd = error;
445: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 446: }
447: break;
448: case unset:
1.11 misho 449: if (rpc_srv_unregisterBLOB(s, ntohl(blob.hdr_var)) == -1) {
1.10 misho 450: blob.hdr_cmd = error;
451: blob.hdr_ret = RPC_ERROR(-1);
1.1 misho 452: }
453: break;
1.7 misho 454: default:
1.10 misho 455: rpc_SetErr(EPROCUNAVAIL, "Unsupported BLOB command %d", blob.hdr_cmd);
456: blob.hdr_cmd = error;
457: blob.hdr_ret = RPC_ERROR(-1);
1.7 misho 458: }
1.1 misho 459:
1.7 misho 460: end:
1.10 misho 461: memcpy(AIT_ADDR(&c->cli_buf), &blob, sizeof blob);
462: schedWrite(TASK_ROOT(task), txBLOB, TASK_ARG(task), TASK_FD(task), NULL, 0);
463: schedReadSelf(task);
1.7 misho 464: return NULL;
1.2 misho 465: }
466:
467: static void *
1.10 misho 468: acceptBLOBClients(sched_task_t *task)
1.2 misho 469: {
1.10 misho 470: rpc_srv_t *srv = TASK_ARG(task);
471: rpc_cli_t *c = NULL;
472: register int i;
473: socklen_t salen = sizeof(io_sockaddr_t);
1.12 misho 474: #ifdef TCP_NOPUSH
475: int n = 1;
476: #endif
1.7 misho 477:
1.10 misho 478: /* check free slots for connect */
479: for (i = 0; i < io_arraySize(srv->srv_blob.clients) &&
480: (c = io_array(srv->srv_blob.clients, i, rpc_cli_t*)); i++);
481: if (c) /* no more free slots! */
482: goto end;
1.11 misho 483: c = io_malloc(sizeof(rpc_cli_t));
1.10 misho 484: if (!c) {
1.7 misho 485: LOGERR;
1.10 misho 486: srv->srv_kill = srv->srv_blob.kill = 1;
1.7 misho 487: return NULL;
488: } else {
1.10 misho 489: memset(c, 0, sizeof(rpc_cli_t));
490: io_arraySet(srv->srv_blob.clients, i, c);
491: c->cli_id = i;
492: c->cli_parent = srv;
1.7 misho 493: }
1.4 misho 494:
1.10 misho 495: /* alloc empty buffer */
496: AIT_SET_BUF2(&c->cli_buf, 0, srv->srv_netbuf);
1.2 misho 497:
1.10 misho 498: /* accept client */
499: c->cli_sock = accept(TASK_FD(task), &c->cli_sa.sa, &salen);
500: if (c->cli_sock == -1) {
501: LOGERR;
502: AIT_FREE_VAL(&c->cli_buf);
503: io_arrayDel(srv->srv_blob.clients, i, 42);
504: goto end;
1.12 misho 505: } else {
506: #ifdef TCP_NOPUSH
507: setsockopt(c->cli_sock, IPPROTO_TCP, TCP_NOPUSH, &n, sizeof n);
508: #endif
1.10 misho 509: fcntl(c->cli_sock, F_SETFL, fcntl(c->cli_sock, F_GETFL) | O_NONBLOCK);
1.12 misho 510: }
1.2 misho 511:
1.10 misho 512: schedRead(TASK_ROOT(task), rxBLOB, c, c->cli_sock, NULL, 0);
513: end:
514: schedReadSelf(task);
1.7 misho 515: return NULL;
1.1 misho 516: }
517:
1.10 misho 518: /* ------------------------------------------------------ */
1.1 misho 519:
520: /*
1.7 misho 521: * rpc_srv_initBLOBServer() - Init & create BLOB Server
522: *
1.4 misho 523: * @srv = RPC server instance
1.2 misho 524: * @Port = Port for bind server, if Port == 0 default port is selected
525: * @diskDir = Disk place for BLOB file objects
526: * return: -1 == error or 0 bind and created BLOB server instance
527: */
528: int
529: rpc_srv_initBLOBServer(rpc_srv_t * __restrict srv, u_short Port, const char *diskDir)
530: {
531: int n = 1;
532:
1.10 misho 533: if (!srv || srv->srv_kill) {
1.7 misho 534: rpc_SetErr(EINVAL, "Invalid parameters can`t init BLOB server");
1.2 misho 535: return -1;
536: }
537:
538: memset(&srv->srv_blob, 0, sizeof srv->srv_blob);
539: if (access(diskDir, R_OK | W_OK) == -1) {
540: LOGERR;
541: return -1;
542: } else
1.7 misho 543: AIT_SET_STR(&srv->srv_blob.dir, diskDir);
1.2 misho 544:
1.10 misho 545: /* init blob list */
546: TAILQ_INIT(&srv->srv_blob.blobs);
547:
1.2 misho 548: srv->srv_blob.server.cli_parent = srv;
1.4 misho 549:
1.10 misho 550: memcpy(&srv->srv_blob.server.cli_sa, &srv->srv_server.cli_sa, sizeof(io_sockaddr_t));
551: switch (srv->srv_blob.server.cli_sa.sa.sa_family) {
1.4 misho 552: case AF_INET:
1.10 misho 553: srv->srv_blob.server.cli_sa.sin.sin_port =
554: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin.sin_port) + 1);
1.4 misho 555: break;
556: case AF_INET6:
1.10 misho 557: srv->srv_blob.server.cli_sa.sin6.sin6_port =
558: htons(Port ? Port : ntohs(srv->srv_blob.server.cli_sa.sin6.sin6_port) + 1);
1.4 misho 559: break;
560: case AF_LOCAL:
1.10 misho 561: strlcat(srv->srv_blob.server.cli_sa.sun.sun_path, ".blob",
562: sizeof srv->srv_blob.server.cli_sa.sun.sun_path);
1.4 misho 563: break;
564: default:
1.7 misho 565: AIT_FREE_VAL(&srv->srv_blob.dir);
1.4 misho 566: return -1;
1.2 misho 567: }
568:
1.4 misho 569: /* create BLOB server socket */
1.6 misho 570: srv->srv_blob.server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, SOCK_STREAM, 0);
1.2 misho 571: if (srv->srv_blob.server.cli_sock == -1) {
572: LOGERR;
1.7 misho 573: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 574: return -1;
575: }
576: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
577: LOGERR;
578: close(srv->srv_blob.server.cli_sock);
1.7 misho 579: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 580: return -1;
581: }
1.5 misho 582: n = srv->srv_netbuf;
583: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
584: LOGERR;
585: close(srv->srv_blob.server.cli_sock);
1.7 misho 586: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 587: return -1;
588: }
589: if (setsockopt(srv->srv_blob.server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
590: LOGERR;
591: close(srv->srv_blob.server.cli_sock);
1.7 misho 592: AIT_FREE_VAL(&srv->srv_blob.dir);
1.5 misho 593: return -1;
594: }
1.6 misho 595: if (bind(srv->srv_blob.server.cli_sock, &srv->srv_blob.server.cli_sa.sa,
596: srv->srv_blob.server.cli_sa.sa.sa_len) == -1) {
1.2 misho 597: LOGERR;
598: close(srv->srv_blob.server.cli_sock);
1.7 misho 599: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 600: return -1;
601: }
602:
1.10 misho 603: /* allocate pool for concurent blob clients */
604: srv->srv_blob.clients = io_arrayInit(io_arraySize(srv->srv_clients));
1.2 misho 605: if (!srv->srv_blob.clients) {
1.10 misho 606: rpc_SetErr(io_GetErrno(), "%s", io_GetError());
1.2 misho 607: close(srv->srv_blob.server.cli_sock);
1.7 misho 608: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 609: return -1;
1.10 misho 610: }
1.2 misho 611:
1.10 misho 612: /* init blob scheduler */
613: srv->srv_blob.root = schedBegin();
614: if (!srv->srv_blob.root) {
615: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
616: io_arrayDestroy(&srv->srv_blob.clients);
617: close(srv->srv_blob.server.cli_sock);
618: AIT_FREE_VAL(&srv->srv_blob.dir);
619: return -1;
620: }
1.2 misho 621:
622: return 0;
623: }
624:
625: /*
1.7 misho 626: * rpc_srv_endBLOBServer() - Destroy BLOB server, close all opened sockets and free resources
627: *
1.2 misho 628: * @srv = RPC Server instance
629: * return: none
630: */
1.10 misho 631: inline void
1.2 misho 632: rpc_srv_endBLOBServer(rpc_srv_t * __restrict srv)
633: {
1.10 misho 634: if (!srv)
1.2 misho 635: return;
636:
1.10 misho 637: srv->srv_blob.kill = 1;
1.2 misho 638: }
639:
640: /*
1.10 misho 641: * rpc_srv_loopBLOBServer() - Execute Main BLOB server loop and wait for clients requests
1.7 misho 642: *
1.2 misho 643: * @srv = RPC Server instance
644: * return: -1 error or 0 ok, infinite loop ...
645: */
646: int
1.10 misho 647: rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
1.2 misho 648: {
1.10 misho 649: rpc_cli_t *c;
1.2 misho 650: register int i;
1.10 misho 651: rpc_blob_t *b, *tmp;
652: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.2 misho 653:
1.10 misho 654: if (!srv || srv->srv_kill) {
1.7 misho 655: rpc_SetErr(EINVAL, "Invalid parameter can`t start BLOB server");
1.2 misho 656: return -1;
657: }
658:
1.10 misho 659: fcntl(srv->srv_blob.server.cli_sock, F_SETFL,
660: fcntl(srv->srv_blob.server.cli_sock, F_GETFL) | O_NONBLOCK);
661:
662: if (listen(srv->srv_blob.server.cli_sock, io_arraySize(srv->srv_blob.clients)) == -1) {
1.2 misho 663: LOGERR;
664: return -1;
1.10 misho 665: }
666:
667: if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,
668: srv->srv_blob.server.cli_sock, NULL, 0)) {
669: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
670: return -1;
671: }
1.2 misho 672:
1.10 misho 673: schedPolling(srv->srv_blob.root, &ts, NULL);
674: /* main rpc loop */
675: schedRun(srv->srv_blob.root, &srv->srv_blob.kill);
1.7 misho 676:
1.10 misho 677: /* close all clients connections & server socket */
678: for (i = 0; i < io_arraySize(srv->srv_blob.clients); i++) {
679: c = io_array(srv->srv_blob.clients, i, rpc_cli_t*);
680: if (c) {
681: shutdown(c->cli_sock, SHUT_RDWR);
682: close(c->cli_sock);
683:
684: schedCancelby(srv->srv_blob.root, taskMAX, CRITERIA_ARG, c, NULL);
685: AIT_FREE_VAL(&c->cli_buf);
1.2 misho 686: }
1.10 misho 687: io_arrayDel(srv->srv_blob.clients, i, 42);
688: }
689: io_arrayDestroy(&srv->srv_blob.clients);
1.2 misho 690:
1.10 misho 691: close(srv->srv_blob.server.cli_sock);
1.2 misho 692:
1.10 misho 693: /* detach blobs */
694: TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
695: TAILQ_REMOVE(&srv->srv_blob.blobs, b, blob_node);
1.2 misho 696:
1.10 misho 697: rpc_srv_blobFree(srv, b);
1.11 misho 698: io_free(b);
1.2 misho 699: }
700:
1.10 misho 701: schedEnd(&srv->srv_blob.root);
702: AIT_FREE_VAL(&srv->srv_blob.dir);
1.2 misho 703: return 0;
704: }
705:
706:
707: /*
1.7 misho 708: * rpc_srv_initServer() - Init & create RPC Server
709: *
1.1 misho 710: * @regProgID = ProgramID for authentication & recognition
711: * @regProcID = ProcessID for authentication & recognition
712: * @concurentClients = Concurent clients at same time to this server
1.10 misho 713: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
1.4 misho 714: * @csHost = Host name or address for bind server, if NULL any address
1.1 misho 715: * @Port = Port for bind server, if Port == 0 default port is selected
1.12.2.1 misho 716: * @proto = Protocol, if == 0 choose SOCK_STREAM
1.1 misho 717: * return: NULL == error or !=NULL bind and created RPC server instance
718: */
719: rpc_srv_t *
1.10 misho 720: rpc_srv_initServer(u_int regProgID, u_char regProcID, int concurentClients,
1.12.2.1 misho 721: int netBuf, const char *csHost, u_short Port, int proto)
1.1 misho 722: {
1.10 misho 723: int n = 1;
1.1 misho 724: rpc_srv_t *srv = NULL;
1.11 misho 725: io_sockaddr_t sa = IO_SOCKADDR_INIT;
1.1 misho 726:
1.12.2.1 misho 727: if (!concurentClients || !regProgID || (proto < 0 || proto > SOCK_DGRAM)) {
1.10 misho 728: rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
1.1 misho 729: return NULL;
730: }
1.12.2.1 misho 731: if (!proto)
732: proto = SOCK_STREAM;
1.10 misho 733: if (!io_gethostbyname(csHost, Port, &sa))
734: return NULL;
1.1 misho 735: if (!Port)
736: Port = RPC_DEFPORT;
1.10 misho 737: if (netBuf < RPC_MIN_BUFSIZ)
1.5 misho 738: netBuf = BUFSIZ;
1.7 misho 739: else
1.12 misho 740: netBuf = io_align(netBuf, 2); /* align netBuf length */
1.10 misho 741:
742: #ifdef HAVE_SRANDOMDEV
743: srandomdev();
744: #else
745: time_t tim;
746:
747: srandom((time(&tim) ^ getpid()));
748: #endif
1.1 misho 749:
1.11 misho 750: srv = io_malloc(sizeof(rpc_srv_t));
1.1 misho 751: if (!srv) {
752: LOGERR;
753: return NULL;
754: } else
755: memset(srv, 0, sizeof(rpc_srv_t));
756:
1.12.2.1 misho 757: srv->srv_proto = proto;
1.5 misho 758: srv->srv_netbuf = netBuf;
1.1 misho 759: srv->srv_session.sess_version = RPC_VERSION;
760: srv->srv_session.sess_program = regProgID;
761: srv->srv_session.sess_process = regProcID;
762:
763: srv->srv_server.cli_parent = srv;
1.10 misho 764: memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
765:
1.12 misho 766: /* init functions */
767: pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
768: SLIST_INIT(&srv->srv_funcs);
769: AVL_INIT(&srv->srv_funcs);
1.10 misho 770:
771: /* init scheduler */
772: srv->srv_root = schedBegin();
773: if (!srv->srv_root) {
774: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
1.12 misho 775: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.11 misho 776: io_free(srv);
1.10 misho 777: return NULL;
778: }
779:
780: /* init pool for clients */
781: srv->srv_clients = io_arrayInit(concurentClients);
782: if (!srv->srv_clients) {
783: rpc_SetErr(io_GetErrno(), "%s", io_GetError());
784: schedEnd(&srv->srv_root);
1.12 misho 785: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.11 misho 786: io_free(srv);
1.10 misho 787: return NULL;
788: }
1.4 misho 789:
790: /* create server socket */
1.12.2.1 misho 791: srv->srv_server.cli_sock = socket(srv->srv_server.cli_sa.sa.sa_family, srv->srv_proto, 0);
1.1 misho 792: if (srv->srv_server.cli_sock == -1) {
793: LOGERR;
1.10 misho 794: io_arrayDestroy(&srv->srv_clients);
795: schedEnd(&srv->srv_root);
1.12 misho 796: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.11 misho 797: io_free(srv);
1.1 misho 798: return NULL;
799: }
800: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) {
801: LOGERR;
1.10 misho 802: goto err;
1.1 misho 803: }
1.5 misho 804: n = srv->srv_netbuf;
805: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
806: LOGERR;
1.10 misho 807: goto err;
1.5 misho 808: }
809: if (setsockopt(srv->srv_server.cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
810: LOGERR;
1.10 misho 811: goto err;
1.5 misho 812: }
1.6 misho 813: if (bind(srv->srv_server.cli_sock, &srv->srv_server.cli_sa.sa,
814: srv->srv_server.cli_sa.sa.sa_len) == -1) {
1.1 misho 815: LOGERR;
1.10 misho 816: goto err;
1.1 misho 817: }
818:
1.10 misho 819: rpc_register_srvPing(srv);
1.8 misho 820:
1.1 misho 821: return srv;
1.10 misho 822: err: /* error condition */
823: close(srv->srv_server.cli_sock);
824: io_arrayDestroy(&srv->srv_clients);
825: schedEnd(&srv->srv_root);
1.12 misho 826: pthread_mutex_destroy(&srv->srv_funcs.mtx);
1.11 misho 827: io_free(srv);
1.10 misho 828: return NULL;
1.1 misho 829: }
830:
831: /*
1.7 misho 832: * rpc_srv_endServer() - Destroy RPC server, close all opened sockets and free resources
833: *
1.6 misho 834: * @psrv = RPC Server instance
1.1 misho 835: * return: none
836: */
1.10 misho 837: inline void
1.6 misho 838: rpc_srv_endServer(rpc_srv_t ** __restrict psrv)
1.1 misho 839: {
1.10 misho 840: if (!psrv || !*psrv)
1.1 misho 841: return;
842:
1.10 misho 843: /* if send kill to blob server */
844: if (!(*psrv)->srv_blob.kill)
845: rpc_srv_endBLOBServer(*psrv);
1.1 misho 846:
1.10 misho 847: (*psrv)->srv_kill = 1;
848: sleep(RPC_SCHED_POLLING);
1.2 misho 849:
1.12 misho 850: pthread_mutex_destroy(&(*psrv)->srv_funcs.mtx);
1.11 misho 851: io_free(*psrv);
1.6 misho 852: *psrv = NULL;
1.1 misho 853: }
854:
855: /*
1.7 misho 856: * rpc_srv_loopServer() - Execute Main server loop and wait for clients requests
857: *
1.1 misho 858: * @srv = RPC Server instance
859: * return: -1 error or 0 ok, infinite loop ...
860: */
861: int
1.5 misho 862: rpc_srv_loopServer(rpc_srv_t * __restrict srv)
1.1 misho 863: {
1.10 misho 864: rpc_cli_t *c;
1.1 misho 865: register int i;
1.12 misho 866: rpc_func_t *f;
1.10 misho 867: struct timespec ts = { RPC_SCHED_POLLING, 0 };
1.1 misho 868:
869: if (!srv) {
1.10 misho 870: rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
1.1 misho 871: return -1;
872: }
873:
1.10 misho 874: if (listen(srv->srv_server.cli_sock, io_arraySize(srv->srv_clients)) == -1) {
1.1 misho 875: LOGERR;
876: return -1;
1.12 misho 877: } else
878: fcntl(srv->srv_server.cli_sock, F_SETFL,
879: fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
1.1 misho 880:
1.12.2.1 misho 881: if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv,
882: srv->srv_server.cli_sock, NULL, 0)) {
1.10 misho 883: rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
884: return -1;
885: }
1.7 misho 886:
1.10 misho 887: schedPolling(srv->srv_root, &ts, NULL);
1.7 misho 888: /* main rpc loop */
1.10 misho 889: schedRun(srv->srv_root, &srv->srv_kill);
890:
891: /* close all clients connections & server socket */
892: for (i = 0; i < io_arraySize(srv->srv_clients); i++) {
893: c = io_array(srv->srv_clients, i, rpc_cli_t*);
894: if (c) {
895: shutdown(c->cli_sock, SHUT_RDWR);
896: close(c->cli_sock);
897:
898: schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
899: io_freeVars(&RPC_RETVARS(c));
900: AIT_FREE_VAL(&c->cli_buf);
1.1 misho 901: }
1.10 misho 902: io_arrayDel(srv->srv_clients, i, 42);
903: }
904: io_arrayDestroy(&srv->srv_clients);
1.2 misho 905:
1.10 misho 906: close(srv->srv_server.cli_sock);
1.2 misho 907:
1.10 misho 908: /* detach exported calls */
1.12 misho 909: RPC_FUNCS_LOCK(&srv->srv_funcs);
910: while ((f = SLIST_FIRST(&srv->srv_funcs))) {
911: SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
1.1 misho 912:
1.10 misho 913: AIT_FREE_VAL(&f->func_name);
1.11 misho 914: io_free(f);
1.1 misho 915: }
1.12 misho 916: srv->srv_funcs.avlh_root = NULL;
917: RPC_FUNCS_UNLOCK(&srv->srv_funcs);
1.1 misho 918:
1.10 misho 919: schedEnd(&srv->srv_root);
1.1 misho 920: return 0;
921: }
922:
923:
924: /*
925: * rpc_srv_execCall() Execute registered call from RPC server
1.7 misho 926: *
1.10 misho 927: * @cli = RPC client
1.1 misho 928: * @rpc = IN RPC call structure
1.10 misho 929: * @funcname = Execute RPC function
1.5 misho 930: * @args = IN RPC calling arguments from RPC client
1.1 misho 931: * return: -1 error, !=-1 ok
932: */
933: int
1.10 misho 934: rpc_srv_execCall(rpc_cli_t * __restrict cli, struct tagRPCCall * __restrict rpc,
935: ait_val_t funcname, array_t * __restrict args)
1.1 misho 936: {
937: rpc_callback_t func;
938:
1.10 misho 939: if (!cli || !rpc || !AIT_ADDR(&funcname)) {
1.7 misho 940: rpc_SetErr(EINVAL, "Invalid parameter can`t exec function");
1.1 misho 941: return -1;
942: }
943:
1.10 misho 944: func = AIT_GET_LIKE(&funcname, rpc_callback_t);
945: return func(cli, rpc, args);
1.1 misho 946: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>