--- libaitio/src/sock.c 2013/11/22 14:31:08 1.6 +++ libaitio/src/sock.c 2016/08/18 09:06:31 1.16 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: sock.c,v 1.6 2013/11/22 14:31:08 misho Exp $ +* $Id: sock.c,v 1.16 2016/08/18 09:06:31 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -12,7 +12,7 @@ terms: All of the documentation and software included in the ELWIX and AITNET Releases is copyrighted by ELWIX - Sofia/Bulgaria -Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 +Copyright 2004 - 2016 by Michael Pounov . All rights reserved. Redistribution and use in source and binary forms, with or without @@ -51,26 +51,37 @@ io_closeClient(sched_task_t *task) { sock_cli_t *cli = (sock_cli_t*) TASK_ARG(task); sock_t *s = (sock_t*) cli->cli_parent; - int stat, sock = (int) TASK_DATLEN(task); + int stat; + schedCancelby(s->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); + pthread_mutex_lock(&s->sock_mtx); TAILQ_REMOVE(&s->sock_cli, cli, cli_node); pthread_mutex_unlock(&s->sock_mtx); - schedCancelby(s->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); + if (*cli->cli_name) + ioFreePTY(cli->cli_pty, cli->cli_name); + if (s->sock_prog) { + io_progDetach(s->sock_prog, cli->cli_pty); +#if 0 + io_progCloseAt(s->sock_prog, cli->cli_pty); +#endif + cli->cli_pty ^= cli->cli_pty; + io_progCheck(s->sock_prog, 42); + } if (s->sock_type == SOCK_STREAM) { - shutdown(sock, SHUT_RDWR); - close(sock); + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); } AIT_FREE_VAL(&cli->cli_buf[1]); AIT_FREE_VAL(&cli->cli_buf[0]); if (cli->cli_pid > 0) { - kill(cli->cli_pid, SIGTERM); + kill(cli->cli_pid, SIGKILL); while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { usleep(1000); - kill(cli->cli_pid, SIGTERM); + kill(cli->cli_pid, SIGKILL); } } @@ -123,7 +134,7 @@ io_acceptClient(sched_task_t *task) AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); schedRead(TASK_ROOT(task), cli->cli_func, cli, cli->cli_fd, TASK_ARG(task), 0); - ioUpdTimerSocket(cli, NULL); + ioUpdTimerSocket(cli); end: schedReadSelf(task); taskExit(task, NULL); @@ -132,21 +143,44 @@ end: static void * io_txNet(sched_task_t *task) { - int wlen; + int wlen, ret, len = TASK_DATLEN(task); sock_cli_t *cli = TASK_ARG(task); sock_t *s = (sock_t*) cli->cli_parent; + u_char *buf = TASK_DATA(task); + struct pollfd pfd[1]; - ioUpdTimerSocket(cli, NULL); + pfd->fd = TASK_FD(task); + pfd->events = POLLOUT; + pfd->revents = 0; + for(; len > 0; len -= wlen, buf += wlen) { + ioUpdTimerSocket(cli); - if (s->sock_type == SOCK_STREAM) - wlen = send(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task), 0); - else - wlen = sendto(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task), 0, - &cli->cli_addr.sa, cli->cli_addr.sa.sa_len); - if (wlen < 1) - schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, - (void*) cli->cli_func, cli->cli_fd); + if ((ret = poll(pfd, 1, s->sock_timeout.tv_sec * 1000)) < 1 || + pfd->revents & (POLLNVAL | POLLERR | POLLHUP)) { +#if 0 + if (!ret) + continue; +#endif + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + break; + } + if (s->sock_type == SOCK_STREAM) + wlen = send(TASK_FD(task), buf, len, 0); + else +#ifndef __linux__ + wlen = sendto(TASK_FD(task), buf, len, 0, &cli->cli_addr.sa, cli->cli_addr.sa.sa_len); +#else + wlen = sendto(TASK_FD(task), buf, len, 0, &cli->cli_addr.sa, + cli->cli_addr.sa.sa_family == AF_INET ? + sizeof cli->cli_addr.sin : sizeof cli->cli_addr.sin6); +#endif + if (wlen < 1) { + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + break; + } + } + taskExit(task, NULL); } @@ -156,12 +190,11 @@ io_txPty(sched_task_t *task) int wlen; sock_cli_t *cli = TASK_ARG(task); - ioUpdTimerSocket(cli, NULL); + ioUpdTimerSocket(cli); wlen = write(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task)); if (wlen < 1) - schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, - (void*) cli->cli_func, cli->cli_fd); + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); taskExit(task, NULL); } @@ -175,7 +208,7 @@ io_rxNet(sched_task_t *task) sockaddr_t sa; socklen_t salen = sizeof sa.ss; - ioUpdTimerSocket(cli, NULL); + ioUpdTimerSocket(cli); if (s->sock_type == SOCK_STREAM) rlen = recv(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), @@ -183,17 +216,19 @@ io_rxNet(sched_task_t *task) else { rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), AIT_LEN(&cli->cli_buf[0]), 0, &sa.sa, &salen); - if (e_addrcmp(&cli->cli_addr, &sa, 42)) - goto end; + if (e_addrcmp(&cli->cli_addr, &sa, 42)) { + schedReadSelf(task); + taskExit(task, NULL); + } } if (rlen < 1) - schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, - (void*) cli->cli_func, cli->cli_fd); - else + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + else { schedEvent(TASK_ROOT(task), io_txPty, cli, cli->cli_pty, AIT_GET_BUF(&cli->cli_buf[0]), rlen); -end: - schedReadSelf(task); + schedReadSelf(task); + } + taskExit(task, NULL); } @@ -203,24 +238,24 @@ io_rxPty(sched_task_t *task) int rlen; sock_cli_t *cli = TASK_ARG(task); - ioUpdTimerSocket(cli, NULL); + ioUpdTimerSocket(cli); rlen = read(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[1]), AIT_LEN(&cli->cli_buf[1])); if (rlen < 1) - schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, - (void*) cli->cli_func, cli->cli_fd); - else + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, NULL, 0); + else { schedEvent(TASK_ROOT(task), io_txNet, cli, cli->cli_fd, AIT_GET_BUF(&cli->cli_buf[1]), rlen); + schedReadSelf(task); + } - schedReadSelf(task); taskExit(task, NULL); } static void * io_bridgeClient(sched_task_t *task) { - int c, rlen; + int c, rlen, pty; pid_t pid; sockaddr_t sa; socklen_t salen = sizeof sa.ss; @@ -264,28 +299,34 @@ io_bridgeClient(sched_task_t *task) AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf)); AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); - switch ((pid = ioForkPTY(&cli->cli_pty, cli->cli_name, sizeof cli->cli_name, + switch ((pid = ioForkPTY(&pty, cli->cli_name, sizeof cli->cli_name, NULL, NULL, NULL))) { case -1: ELIBERR(io); break; case 0: + cli->cli_pty = pty; + ioSetRAWMode(STDIN_FILENO, NULL); + array_Args(cli->cli_cmdline, 0, " \t", &args); argv = array_To(args); array_Destroy(&args); +#if 0 printf("Console %s\n", cli->cli_name); +#endif rlen = execv(*argv, argv); _exit(rlen); break; default: + cli->cli_pty = pty; cli->cli_pid = pid; schedRead(TASK_ROOT(task), io_rxPty, cli, cli->cli_pty, TASK_ARG(task), 0); schedRead(TASK_ROOT(task), io_rxNet, cli, cli->cli_fd, TASK_ARG(task), 0); - ioUpdTimerSocket(cli, NULL); + ioUpdTimerSocket(cli); break; } end: @@ -293,7 +334,62 @@ end: taskExit(task, NULL); } +static void * +io_bridgeClient2Pool(sched_task_t *task) +{ + int c, rlen; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + sock_cli_t *cli = NULL; + sock_t *s = (sock_t*) TASK_ARG(task); + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } + } else { + if ((rlen = recvfrom(TASK_FD(task), + AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), + MSG_PEEK, &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } else + c = TASK_FD(task); + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + goto end; + } else { + memset(cli, 0, sizeof(sock_cli_t)); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + + io_progCheck(s->sock_prog, 42); + + cli->cli_parent = TASK_ARG(task); + cli->cli_fd = c; + cli->cli_pty = io_progAttach(s->sock_prog, 42); + strlcpy(cli->cli_cmdline, TASK_DATA(task), sizeof cli->cli_cmdline); + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf)); + AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); + + schedRead(TASK_ROOT(task), io_rxPty, cli, cli->cli_pty, TASK_ARG(task), 0); + schedRead(TASK_ROOT(task), io_rxNet, cli, cli->cli_fd, TASK_ARG(task), 0); + ioUpdTimerSocket(cli); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + + /* * ioInitSocket() - Init socket and allocate resources * @@ -361,7 +457,12 @@ ioInitSocket(int role, int type, int proto, const char e_free(s); return NULL; } +#ifndef __linux__ if (bind(s->sock_fd, &s->sock_addr.sa, s->sock_addr.sa.sa_len) == -1) { +#else + if (bind(s->sock_fd, &s->sock_addr.sa, s->sock_addr.sa.sa_family == AF_INET ? + sizeof s->sock_addr.sin : sizeof s->sock_addr.sin6) == -1) { +#endif LOGERR; AIT_FREE_VAL(&s->sock_buf); e_free(s); @@ -407,10 +508,10 @@ ioCloseSocket(sock_t ** __restrict s) AIT_FREE_VAL(&cli->cli_buf[0]); if (cli->cli_pid > 0) { - kill(cli->cli_pid, SIGTERM); + kill(cli->cli_pid, SIGKILL); while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { usleep(1000); - kill(cli->cli_pid, SIGTERM); + kill(cli->cli_pid, SIGKILL); } } @@ -458,8 +559,12 @@ ioUpSocket(sock_t * __restrict s, void *arg, int timeo case IO_SOCK_ROLE_CLIENT: memcpy(&s->sock_peer, peer, sizeof s->sock_peer); - if (connect(s->sock_fd, &s->sock_peer.sa, - s->sock_peer.sa.sa_len) == -1) { +#ifndef __linux__ + if (connect(s->sock_fd, &s->sock_peer.sa, s->sock_peer.sa.sa_len) == -1) { +#else + if (connect(s->sock_fd, &s->sock_peer.sa, s->sock_addr.sa.sa_family == AF_INET ? + sizeof s->sock_peer.sin : sizeof s->sock_peer.sin6) == -1) { +#endif LOGERR; return -1; } @@ -487,11 +592,10 @@ ioUpSocket(sock_t * __restrict s, void *arg, int timeo * ioUpdTimerSocket() - Update timeout of socket * * @c = Client socket - * @arg = Optional data argument * return: none */ void -ioUpdTimerSocket(sock_cli_t * __restrict c, void *arg) +ioUpdTimerSocket(sock_cli_t * __restrict c) { sock_t *s; @@ -500,8 +604,11 @@ ioUpdTimerSocket(sock_cli_t * __restrict c, void *arg) else s = c->cli_parent; - schedCancelby(s->sock_root, taskTIMER, CRITERIA_DATLEN, (void*) c->cli_fd, NULL); - schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, arg, c->cli_fd); + if (s->sock_prog) + io_progCheck(s->sock_prog, 42); + + schedCancelby(s->sock_root, taskTIMER, CRITERIA_ARG, c, NULL); + schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, NULL, 0); } /* @@ -520,7 +627,7 @@ ioCloseClient(sock_cli_t * __restrict c) else s = c->cli_parent; - return !schedEvent(s->sock_root, io_closeClient, c, 0, NULL, c->cli_fd); + return !schedEvent(s->sock_root, io_closeClient, c, 0, NULL, 0); } /* @@ -540,6 +647,18 @@ ioLoopSocket(sock_t * __restrict s, sched_task_func_t return schedRun(s->sock_root, &s->sock_kill); } +static void * +io_progPurge(sched_task_t *task) +{ + sock_t *s = (sock_t*) TASK_ARG(task); + + io_progVacuum(s->sock_prog, 0); + + schedTimer(TASK_ROOT(task), TASK_FUNC(task), TASK_ARG(task), + s->sock_timeout, TASK_DATA(task), TASK_DATLEN(task)); + taskExit(task, NULL); +} + /* * ioBridgeProg2Socket() - Start socket scheduler and bridge program to socket * @@ -553,6 +672,29 @@ ioBridgeProg2Socket(sock_t * __restrict s, const char if (!s || !prgname || s->sock_kill) return -1; - schedRead(s->sock_root, io_bridgeClient, s, s->sock_fd, (void*) prgname, 0); + if (s->sock_prog) { + schedRead(s->sock_root, io_bridgeClient2Pool, + s, s->sock_fd, (void*) prgname, 0); + schedTimer(s->sock_root, io_progPurge, s, s->sock_timeout, NULL, 0); + } else + schedRead(s->sock_root, io_bridgeClient, + s, s->sock_fd, (void*) prgname, 0); return schedRun(s->sock_root, &s->sock_kill); +} + +/* + * ioSetupProg2Socket() - Setup program pool to socket server + * + * @s = Socket + * @p = Program pool + * return: -1 error or 0 ok + */ +int +ioSetupProg2Socket(sock_t * __restrict s, prog_t * __restrict p) +{ + if (!s) + return -1; + + s->sock_prog = p; + return 0; }