1: /*************************************************************************
2: * (C) 2010 AITNET ltd - Sofia/Bulgaria - <misho@aitbg.com>
3: * by Michael Pounov <misho@openbsd-bg.org>
4: *
5: * $Author: misho $
6: * $Id: cli.c,v 1.16.6.1 2013/08/21 15:28:15 misho Exp $
7: *
8: **************************************************************************
9: The ELWIX and AITNET software is distributed under the following
10: terms:
11:
12: All of the documentation and software included in the ELWIX and AITNET
13: Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
14:
15: Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013
16: by Michael Pounov <misho@elwix.org>. All rights reserved.
17:
18: Redistribution and use in source and binary forms, with or without
19: modification, are permitted provided that the following conditions
20: are met:
21: 1. Redistributions of source code must retain the above copyright
22: notice, this list of conditions and the following disclaimer.
23: 2. Redistributions in binary form must reproduce the above copyright
24: notice, this list of conditions and the following disclaimer in the
25: documentation and/or other materials provided with the distribution.
26: 3. All advertising materials mentioning features or use of this software
27: must display the following acknowledgement:
28: This product includes software developed by Michael Pounov <misho@elwix.org>
29: ELWIX - Embedded LightWeight unIX and its contributors.
30: 4. Neither the name of AITNET nor the names of its contributors
31: may be used to endorse or promote products derived from this software
32: without specific prior written permission.
33:
34: THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND
35: ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
36: IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
37: ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
38: FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
39: DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
40: OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
41: HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
42: LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
43: OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
44: SUCH DAMAGE.
45: */
46: #include "global.h"
47:
48:
49: /*
50: * rpc_cli_openBLOBClient() - Connect to BLOB Server
51: *
52: * @rpccli = RPC Client session
53: * @Port = Port for bind server, if Port == 0 default port is selected
54: * return: NULL == error or !=NULL connection to BLOB server established
55: */
56: rpc_cli_t *
57: rpc_cli_openBLOBClient(rpc_cli_t * __restrict rpccli, u_short Port)
58: {
59: rpc_cli_t *cli = NULL;
60: int n;
61:
62: if (!rpccli) {
63: rpc_SetErr(EINVAL, "Invalid parameters can`t connect to BLOB server");
64: return NULL;
65: }
66:
67: cli = e_malloc(sizeof(rpc_cli_t));
68: if (!cli) {
69: LOGERR;
70: return NULL;
71: } else
72: memcpy(cli, rpccli, sizeof(rpc_cli_t));
73:
74: cli->cli_buf = ait_allocVars(1);
75: if (!cli->cli_buf) {
76: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
77: e_free(cli);
78: return NULL;
79: }
80:
81: memcpy(&cli->cli_sa, &rpccli->cli_sa, sizeof(sockaddr_t));
82: switch (cli->cli_sa.sa.sa_family) {
83: case AF_INET:
84: cli->cli_sa.sin.sin_port =
85: htons(Port ? Port : ntohs(cli->cli_sa.sin.sin_port) + 1);
86: break;
87: case AF_INET6:
88: cli->cli_sa.sin6.sin6_port =
89: htons(Port ? Port : ntohs(cli->cli_sa.sin6.sin6_port) + 1);
90: break;
91: case AF_LOCAL:
92: strlcat(cli->cli_sa.sun.sun_path, ".blob", sizeof cli->cli_sa.sun.sun_path);
93: break;
94: default:
95: rpc_SetErr(EINVAL, "Invalid socket type %d", cli->cli_sa.sa.sa_family);
96: return NULL;
97: }
98:
99: AIT_COPY_VAL(array(cli->cli_buf, 0, ait_val_t*), array(rpccli->cli_buf, 0, ait_val_t*));
100: n = AIT_LEN(array(cli->cli_buf, 0, ait_val_t*));
101:
102: /* connect to BLOB server */
103: cli->cli_sock = socket(cli->cli_sa.sa.sa_family, SOCK_STREAM, 0);
104: if (cli->cli_sock == -1) {
105: LOGERR;
106: e_free(cli);
107: return NULL;
108: }
109: if (setsockopt(cli->cli_sock, SOL_SOCKET, SO_SNDBUF, &n, sizeof n) == -1) {
110: LOGERR;
111: close(cli->cli_sock);
112: e_free(cli);
113: return NULL;
114: }
115: if (setsockopt(cli->cli_sock, SOL_SOCKET, SO_RCVBUF, &n, sizeof n) == -1) {
116: LOGERR;
117: close(cli->cli_sock);
118: e_free(cli);
119: return NULL;
120: }
121: if (connect(cli->cli_sock, &cli->cli_sa.sa, cli->cli_sa.sa.sa_len) == -1) {
122: LOGERR;
123: close(cli->cli_sock);
124: e_free(cli);
125: return NULL;
126: } else
127: fcntl(cli->cli_sock, F_SETFL, fcntl(cli->cli_sock, F_GETFL) | O_NONBLOCK);
128:
129: return cli;
130: }
131:
132: /*
133: * rpc_cli_closeBLOBClient() - Close connection to BLOB server and free resources
134: *
135: * @cli = BLOB Client session
136: * return: none
137: */
138: void
139: rpc_cli_closeBLOBClient(rpc_cli_t ** __restrict cli)
140: {
141: if (!cli || !*cli)
142: return;
143:
144: shutdown((*cli)->cli_sock, SHUT_RDWR);
145: close((*cli)->cli_sock);
146:
147: ait_freeVars(&(*cli)->cli_buf);
148:
149: e_free(*cli);
150: *cli = NULL;
151: }
152:
153: /* -------------------------------------------------------------- */
154:
155: /*
156: * rpc_cli_openClient() - Connect to RPC Server
157: *
158: * @InstID = InstID for RPC session request
159: * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
160: * @csHost = Host name or IP address for bind server
161: * @Port = Port for bind server, if Port == 0 default port is selected
162: * @proto = Protocol, if == 0 choose SOCK_STREAM
163: * return: NULL == error or !=NULL connection to RPC server established
164: */
165: rpc_cli_t *
166: rpc_cli_openClient(u_char InstID, int netBuf, const char *csHost, u_short Port, int proto)
167: {
168: rpc_cli_t *cli = NULL;
169: sockaddr_t sa = E_SOCKADDR_INIT;
170:
171: if (!e_gethostbyname(csHost, Port, &sa))
172: return NULL;
173: if (!Port)
174: Port = RPC_DEFPORT;
175: if (!proto)
176: proto = SOCK_STREAM;
177: if (netBuf < RPC_MIN_BUFSIZ)
178: netBuf = BUFSIZ;
179: else
180: netBuf = E_ALIGN(netBuf, 2); /* align netBuf length */
181:
182: #ifdef HAVE_SRANDOMDEV
183: srandomdev();
184: #else
185: time_t tim;
186:
187: srandom((time(&tim) ^ getpid()));
188: #endif
189:
190: cli = e_malloc(sizeof(rpc_cli_t));
191: if (!cli) {
192: LOGERR;
193: return NULL;
194: } else
195: memset(cli, 0, sizeof(rpc_cli_t));
196:
197: /* build session */
198: cli->cli_parent = e_malloc(sizeof(rpc_sess_t));
199: if (!cli->cli_parent) {
200: LOGERR;
201: e_free(cli);
202: return NULL;
203: } else {
204: ((rpc_sess_t*) cli->cli_parent)->sess_version = RPC_VERSION;
205: ((rpc_sess_t*) cli->cli_parent)->sess_instance = InstID;
206: }
207:
208: cli->cli_buf = ait_allocVars(2);
209: if (!cli->cli_buf) {
210: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
211: e_free(cli->cli_parent);
212: e_free(cli);
213: return NULL;
214: } else
215: AIT_SET_BUFSIZ(array(cli->cli_buf, 0, ait_val_t*), 0, netBuf);
216:
217: cli->cli_id = proto;
218: memcpy(&cli->cli_sa, &sa, sizeof cli->cli_sa);
219:
220: /* connect to RPC server */
221: cli->cli_sock = socket(cli->cli_sa.sa.sa_family, cli->cli_id, 0);
222: if (cli->cli_sock == -1) {
223: LOGERR;
224: goto err;
225: }
226: if (setsockopt(cli->cli_sock, SOL_SOCKET, SO_SNDBUF,
227: &netBuf, sizeof netBuf) == -1) {
228: LOGERR;
229: goto err;
230: }
231: if (setsockopt(cli->cli_sock, SOL_SOCKET, SO_RCVBUF,
232: &netBuf, sizeof netBuf) == -1) {
233: LOGERR;
234: goto err;
235: }
236: if (cli->cli_id == SOCK_STREAM)
237: if (connect(cli->cli_sock, &cli->cli_sa.sa, cli->cli_sa.sa.sa_len) == -1) {
238: LOGERR;
239: goto err;
240: }
241:
242: fcntl(cli->cli_sock, F_SETFL, fcntl(cli->cli_sock, F_GETFL) | O_NONBLOCK);
243: return cli;
244: err:
245: ait_freeVars(&cli->cli_buf);
246: if (cli->cli_sock > 2)
247: close(cli->cli_sock);
248: e_free(cli->cli_parent);
249: e_free(cli);
250: return NULL;
251: }
252:
253: /*
254: * rpc_cli_closeClient() - Close connection to RPC server and free resources
255: *
256: * @cli = RPC Client session
257: * return: none
258: */
259: void
260: rpc_cli_closeClient(rpc_cli_t ** __restrict cli)
261: {
262: if (!cli || !*cli)
263: return;
264:
265: if ((*cli)->cli_id == SOCK_STREAM)
266: shutdown((*cli)->cli_sock, SHUT_RDWR);
267: close((*cli)->cli_sock);
268:
269: ait_freeVars(&(*cli)->cli_buf);
270:
271: if ((*cli)->cli_parent)
272: e_free((*cli)->cli_parent);
273:
274: e_free(*cli);
275: *cli = NULL;
276: }
277:
278: /*
279: * rpc_pkt_Send() - Send RPC packet
280: *
281: * @sock = Socket
282: * @type = Type of socket
283: * @sa = Server address
284: * @pkt = RPC packet
285: * @len = Length of packet
286: * return: -1 error or !=-1 sended bytes
287: */
288: int
289: rpc_pkt_Send(int sock, int type, sockaddr_t * __restrict sa, ait_val_t * __restrict pkt, int len)
290: {
291: struct pollfd pfd;
292: int ret;
293: u_char *buf;
294:
295: if (!pkt) {
296: rpc_SetErr(EINVAL, "Invalid argument(s)!");
297: return -1;
298: } else
299: buf = AIT_GET_BUF(pkt);
300:
301: pfd.fd = sock;
302: pfd.events = POLLOUT;
303: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
304: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
305: if (ret)
306: LOGERR;
307: else
308: rpc_SetErr(ETIMEDOUT, "Timeout, can't send to RPC server");
309: return -1;
310: }
311: do {
312: if (type == SOCK_STREAM)
313: ret = send(sock, buf, len, MSG_NOSIGNAL);
314: else if (sa)
315: ret = sendto(sock, buf, len, MSG_NOSIGNAL, &sa->sa, sa->sa.sa_len);
316: else {
317: rpc_SetErr(EINVAL, "Invalid argument(s)!");
318: return -1;
319: }
320: if (ret == -1) {
321: if (errno == EAGAIN)
322: continue;
323: LOGERR;
324: return -1;
325: } else if (ret != len) {
326: rpc_SetErr(EPROCUNAVAIL, "RPC request, should be send %d bytes, "
327: "really sended %d bytes", len, ret);
328: return -1;
329: }
330: } while (0);
331:
332: return ret;
333: }
334:
335: /*
336: * rpc_pkt_Receive() - Receive RPC packet
337: *
338: * @sock = Socket
339: * @type = Type of socket
340: * @sa = Server address
341: * @pkt = RPC packet
342: * return: -1 error or !=-1 sended bytes
343: */
344: int
345: rpc_pkt_Receive(int sock, int type, sockaddr_t * __restrict sa, ait_val_t * __restrict pkt)
346: {
347: struct pollfd pfd;
348: int ret, len = 0;
349: u_char *buf;
350: sockaddr_t sa2;
351: socklen_t salen;
352:
353: if (!pkt) {
354: rpc_SetErr(EINVAL, "Invalid argument(s)!");
355: return -1;
356: } else
357: buf = AIT_GET_BUF(pkt);
358:
359: /* reply from RPC server */
360: pfd.fd = sock;
361: pfd.events = POLLIN | POLLPRI;
362: do {
363: if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 ||
364: pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) {
365: if (ret) {
366: LOGERR;
367: } else {
368: if (len++ < 7)
369: continue;
370: else
371: rpc_SetErr(ETIMEDOUT,
372: "Timeout, no answer from RPC server");
373: }
374:
375: return -1;
376: }
377:
378: memset(buf, 0, AIT_LEN(pkt));
379: if (type == SOCK_STREAM)
380: ret = recv(sock, buf, AIT_LEN(pkt), 0);
381: else {
382: memset(&sa2, 0, sizeof sa2);
383: salen = sa2.ss.ss_len = sizeof(sockaddr_t);
384: ret = recvfrom(sock, buf, AIT_LEN(pkt), 0, &sa2.sa, &salen);
385: }
386: if (ret < 1) {
387: if (ret) {
388: if (errno == EAGAIN)
389: continue;
390: else
391: LOGERR;
392: }
393: return -1;
394: }
395:
396: /* check for response from known address */
397: if (type == SOCK_DGRAM)
398: if (e_addrcmp(sa, &sa2, 42)) {
399: rpc_SetErr(ERPCMISMATCH,
400: "Received RPC response from unknown address");
401: continue;
402: }
403: } while (0);
404: if (ret < sizeof(struct tagRPCCall)) {
405: rpc_SetErr(ERPCMISMATCH, "Short RPC packet %d bytes", ret);
406: return -1;
407: }
408:
409: return ret;
410: }
411:
412: /*
413: * rpc_pkt_Request() - Build RPC Request packet
414: *
415: * @pkt = Packet buffer
416: * @sess = RPC session info
417: * @tag = Function tag for execution
418: * @vars = Function argument array of values, may be NULL
419: * @noreply = We not want RPC reply
420: * @nocrc = Without CRC calculation
421: * return: -1 error or != -1 prepared bytes into packet
422: */
423: int
424: rpc_pkt_Request(ait_val_t * __restrict pkt, rpc_sess_t * __restrict sess, u_short tag,
425: array_t * __restrict vars, int noreply, int nocrc)
426: {
427: struct tagRPCCall *rpc;
428: int ret = 0, len = sizeof(struct tagRPCCall);
429: u_char *buf;
430:
431: if (!pkt || !sess) {
432: rpc_SetErr(EINVAL, "Invalid argument(s)!");
433: return -1;
434: } else
435: buf = AIT_GET_BUF(pkt);
436:
437: /* prepare RPC call */
438: rpc = (struct tagRPCCall*) buf;
439: rpc_addPktSession(&rpc->call_session, sess);
440: rpc->call_tag = htons(tag);
441: if (!vars)
442: rpc->call_argc = 0;
443: else
444: rpc->call_argc = htons(array_Size(vars));
445:
446: /* set reply */
447: rpc->call_req.flags = noreply ? RPC_NOREPLY : RPC_REPLY;
448:
449: if (array_Size(vars)) {
450: /* marshaling variables */
451: ret = ait_vars2buffer(buf + len, AIT_LEN(pkt) - len, vars);
452: if (ret == -1) {
453: rpc_SetErr(EBADRPC, "Failed to prepare RPC packet values");
454: return -1;
455: } else
456: len += ret;
457: }
458:
459: /* total packet length */
460: rpc->call_len = htons(len);
461:
462: if (!nocrc) {
463: /* calculate CRC */
464: rpc->call_crc ^= rpc->call_crc;
465: rpc->call_crc = htons(crcFletcher16((u_short*) buf, len / 2));
466: }
467:
468: return len;
469: }
470:
471: /*
472: * rpc_pkt_Replay() - Decode RPC Replay packet
473: *
474: * @pkt = Packet buffer
475: * @sess = RPC session info
476: * @tag = Function tag
477: * @vars = Function argument array of values, may be NULL
478: * @nocrc = Without CRC calculation
479: * return: -1 error or != -1 return value from function
480: */
481: int
482: rpc_pkt_Replay(ait_val_t * __restrict pkt, rpc_sess_t * __restrict sess, u_short tag,
483: array_t ** __restrict vars, int nocrc)
484: {
485: struct tagRPCCall *rpc;
486: int len;
487: u_char *buf;
488: uint16_t crc;
489:
490: if (!pkt || !sess) {
491: rpc_SetErr(EINVAL, "Invalid argument(s)!");
492: return -1;
493: } else
494: buf = AIT_GET_BUF(pkt);
495:
496: rpc = (struct tagRPCCall*) buf;
497: if (!nocrc) {
498: /* calculate CRC */
499: crc = ntohs(rpc->call_crc);
500: rpc->call_crc ^= rpc->call_crc;
501: if (crc != crcFletcher16((u_short*) buf, ntohs(rpc->call_len) / 2)) {
502: rpc_SetErr(ERPCMISMATCH, "Bad CRC RPC packet");
503: return -1;
504: }
505: }
506:
507: /* check RPC packet session info */
508: if (rpc_chkPktSession(&rpc->call_session, sess)) {
509: rpc_SetErr(ERPCMISMATCH, "Get invalid RPC session");
510: return -1;
511: }
512: if (ntohs(rpc->call_tag) != tag) {
513: rpc_SetErr(ERPCMISMATCH, "Get wrong RPC reply");
514: return -1;
515: }
516: if (ntohl(rpc->call_rep.eno) && ntohl(rpc->call_rep.ret) == -1) {
517: rpc_SetErr(ntohl(rpc->call_rep.eno), "Server side: retcode=%d #%d %s",
518: ntohl(rpc->call_rep.ret), ntohl(rpc->call_rep.eno),
519: strerror(ntohl(rpc->call_rep.eno)));
520: return -1;
521: }
522: len = ntohs(rpc->call_argc) * sizeof(ait_val_t);
523: if (len > AIT_LEN(pkt) - sizeof(struct tagRPCCall)) {
524: rpc_SetErr(EMSGSIZE, "Reply RPC packet not enough buffer space ...");
525: return -1;
526: }
527: if (len > ntohs(rpc->call_len) - sizeof(struct tagRPCCall)) {
528: rpc_SetErr(EMSGSIZE, "Reply RPC packet is too short ...");
529: return -1;
530: }
531:
532: /* RPC is OK! Go de-marshaling variables ... */
533: if (vars && ntohs(rpc->call_argc)) {
534: #ifdef CLI_RES_ZCOPY
535: *vars = ait_buffer2vars(buf + sizeof(struct tagRPCCall), len,
536: ntohs(rpc->call_argc), 42);
537: #else
538: *vars = ait_buffer2vars(buf + sizeof(struct tagRPCCall), len,
539: ntohs(rpc->call_argc), 0);
540: #endif
541: if (!*vars) {
542: rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
543: return -1;
544: }
545: }
546:
547: return ntohl(rpc->call_rep.ret);
548: }
549:
550: /*
551: * rpc_cli_execCall() - Execute RPC call
552: *
553: * @cli = RPC Client session
554: * @noreply = We not want RPC reply
555: * @tag = Function tag for execution
556: * @in_vars = IN function argument array of values, may be NULL
557: * @out_vars = OUT returned array of rpc values, if !=NULL must be free after use with ait_freeVars()
558: * return: -1 error or != -1 ok result
559: */
560: int
561: rpc_cli_execCall(rpc_cli_t *cli, int noreply, u_short tag,
562: array_t * __restrict in_vars, array_t ** __restrict out_vars)
563: {
564: int type = 0, wlen;
565:
566: if (!cli) {
567: rpc_SetErr(EINVAL, "Can`t execute call because parameter is null or invalid!");
568: return -1;
569: }
570: if (cli->cli_id == SOCK_STREAM)
571: type = cli->cli_id;
572: if (out_vars)
573: *out_vars = NULL;
574:
575: if ((wlen = rpc_pkt_Request(rpc_getBufVar(cli), cli->cli_parent, tag, in_vars, noreply, type)) == -1)
576: return -1;
577:
578: if (rpc_pkt_Send(cli->cli_sock, cli->cli_id, &cli->cli_sa, rpc_getBufVar(cli), wlen) == -1)
579: return -1;
580:
581: if (noreply) /* we not want reply */
582: return 0;
583:
584: if (rpc_pkt_Receive(cli->cli_sock, cli->cli_id, &cli->cli_sa, rpc_getBufVar(cli)) == -1)
585: return -1;
586:
587: if ((wlen = rpc_pkt_Replay(rpc_getBufVar(cli), cli->cli_parent, tag, out_vars, type)) == -1)
588: return -1;
589:
590: return 0;
591: }
592:
593: /*
594: * rpc_cli_freeCall() - Free resouce allocated by RPC call
595: *
596: * @out_vars = Returned array with variables from RPC call
597: * return: none
598: */
599: void
600: rpc_cli_freeCall(array_t ** __restrict out_vars)
601: {
602: #ifdef CLI_RES_ZCOPY
603: array_Destroy(out_vars);
604: #else
605: ait_freeVars(out_vars);
606: #endif
607: }
608:
609: /*
610: * rpc_cli_ping() - Ping RPC server
611: *
612: * @cli = connected client
613: * return: -1 error or !=-1 ping seq id
614: */
615: int
616: rpc_cli_ping(rpc_cli_t *cli)
617: {
618: int ret = 0;
619: array_t *arr = NULL;
620:
621: if (!cli)
622: return -1;
623:
624: if (rpc_cli_execCall(cli, RPC_REPLY, CALL_SRVPING, NULL, &arr))
625: return -1;
626: else
627: ret = AIT_GET_U16(array(arr, 0, ait_val_t*));
628: rpc_cli_freeCall(&arr);
629:
630: return ret;
631: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>