--- libaitio/src/sock.c 2013/11/22 08:53:17 1.4.4.8 +++ libaitio/src/sock.c 2013/11/22 13:49:14 1.5 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: sock.c,v 1.4.4.8 2013/11/22 08:53:17 misho Exp $ +* $Id: sock.c,v 1.5 2013/11/22 13:49:14 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -51,14 +51,12 @@ io_closeClient(sched_task_t *task) { sock_cli_t *cli = (sock_cli_t*) TASK_ARG(task); sock_t *s = (sock_t*) cli->cli_parent; - int sock = (int) TASK_DATLEN(task); + int stat, sock = (int) TASK_DATLEN(task); pthread_mutex_lock(&s->sock_mtx); TAILQ_REMOVE(&s->sock_cli, cli, cli_node); pthread_mutex_unlock(&s->sock_mtx); - schedCancelby(s->sock_root, taskTIMER, CRITERIA_DATLEN, - (void*) cli->cli_fd, NULL); schedCancelby(s->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); if (s->sock_type == SOCK_STREAM) { @@ -68,6 +66,14 @@ io_closeClient(sched_task_t *task) AIT_FREE_VAL(&cli->cli_buf[1]); AIT_FREE_VAL(&cli->cli_buf[0]); + if (cli->cli_pid > 0) { + kill(cli->cli_pid, SIGTERM); + while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { + usleep(1000); + kill(cli->cli_pid, SIGTERM); + } + } + e_free(cli); taskExit(task, NULL); } @@ -117,13 +123,101 @@ io_acceptClient(sched_task_t *task) AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf)); schedRead(TASK_ROOT(task), cli->cli_func, cli, cli->cli_fd, TASK_ARG(task), 0); - ioUpdTimerSocket(cli); + ioUpdTimerSocket(cli, NULL); end: schedReadSelf(task); taskExit(task, NULL); } static void * +io_txNet(sched_task_t *task) +{ + int wlen; + sock_cli_t *cli = TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + + ioUpdTimerSocket(cli, NULL); + + if (s->sock_type == SOCK_STREAM) + wlen = send(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task), 0); + else + wlen = sendto(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task), 0, + &cli->cli_addr.sa, cli->cli_addr.sa.sa_len); + if (wlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, + (void*) cli->cli_func, cli->cli_fd); + + taskExit(task, NULL); +} + +static void * +io_txPty(sched_task_t *task) +{ + int wlen; + sock_cli_t *cli = TASK_ARG(task); + + ioUpdTimerSocket(cli, NULL); + + wlen = write(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task)); + if (wlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, + (void*) cli->cli_func, cli->cli_fd); + + taskExit(task, NULL); +} + +static void * +io_rxNet(sched_task_t *task) +{ + int rlen; + sock_cli_t *cli = TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + + ioUpdTimerSocket(cli, NULL); + + if (s->sock_type == SOCK_STREAM) + rlen = recv(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), + AIT_LEN(&cli->cli_buf[0]), 0); + else { + rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), + AIT_LEN(&cli->cli_buf[0]), 0, &sa.sa, &salen); + if (e_addrcmp(&cli->cli_addr, &sa, 42)) + goto end; + } + if (rlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, + (void*) cli->cli_func, cli->cli_fd); + else + schedEvent(TASK_ROOT(task), io_txPty, cli, cli->cli_pty, + AIT_GET_BUF(&cli->cli_buf[0]), rlen); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * +io_rxPty(sched_task_t *task) +{ + int rlen; + sock_cli_t *cli = TASK_ARG(task); + + ioUpdTimerSocket(cli, NULL); + + rlen = read(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[1]), AIT_LEN(&cli->cli_buf[1])); + if (rlen < 1) + schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, + (void*) cli->cli_func, cli->cli_fd); + else + schedEvent(TASK_ROOT(task), io_txNet, cli, cli->cli_fd, + AIT_GET_BUF(&cli->cli_buf[1]), rlen); + + schedReadSelf(task); + taskExit(task, NULL); +} + +static void * io_bridgeClient(sched_task_t *task) { int c, rlen; @@ -180,14 +274,17 @@ io_bridgeClient(sched_task_t *task) argv = array_To(args); array_Destroy(&args); + printf("Console %s\n", cli->cli_name); execv(*argv, argv); break; default: - schedRead(TASK_ROOT(task), io_rxPty, TASK_ARG(task), - cli->cli_pty, NULL, 0); - schedRead(TASK_ROOT(task), io_rxNet, TASK_ARG(task), - cli->cli_fd, NULL, 0); - ioUpdTimerSocket(cli); + cli->cli_pid = pid; + + schedRead(TASK_ROOT(task), io_rxPty, cli, cli->cli_pty, + TASK_ARG(task), 0); + schedRead(TASK_ROOT(task), io_rxNet, cli, cli->cli_fd, + TASK_ARG(task), 0); + ioUpdTimerSocket(cli, NULL); break; } end: @@ -292,14 +389,13 @@ void ioCloseSocket(sock_t ** __restrict s) { sock_cli_t *cli; + int stat; if (s && *s) { pthread_mutex_lock(&(*s)->sock_mtx); while ((cli = TAILQ_FIRST(&(*s)->sock_cli))) { TAILQ_REMOVE(&(*s)->sock_cli, cli, cli_node); - schedCancelby((*s)->sock_root, taskTIMER, CRITERIA_DATLEN, - (void*) cli->cli_fd, NULL); schedCancelby((*s)->sock_root, taskMAX, CRITERIA_ARG, cli, NULL); if ((*s)->sock_type == SOCK_STREAM) { @@ -308,6 +404,15 @@ ioCloseSocket(sock_t ** __restrict s) } AIT_FREE_VAL(&cli->cli_buf[1]); AIT_FREE_VAL(&cli->cli_buf[0]); + + if (cli->cli_pid > 0) { + kill(cli->cli_pid, SIGTERM); + while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) { + usleep(1000); + kill(cli->cli_pid, SIGTERM); + } + } + e_free(cli); } pthread_mutex_unlock(&(*s)->sock_mtx); @@ -381,10 +486,11 @@ ioUpSocket(sock_t * __restrict s, void *arg, int timeo * ioUpdTimerSocket() - Update timeout of socket * * @c = Client socket + * @arg = Optional data argument * return: none */ void -ioUpdTimerSocket(sock_cli_t * __restrict c) +ioUpdTimerSocket(sock_cli_t * __restrict c, void *arg) { sock_t *s; @@ -394,7 +500,7 @@ ioUpdTimerSocket(sock_cli_t * __restrict c) s = c->cli_parent; schedCancelby(s->sock_root, taskTIMER, CRITERIA_DATLEN, (void*) c->cli_fd, NULL); - schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, NULL, c->cli_fd); + schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, arg, c->cli_fd); } /*