--- libaitio/src/sock.c 2013/09/02 11:16:27 1.3 +++ libaitio/src/sock.c 2014/02/08 22:06:17 1.14 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: sock.c,v 1.3 2013/09/02 11:16:27 misho Exp $ +* $Id: sock.c,v 1.14 2014/02/08 22:06:17 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 +Copyright 2004 - 2014 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -46,6 +46,340 @@ SUCH DAMAGE. #include "global.h" +static void * +io_closeClient(sched_task_t *task) +{ + sock_cli_t *cli = (sock_cli_t*) TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + int stat; + + pthread_mutex_lock(&s->sock_mtx); + TAILQ_REMOVE(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + + schedCancelby(s->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); + + if (*cli->cli_name) + ioFreePTY(cli->cli_pty, cli->cli_name); + if (s->sock_prog) { + io_progDetach(s->sock_prog, cli->cli_pty); +#if 0 + io_progCloseAt(s->sock_prog, cli->cli_pty); +#endif + cli->cli_pty ^= cli->cli_pty; + io_progCheck(s->sock_prog, 42); + } + + if (s->sock_type == SOCK_STREAM) { + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + } + AIT_FREE_VAL(&cli->cli_buf[1]); + AIT_FREE_VAL(&cli->cli_buf[0]); + + if (cli->cli_pid > 0) { + kill(cli->cli_pid, SIGKILL); + while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { + usleep(1000); + kill(cli->cli_pid, SIGKILL); + } + } + + e_free(cli); + taskExit(task, NULL); +} + +static void * +io_acceptClient(sched_task_t *task) +{ + int c, rlen; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + sock_cli_t *cli = NULL; + sock_t *s = (sock_t*) TASK_ARG(task); + + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } + } else { + if ((rlen = recvfrom(TASK_FD(task), + AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), + MSG_PEEK, &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } else + c = TASK_FD(task); + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + goto end; + } else { + memset(cli, 0, sizeof(sock_cli_t)); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + + cli->cli_parent = TASK_ARG(task); + cli->cli_fd = c; + cli->cli_func = TASK_DATA(task); + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf)); + AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); + + schedRead(TASK_ROOT(task), cli->cli_func, cli, cli->cli_fd, TASK_ARG(task), 0); + ioUpdTimerSocket(cli); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_txNet(sched_task_t *task) +{ + int wlen, ret, len = TASK_DATLEN(task); + sock_cli_t *cli = TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + u_char *buf = TASK_DATA(task); + struct pollfd pfd[1]; + + pfd->fd = TASK_FD(task); + pfd->events = POLLOUT; + pfd->revents = 0; + for(; len > 0; len -= wlen, buf += wlen) { + ioUpdTimerSocket(cli); + + if ((ret = poll(pfd, 1, s->sock_timeout.tv_sec * 1000)) < 1 || + pfd->revents & (POLLNVAL | POLLERR | POLLHUP)) { +#if 0 + if (!ret) + continue; +#endif + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + break; + } + + if (s->sock_type == SOCK_STREAM) + wlen = send(TASK_FD(task), buf, len, 0); + else + wlen = sendto(TASK_FD(task), buf, len, 0, + &cli->cli_addr.sa, cli->cli_addr.sa.sa_len); + if (wlen < 1) { + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + break; + } + } + + taskExit(task, NULL); +} + +static void * +io_txPty(sched_task_t *task) +{ + int wlen; + sock_cli_t *cli = TASK_ARG(task); + + ioUpdTimerSocket(cli); + + wlen = write(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task)); + if (wlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + + taskExit(task, NULL); +} + +static void * +io_rxNet(sched_task_t *task) +{ + int rlen; + sock_cli_t *cli = TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + + ioUpdTimerSocket(cli); + + if (s->sock_type == SOCK_STREAM) + rlen = recv(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), + AIT_LEN(&cli->cli_buf[0]), 0); + else { + rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), + AIT_LEN(&cli->cli_buf[0]), 0, &sa.sa, &salen); + if (e_addrcmp(&cli->cli_addr, &sa, 42)) + goto end; + } + if (rlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + else + schedEvent(TASK_ROOT(task), io_txPty, cli, cli->cli_pty, + AIT_GET_BUF(&cli->cli_buf[0]), rlen); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_rxPty(sched_task_t *task) +{ + int rlen; + sock_cli_t *cli = TASK_ARG(task); + + ioUpdTimerSocket(cli); + + rlen = read(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[1]), AIT_LEN(&cli->cli_buf[1])); + if (rlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + else + schedEvent(TASK_ROOT(task), io_txNet, cli, cli->cli_fd, + AIT_GET_BUF(&cli->cli_buf[1]), rlen); + + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_bridgeClient(sched_task_t *task) +{ + int c, rlen, pty; + pid_t pid; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + sock_cli_t *cli = NULL; + sock_t *s = (sock_t*) TASK_ARG(task); + array_t *args = NULL; + char **argv = NULL; + + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } + } else { + if ((rlen = recvfrom(TASK_FD(task), + AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), + MSG_PEEK, &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } else + c = TASK_FD(task); + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + goto end; + } else { + memset(cli, 0, sizeof(sock_cli_t)); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + + cli->cli_parent = TASK_ARG(task); + cli->cli_fd = c; + strlcpy(cli->cli_cmdline, TASK_DATA(task), sizeof cli->cli_cmdline); + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf)); + AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); + + switch ((pid = ioForkPTY(&pty, cli->cli_name, sizeof cli->cli_name, + NULL, NULL, NULL))) { + case -1: + ELIBERR(io); + break; + case 0: + cli->cli_pty = pty; + + array_Args(cli->cli_cmdline, 0, " \t", &args); + argv = array_To(args); + array_Destroy(&args); + +#if 0 + printf("Console %s\n", cli->cli_name); +#endif + rlen = execv(*argv, argv); + _exit(rlen); + break; + default: + cli->cli_pty = pty; + cli->cli_pid = pid; + + schedRead(TASK_ROOT(task), io_rxPty, cli, cli->cli_pty, + TASK_ARG(task), 0); + schedRead(TASK_ROOT(task), io_rxNet, cli, cli->cli_fd, + TASK_ARG(task), 0); + ioUpdTimerSocket(cli); + break; + } +end: + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_bridgeClient2Pool(sched_task_t *task) +{ + int c, rlen; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + sock_cli_t *cli = NULL; + sock_t *s = (sock_t*) TASK_ARG(task); + + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } + } else { + if ((rlen = recvfrom(TASK_FD(task), + AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), + MSG_PEEK, &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } else + c = TASK_FD(task); + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + goto end; + } else { + memset(cli, 0, sizeof(sock_cli_t)); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + + io_progCheck(s->sock_prog, 42); + + cli->cli_parent = TASK_ARG(task); + cli->cli_fd = c; + cli->cli_pty = io_progAttach(s->sock_prog, 42); + strlcpy(cli->cli_cmdline, TASK_DATA(task), sizeof cli->cli_cmdline); + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf)); + AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); + + schedRead(TASK_ROOT(task), io_rxPty, cli, cli->cli_pty, TASK_ARG(task), 0); + schedRead(TASK_ROOT(task), io_rxNet, cli, cli->cli_fd, TASK_ARG(task), 0); + ioUpdTimerSocket(cli); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + + /* * ioInitSocket() - Init socket and allocate resources * @@ -84,6 +418,7 @@ ioInitSocket(int role, int type, int proto, const char return NULL; } else { buflen = buflen ? buflen : BUFSIZ; + buflen = E_ALIGN(buflen, 2); /* align buflen length */ AIT_SET_BUFSIZ(&s->sock_buf, 0, buflen); } @@ -119,6 +454,14 @@ ioInitSocket(int role, int type, int proto, const char return NULL; } + s->sock_root = schedBegin(); + if (!s->sock_root) { + io_SetErr(sched_GetErrno(), "%s", sched_GetError()); + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + pthread_mutex_init(&s->sock_mtx, NULL); return s; } @@ -132,15 +475,31 @@ ioInitSocket(int role, int type, int proto, const char void ioCloseSocket(sock_t ** __restrict s) { - sock_cli_t *cli; + sock_cli_t *cli; + int stat; if (s && *s) { pthread_mutex_lock(&(*s)->sock_mtx); while ((cli = TAILQ_FIRST(&(*s)->sock_cli))) { TAILQ_REMOVE(&(*s)->sock_cli, cli, cli_node); - shutdown(cli->cli_fd, SHUT_RDWR); - close(cli->cli_fd); - AIT_FREE_VAL(&cli->cli_buf); + + schedCancelby((*s)->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); + + if ((*s)->sock_type == SOCK_STREAM) { + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + } + AIT_FREE_VAL(&cli->cli_buf[1]); + AIT_FREE_VAL(&cli->cli_buf[0]); + + if (cli->cli_pid > 0) { + kill(cli->cli_pid, SIGKILL); + while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { + usleep(1000); + kill(cli->cli_pid, SIGKILL); + } + } + e_free(cli); } pthread_mutex_unlock(&(*s)->sock_mtx); @@ -150,6 +509,8 @@ ioCloseSocket(sock_t ** __restrict s) AIT_FREE_VAL(&(*s)->sock_buf); + schedEnd(&(*s)->sock_root); + pthread_mutex_destroy(&(*s)->sock_mtx); e_free(*s); *s = NULL; @@ -161,10 +522,11 @@ ioCloseSocket(sock_t ** __restrict s) * * @s = Socket * @arg = Server role = listen backlog queue and Client role = peer address + * @timeout = Socket timeout in sec (default -1 infinit) * return: -1 error or 0 ok */ int -ioUpSocket(sock_t * __restrict s, void *arg) +ioUpSocket(sock_t * __restrict s, void *arg, int timeout) { int ret = 0; sockaddr_t *peer = (sockaddr_t*) arg; @@ -172,6 +534,11 @@ ioUpSocket(sock_t * __restrict s, void *arg) if (!s || !arg) return -1; + else { + s->sock_timeout.tv_sec = timeout; + s->sock_timeout.tv_nsec = (timeout < 1) ? timeout : 0; + schedPolling(s->sock_root, &s->sock_timeout, NULL); + } switch (s->sock_role) { case IO_SOCK_ROLE_CLIENT: @@ -202,114 +569,113 @@ ioUpSocket(sock_t * __restrict s, void *arg) return ret; } -static void -thrCliClean(void *arg) +/* + * ioUpdTimerSocket() - Update timeout of socket + * + * @c = Client socket + * return: none + */ +void +ioUpdTimerSocket(sock_cli_t * __restrict c) { - sock_cli_t *cli = (sock_cli_t*) arg; - sock_t *s = (sock_t*) cli->cli_parent; + sock_t *s; - if (s->sock_type == SOCK_STREAM) { - shutdown(cli->cli_fd, SHUT_RDWR); - close(cli->cli_fd); - } - AIT_FREE_VAL(&cli->cli_buf); + if (!c) + return; + else + s = c->cli_parent; - pthread_mutex_lock(&s->sock_mtx); - TAILQ_REMOVE(&s->sock_cli, cli, cli_node); - pthread_mutex_unlock(&s->sock_mtx); + if (s->sock_prog) + io_progCheck(s->sock_prog, 42); - e_free(cli); + schedCancelby(s->sock_root, taskTIMER, CRITERIA_ARG, c, NULL); + schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, NULL, 0); } -static void * -io_thrCliWrapper(void *arg) +/* + * ioCloseClient() - Close client socket + * + * @c = Client socket + * return: 0 ok or !=0 error + */ +int +ioCloseClient(sock_cli_t * __restrict c) { - void *ret; - sock_cli_t *cli = (sock_cli_t*) arg; + sock_t *s; - pthread_cleanup_push(thrCliClean, arg); + if (!c) + return -1; + else + s = c->cli_parent; - ret = cli->cli_func(cli); - - pthread_cleanup_pop(42); - pthread_exit(ret); + return !schedEvent(s->sock_root, io_closeClient, c, 0, NULL, 0); } /* - * ioAcceptSocket() - Accept clients + * ioLoopSocket() - Start socket scheduler * * @s = Socket - * @f = callback function for client handling - * @arg = optional argument for callback function - * return: -1 error or 0 ok + * @rcb = Read callback + * return: -1 error or return result from scheduler */ int -ioAcceptSocket(sock_t * __restrict s, sock_cb_t f, void *arg) +ioLoopSocket(sock_t * __restrict s, sched_task_func_t rcb) { - struct pollfd pfd[1]; - socklen_t salen; - sockaddr_t sa; - int c, rlen; - sock_cli_t *cli; - u_char buf[BUFSIZ] = { [0 ... BUFSIZ - 1] = 0 }; - - if (!s || s->sock_role == IO_SOCK_ROLE_CLIENT || !f) + if (!s || !rcb || s->sock_kill) return -1; - pfd->fd = s->sock_fd; - pfd->events = POLLIN | POLLPRI; - do { - if (poll(pfd, 1, -1) < 1 || - pfd->revents & (POLLNVAL | POLLHUP | POLLERR)) { - LOGERR; - return -1; - } else - salen = sizeof sa.ss; + schedRead(s->sock_root, io_acceptClient, s, s->sock_fd, rcb, 0); + return schedRun(s->sock_root, &s->sock_kill); +} - if (s->sock_type == SOCK_STREAM) { - if ((c = accept(s->sock_fd, &sa.sa, &salen)) == -1) { - LOGERR; - return -1; - } - } else { - if ((rlen = recvfrom(s->sock_fd, buf, sizeof buf, MSG_PEEK, - &sa.sa, &salen)) == -1) { - LOGERR; - return -1; - } else - c = s->sock_fd; - } +static void * +io_progPurge(sched_task_t *task) +{ + sock_t *s = (sock_t*) TASK_ARG(task); - cli = e_malloc(sizeof(sock_cli_t)); - if (!cli) { - io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); - if (s->sock_type == SOCK_STREAM) - close(c); - return -1; - } else - memset(cli, 0, sizeof(sock_cli_t)); + io_progVacuum(s->sock_prog, 0); - cli->cli_parent = s; - cli->cli_fd = c; - cli->cli_func = f; - cli->cli_arg = arg; - memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); - AIT_SET_BUFSIZ(&cli->cli_buf, 0, AIT_LEN(&s->sock_buf)); + schedTimer(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), + s->sock_timeout, TASK_DATA(task), TASK_DATLEN(task)); + taskExit(task, NULL); +} - if (pthread_create(&cli->cli_tid, NULL, io_thrCliWrapper, cli) == -1) { - LOGERR; - if (s->sock_type == SOCK_STREAM) - close(c); - AIT_FREE_VAL(&cli->cli_buf); - e_free(cli); - return -1; - } else { - pthread_detach(cli->cli_tid); - pthread_mutex_lock(&s->sock_mtx); - TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); - pthread_mutex_unlock(&s->sock_mtx); - } - } while (42); +/* + * ioBridgeProg2Socket() - Start socket scheduler and bridge program to socket + * + * @s = Socket + * @prgname = Program name + * return: 0 ok or !=0 error + */ +int +ioBridgeProg2Socket(sock_t * __restrict s, const char *prgname) +{ + if (!s || !prgname || s->sock_kill) + return -1; + if (s->sock_prog) { + schedRead(s->sock_root, io_bridgeClient2Pool, + s, s->sock_fd, (void*) prgname, 0); + schedTimer(s->sock_root, io_progPurge, s, s->sock_timeout, NULL, 0); + } else + schedRead(s->sock_root, io_bridgeClient, + s, s->sock_fd, (void*) prgname, 0); + return schedRun(s->sock_root, &s->sock_kill); +} + +/* + * ioSetupProg2Socket() - Setup program pool to socket server + * + * @s = Socket + * @p = Program pool + * return: -1 error or 0 ok + */ +int +ioSetupProg2Socket(sock_t * __restrict s, prog_t * __restrict p) +{ + if (!s) + return -1; + + s->sock_prog = p; return 0; }