--- libaitio/src/sock.c 2013/08/12 20:50:27 1.1.2.1 +++ libaitio/src/sock.c 2013/11/25 18:48:40 1.9.2.1 @@ -1,3 +1,573 @@ +/************************************************************************* +* (C) 2013 AITNET ltd - Sofia/Bulgaria - +* by Michael Pounov +* +* $Author: misho $ +* $Id: sock.c,v 1.9.2.1 2013/11/25 18:48:40 misho Exp $ +* +************************************************************************** +The ELWIX and AITNET software is distributed under the following +terms: + +All of the documentation and software included in the ELWIX and AITNET +Releases is copyrighted by ELWIX - Sofia/Bulgaria + +Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 + by Michael Pounov . All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. All advertising materials mentioning features or use of this software + must display the following acknowledgement: +This product includes software developed by Michael Pounov +ELWIX - Embedded LightWeight unIX and its contributors. +4. Neither the name of AITNET nor the names of its contributors + may be used to endorse or promote products derived from this software + without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY AITNET AND CONTRIBUTORS ``AS IS'' AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +SUCH DAMAGE. +*/ #include "global.h" +static void * +io_closeClient(sched_task_t *task) +{ + sock_cli_t *cli = (sock_cli_t*) TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + int stat; + + pthread_mutex_lock(&s->sock_mtx); + TAILQ_REMOVE(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + + schedCancelby(s->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); + + if (*cli->cli_name) + ioFreePTY(cli->cli_pty, cli->cli_name); + + if (s->sock_type == SOCK_STREAM) { + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + } + AIT_FREE_VAL(&cli->cli_buf[1]); + AIT_FREE_VAL(&cli->cli_buf[0]); + + if (cli->cli_pid > 0) { + kill(cli->cli_pid, SIGKILL); + while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { + usleep(1000); + kill(cli->cli_pid, SIGKILL); + } + } + + e_free(cli); + taskExit(task, NULL); +} + +static void * +io_acceptClient(sched_task_t *task) +{ + int c, rlen; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + sock_cli_t *cli = NULL; + sock_t *s = (sock_t*) TASK_ARG(task); + + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } + } else { + if ((rlen = recvfrom(TASK_FD(task), + AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), + MSG_PEEK, &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } else + c = TASK_FD(task); + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + goto end; + } else { + memset(cli, 0, sizeof(sock_cli_t)); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + + cli->cli_parent = TASK_ARG(task); + cli->cli_fd = c; + cli->cli_func = TASK_DATA(task); + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf)); + AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); + + schedRead(TASK_ROOT(task), cli->cli_func, cli, cli->cli_fd, TASK_ARG(task), 0); + ioUpdTimerSocket(cli); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_txNet(sched_task_t *task) +{ + int wlen, ret, len = TASK_DATLEN(task); + sock_cli_t *cli = TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + u_char *buf = TASK_DATA(task); + struct pollfd pfd[1]; + + pfd->fd = TASK_FD(task); + pfd->events = POLLOUT; + pfd->revents = 0; + for(; len > 0; len -= wlen, buf += wlen) { + ioUpdTimerSocket(cli); + + if ((ret = poll(pfd, 1, s->sock_timeout.tv_sec * 1000)) < 1 || + pfd->revents & (POLLNVAL | POLLERR | POLLHUP)) { + if (!ret) + continue; + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + break; + } + + if (s->sock_type == SOCK_STREAM) + wlen = send(TASK_FD(task), buf, len, 0); + else + wlen = sendto(TASK_FD(task), buf, len, 0, + &cli->cli_addr.sa, cli->cli_addr.sa.sa_len); + if (wlen < 1) { + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + break; + } + } + + taskExit(task, NULL); +} + +static void * +io_txPty(sched_task_t *task) +{ + int wlen; + sock_cli_t *cli = TASK_ARG(task); + + ioUpdTimerSocket(cli); + + wlen = write(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task)); + if (wlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + + taskExit(task, NULL); +} + +static void * +io_rxNet(sched_task_t *task) +{ + int rlen; + sock_cli_t *cli = TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + + ioUpdTimerSocket(cli); + + if (s->sock_type == SOCK_STREAM) + rlen = recv(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), + AIT_LEN(&cli->cli_buf[0]), 0); + else { + rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), + AIT_LEN(&cli->cli_buf[0]), 0, &sa.sa, &salen); + if (e_addrcmp(&cli->cli_addr, &sa, 42)) + goto end; + } + if (rlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + else + schedEvent(TASK_ROOT(task), io_txPty, cli, cli->cli_pty, + AIT_GET_BUF(&cli->cli_buf[0]), rlen); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_rxPty(sched_task_t *task) +{ + int rlen; + sock_cli_t *cli = TASK_ARG(task); + + ioUpdTimerSocket(cli); + + rlen = read(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[1]), AIT_LEN(&cli->cli_buf[1])); + if (rlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + else + schedEvent(TASK_ROOT(task), io_txNet, cli, cli->cli_fd, + AIT_GET_BUF(&cli->cli_buf[1]), rlen); + + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_bridgeClient(sched_task_t *task) +{ + int c, rlen; + pid_t pid; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + sock_cli_t *cli = NULL; + sock_t *s = (sock_t*) TASK_ARG(task); + array_t *args = NULL; + char **argv = NULL; + + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } + } else { + if ((rlen = recvfrom(TASK_FD(task), + AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), + MSG_PEEK, &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } else + c = TASK_FD(task); + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + goto end; + } else { + memset(cli, 0, sizeof(sock_cli_t)); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + + cli->cli_parent = TASK_ARG(task); + cli->cli_fd = c; + strlcpy(cli->cli_cmdline, TASK_DATA(task), sizeof cli->cli_cmdline); + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf)); + AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); + + switch ((pid = ioForkPTY(&cli->cli_pty, cli->cli_name, sizeof cli->cli_name, + NULL, NULL, NULL))) { + case -1: + ELIBERR(io); + break; + case 0: + array_Args(cli->cli_cmdline, 0, " \t", &args); + argv = array_To(args); + array_Destroy(&args); + + printf("Console %s\n", cli->cli_name); + rlen = execv(*argv, argv); + _exit(rlen); + break; + default: + cli->cli_pid = pid; + + schedRead(TASK_ROOT(task), io_rxPty, cli, cli->cli_pty, + TASK_ARG(task), 0); + schedRead(TASK_ROOT(task), io_rxNet, cli, cli->cli_fd, + TASK_ARG(task), 0); + ioUpdTimerSocket(cli); + break; + } +end: + schedReadSelf(task); + taskExit(task, NULL); +} + + +/* + * ioInitSocket() - Init socket and allocate resources + * + * @role = Socket role + * @type = Socket type + * @proto = Socket protocol + * @addr = Bind to address + * @port = Bind to port + * @buflen = Socket buffer, optional if =0 == BUFSIZ + * return: NULL error or !=NULL created socket + */ +sock_t * +ioInitSocket(int role, int type, int proto, const char *addr, u_short port, size_t buflen) +{ + sock_t *s = NULL; + int n = 1; + + if (!addr) + return NULL; + + s = e_malloc(sizeof(sock_t)); + if (!s) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + return NULL; + } else + memset(s, 0, sizeof(sock_t)); + + TAILQ_INIT(&s->sock_cli); + + s->sock_role = role; + s->sock_type = type; + s->sock_proto = proto; + if (!e_gethostbyname(addr, port, &s->sock_addr)) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + e_free(s); + return NULL; + } else { + buflen = buflen ? buflen : BUFSIZ; + buflen = E_ALIGN(buflen, 2); /* align buflen length */ + AIT_SET_BUFSIZ(&s->sock_buf, 0, buflen); + } + + s->sock_fd = socket(s->sock_addr.sa.sa_family, s->sock_type, s->sock_proto); + if (s->sock_fd == -1) { + LOGERR; + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + if (setsockopt(s->sock_fd, SOL_SOCKET, SO_SNDBUF, &buflen, sizeof buflen) == -1) { + LOGERR; + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + if (setsockopt(s->sock_fd, SOL_SOCKET, SO_RCVBUF, &buflen, sizeof buflen) == -1) { + LOGERR; + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + if (setsockopt(s->sock_fd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) { + LOGERR; + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + if (bind(s->sock_fd, &s->sock_addr.sa, s->sock_addr.sa.sa_len) == -1) { + LOGERR; + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + + s->sock_root = schedBegin(); + if (!s->sock_root) { + io_SetErr(sched_GetErrno(), "%s", sched_GetError()); + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + + pthread_mutex_init(&s->sock_mtx, NULL); + return s; +} + +/* + * ioCloseSocket() - Close socket and free resources + * + * @s = Socket + * return: none + */ +void +ioCloseSocket(sock_t ** __restrict s) +{ + sock_cli_t *cli; + int stat; + + if (s && *s) { + pthread_mutex_lock(&(*s)->sock_mtx); + while ((cli = TAILQ_FIRST(&(*s)->sock_cli))) { + TAILQ_REMOVE(&(*s)->sock_cli, cli, cli_node); + + schedCancelby((*s)->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); + + if ((*s)->sock_type == SOCK_STREAM) { + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + } + AIT_FREE_VAL(&cli->cli_buf[1]); + AIT_FREE_VAL(&cli->cli_buf[0]); + + if (cli->cli_pid > 0) { + kill(cli->cli_pid, SIGKILL); + while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { + usleep(1000); + kill(cli->cli_pid, SIGKILL); + } + } + + e_free(cli); + } + pthread_mutex_unlock(&(*s)->sock_mtx); + + shutdown((*s)->sock_fd, SHUT_RDWR); + close((*s)->sock_fd); + + AIT_FREE_VAL(&(*s)->sock_buf); + + schedEnd(&(*s)->sock_root); + + pthread_mutex_destroy(&(*s)->sock_mtx); + e_free(*s); + *s = NULL; + } +} + +/* + * ioUpSocket() - Setup socket for use + * + * @s = Socket + * @arg = Server role = listen backlog queue and Client role = peer address + * @timeout = Socket timeout in sec (default -1 infinit) + * return: -1 error or 0 ok + */ +int +ioUpSocket(sock_t * __restrict s, void *arg, int timeout) +{ + int ret = 0; + sockaddr_t *peer = (sockaddr_t*) arg; + uintptr_t backlog = (uintptr_t) arg; + + if (!s || !arg) + return -1; + else { + s->sock_timeout.tv_sec = timeout; + s->sock_timeout.tv_nsec = (timeout < 1) ? timeout : 0; + schedPolling(s->sock_root, &s->sock_timeout, NULL); + } + + switch (s->sock_role) { + case IO_SOCK_ROLE_CLIENT: + memcpy(&s->sock_peer, peer, sizeof s->sock_peer); + + if (connect(s->sock_fd, &s->sock_peer.sa, + s->sock_peer.sa.sa_len) == -1) { + LOGERR; + return -1; + } + break; + case IO_SOCK_ROLE_SERVER: + if (s->sock_type == SOCK_STREAM) { + s->sock_backq = backlog; + + if (listen(s->sock_fd, s->sock_backq) == -1) { + LOGERR; + return -1; + } + } + break; + default: + io_SetErr(EINVAL, "Unsupported socket type"); + return -1; + } + + fcntl(s->sock_fd, F_SETFL, fcntl(s->sock_fd, F_GETFL) | O_NONBLOCK); + return ret; +} + +/* + * ioUpdTimerSocket() - Update timeout of socket + * + * @c = Client socket + * return: none + */ +void +ioUpdTimerSocket(sock_cli_t * __restrict c) +{ + sock_t *s; + + if (!c) + return; + else + s = c->cli_parent; + + schedCancelby(s->sock_root, taskTIMER, CRITERIA_ARG, c, NULL); + schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, NULL, 0); +} + +/* + * ioCloseClient() - Close client socket + * + * @c = Client socket + * return: 0 ok or !=0 error + */ +int +ioCloseClient(sock_cli_t * __restrict c) +{ + sock_t *s; + + if (!c) + return -1; + else + s = c->cli_parent; + + return !schedEvent(s->sock_root, io_closeClient, c, 0, NULL, 0); +} + +/* + * ioLoopSocket() - Start socket scheduler + * + * @s = Socket + * @rcb = Read callback + * return: -1 error or return result from scheduler + */ +int +ioLoopSocket(sock_t * __restrict s, sched_task_func_t rcb) +{ + if (!s || !rcb || s->sock_kill) + return -1; + + schedRead(s->sock_root, io_acceptClient, s, s->sock_fd, rcb, 0); + return schedRun(s->sock_root, &s->sock_kill); +} + +/* + * ioBridgeProg2Socket() - Start socket scheduler and bridge program to socket + * + * @s = Socket + * @prgname = Program name + * return: 0 ok or !=0 error + */ +int +ioBridgeProg2Socket(sock_t * __restrict s, const char *prgname) +{ + if (!s || !prgname || s->sock_kill) + return -1; + + schedRead(s->sock_root, io_bridgeClient, s, s->sock_fd, (void*) prgname, 0); + return schedRun(s->sock_root, &s->sock_kill); +}