--- libaitrpc/src/blob.c 2013/04/02 15:50:14 1.12 +++ libaitrpc/src/blob.c 2015/07/02 22:28:15 1.17 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: blob.c,v 1.12 2013/04/02 15:50:14 misho Exp $ +* $Id: blob.c,v 1.17 2015/07/02 22:28:15 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 +Copyright 2004 - 2015 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -46,20 +46,31 @@ SUCH DAMAGE. #include "global.h" +static void * +toutBLOB(sched_task_t *task) +{ + rpc_srv_unregisterBLOB(TASK_DATA(task), ((rpc_blob_t*) TASK_ARG(task))->blob_var); + + return NULL; +} + + /* * rpc_srv_blobCreate() - Create and map blob to memory region and return object * * @srv = RPC Server instance * @len = BLOB length object + * @tout = BLOB live timeout in seconds * return: NULL error or !=NULL allocated BLOB object */ -inline rpc_blob_t * -rpc_srv_blobCreate(rpc_srv_t * __restrict srv, int len) +rpc_blob_t * +rpc_srv_blobCreate(rpc_srv_t * __restrict srv, int len, int tout) { rpc_blob_t *blob = NULL; char szFName[MAXPATHLEN]; int f; u_int rnd; + struct timespec ts = { tout ? tout : DEF_RPC_BLOB_TIMEOUT, 0 }; again: rnd = random() % UINT_MAX; @@ -96,11 +107,16 @@ again: close(f); unlink(szFName); return NULL; - } else + } else { close(f); + madvise(blob->blob_data, len, MADV_SEQUENTIAL); + } + blob->blob_len = len; blob->blob_var = rnd; + + schedTimer(srv->srv_blob.root, toutBLOB, blob, ts, srv, 0); return blob; } @@ -111,7 +127,7 @@ again: * @blob = Map to this BLOB element * return: -1 error or 0 ok */ -inline int +int rpc_srv_blobMap(rpc_srv_t * __restrict srv, rpc_blob_t * __restrict blob) { int f; @@ -155,7 +171,7 @@ rpc_srv_blobMap(rpc_srv_t * __restrict srv, rpc_blob_t * @blob = Mapped BLOB element * return: none */ -inline void +void rpc_srv_blobUnmap(rpc_blob_t * __restrict blob) { if (blob && blob->blob_data) { @@ -171,7 +187,7 @@ rpc_srv_blobUnmap(rpc_blob_t * __restrict blob) * @blob = Mapped BLOB element * return: -1 error or 0 ok */ -inline int +int rpc_srv_blobFree(rpc_srv_t * __restrict srv, rpc_blob_t * __restrict blob) { char szFName[MAXPATHLEN]; @@ -182,6 +198,8 @@ rpc_srv_blobFree(rpc_srv_t * __restrict srv, rpc_blob_ } else rpc_srv_blobUnmap(blob); + schedCancelby(srv->srv_blob.root, taskTIMER, CRITERIA_ARG, blob, NULL); + memset(szFName, 0, sizeof szFName); snprintf(szFName, sizeof szFName, BLOB_FILE, AIT_GET_STRZ(&srv->srv_blob.dir), blob->blob_var); if (unlink(szFName) == -1) { @@ -260,260 +278,6 @@ rpc_srv_recvBLOB(rpc_cli_t * __restrict cli, rpc_blob_ return -1; } } - - return ret; -} - -/* ------------------------------------------------------------ */ - -/* - * rpc_cli_sendBLOB() - Send BLOB to server - * - * @cli = Client instance - * @var = BLOB variable - * @data = BLOB data - * return: -1 error, 0 ok, 1 remote error - */ -int -rpc_cli_sendBLOB(rpc_cli_t * __restrict cli, ait_val_t * __restrict var, void * __restrict data) -{ - int ret, len; - uint8_t *pos; - struct tagBLOBHdr hdr; - struct pollfd pfd; - - if (!cli || !var || !data) { - rpc_SetErr(EINVAL, "Invalid arguments"); - return -1; - } else - memset(&hdr, 0, sizeof hdr); - - rpc_addPktSession(&hdr.hdr_session, cli->cli_parent); - hdr.hdr_cmd = set; - hdr.hdr_var = 0; - hdr.hdr_ret = 0; - hdr.hdr_len = htonl(AIT_LEN(var)); - - /* send SET request */ - pfd.fd = cli->cli_sock; - pfd.events = POLLOUT; - if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) == -1 || - pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - LOGERR; - return -1; - } - if (send(cli->cli_sock, &hdr, sizeof hdr, MSG_NOSIGNAL) == -1) { - LOGERR; - return -1; - } - - /* send BLOB to server */ - for (ret = AIT_LEN(var), pos = data; ret > 0; ret -= len, pos += len) - if ((len = send(cli->cli_sock, pos, ret, MSG_NOSIGNAL)) == -1) { - LOGERR; - return -1; - } - - /* wait for reply */ - pfd.events = POLLIN | POLLPRI; - if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || - pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - if (ret) - LOGERR; - else - rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond"); - return -1; - } - if (recv(cli->cli_sock, &hdr, sizeof hdr, 0) == -1) { - LOGERR; - return 1; - } - - if (hdr.hdr_cmd != error) { - if (ntohl(hdr.hdr_len) != AIT_LEN(var)) { - rpc_SetErr(ECANCELED, "Bad return length packet"); - return 1; - } - - AIT_SET_BLOB(var, ntohl(hdr.hdr_var), ntohl(hdr.hdr_len)); - } - - return hdr.hdr_cmd == error; -} - -/* - * rpc_cli_recvBLOB() - Receive BLOB from server - * - * @cli = Client instance - * @var = BLOB variable - * @data = BLOB data, must be e_free after use! - * return: -1 error, 0 ok, 1 remote error - */ -int -rpc_cli_recvBLOB(rpc_cli_t * __restrict cli, ait_val_t * __restrict var, void ** __restrict data) -{ - int ret, len; - uint8_t *pos; - struct pollfd pfd; - struct tagBLOBHdr hdr; - - if (!cli || !var || !data) { - rpc_SetErr(EINVAL, "Invalid arguments"); - return -1; - } - - *data = e_malloc(AIT_LEN(var)); - if (!*data) { - LOGERR; - return -1; - } else { - memset(&hdr, 0, sizeof hdr); - memset(*data, 0, AIT_LEN(var)); - } - - rpc_addPktSession(&hdr.hdr_session, cli->cli_parent); - hdr.hdr_cmd = get; - hdr.hdr_var = htonl((uint32_t) AIT_GET_BLOB(var)); - hdr.hdr_ret = 0; - hdr.hdr_len = 0; - - /* send GET request */ - pfd.fd = cli->cli_sock; - pfd.events = POLLOUT; - if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) == -1 || - pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - LOGERR; - e_free(*data); - *data = NULL; - return -1; - } - if (send(cli->cli_sock, &hdr, sizeof hdr, 0) == -1) { - LOGERR; - e_free(*data); - *data = NULL; - return -1; - } - - /* receive BLOB from server */ - pfd.events = POLLIN | POLLPRI; - for (ret = AIT_LEN(var), pos = *data; ret > 0; ret -= len, pos += len) { - if ((len = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || - pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - LOGERR; - e_free(*data); - *data = NULL; - return -1; - } - - if ((len = recv(cli->cli_sock, pos, ret, 0)) == -1) { - LOGERR; - e_free(*data); - *data = NULL; - return -1; - } - } - - /* wait for reply */ - if ((len = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || - pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - if (len) - LOGERR; - else - rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond"); - e_free(*data); - *data = NULL; - return 1; - } - if (recv(cli->cli_sock, &hdr, sizeof hdr, 0) == -1) { - LOGERR; - e_free(*data); - *data = NULL; - return 1; - } - if (hdr.hdr_cmd != error) { - if (ntohl(hdr.hdr_len) != AIT_LEN(var)) { - rpc_SetErr(ECANCELED, "Bad return length packet"); - e_free(*data); - *data = NULL; - return 1; - } - } - - return hdr.hdr_cmd == error; -} - -/* - * rpc_cli_delBLOB() - Delete BLOB from server - * - * @cli = Client instance - * @var = BLOB variable - * return: -1 error, 0 ok, 1 remote error - */ -int -rpc_cli_delBLOB(rpc_cli_t * __restrict cli, ait_val_t * __restrict var) -{ - struct tagBLOBHdr hdr; - struct pollfd pfd; - int ret; - - if (!cli || !var) { - rpc_SetErr(EINVAL, "Invalid arguments"); - return -1; - } else - memset(&hdr, 0, sizeof hdr); - - rpc_addPktSession(&hdr.hdr_session, cli->cli_parent); - hdr.hdr_cmd = unset; - hdr.hdr_var = htonl((uint32_t) AIT_GET_BLOB(var)); - hdr.hdr_ret = 0; - hdr.hdr_len = 0; - - /* send UNSET request */ - pfd.fd = cli->cli_sock; - pfd.events = POLLOUT; - if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) == -1 || - pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - LOGERR; - return -1; - } - if (send(cli->cli_sock, &hdr, sizeof hdr, MSG_NOSIGNAL) == -1) { - LOGERR; - return -1; - } - - /* wait for reply */ - pfd.events = POLLIN | POLLPRI; - if ((ret = poll(&pfd, 1, DEF_RPC_TIMEOUT * 1000)) < 1 || - pfd.revents & (POLLERR | POLLHUP | POLLNVAL)) { - if (ret) - LOGERR; - else - rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond"); - return 1; - } - if (recv(cli->cli_sock, &hdr, sizeof hdr, 0) == -1) { - LOGERR; - return 1; - } - - return hdr.hdr_cmd == error; -} - -/* - * rpc_cli_getBLOB() - Receive BLOB from server and Delete after that - * - * @cli = Client instance - * @var = BLOB variable - * @data = BLOB data, must be e_free after use! - * return: -1 error, 0 ok, 1 remote error - */ -inline int -rpc_cli_getBLOB(rpc_cli_t * __restrict cli, ait_val_t * __restrict var, void ** __restrict data) -{ - int ret; - - ret = rpc_cli_recvBLOB(cli, var, data); - ret |= rpc_cli_delBLOB(cli, var) > 0 ? 2 : 0; return ret; }