--- libaitio/src/sock.c 2013/08/27 19:24:37 1.2.2.4 +++ libaitio/src/sock.c 2013/11/21 12:54:21 1.4.4.1 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: sock.c,v 1.2.2.4 2013/08/27 19:24:37 misho Exp $ +* $Id: sock.c,v 1.4.4.1 2013/11/21 12:54:21 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -84,6 +84,7 @@ ioInitSocket(int role, int type, int proto, const char return NULL; } else { buflen = buflen ? buflen : BUFSIZ; + buflen = E_ALIGN(buflen, 2); /* align buflen length */ AIT_SET_BUFSIZ(&s->sock_buf, 0, buflen); } @@ -119,6 +120,14 @@ ioInitSocket(int role, int type, int proto, const char return NULL; } + s->sock_root = schedBegin(); + if (!s->sock_root) { + io_SetErr(sched_GetErrno(), "%s", sched_GetError()); + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } + pthread_mutex_init(&s->sock_mtx, NULL); return s; } @@ -150,6 +159,8 @@ ioCloseSocket(sock_t ** __restrict s) AIT_FREE_VAL(&(*s)->sock_buf); + schedEnd(&(*s)->sock_root); + pthread_mutex_destroy(&(*s)->sock_mtx); e_free(*s); *s = NULL; @@ -161,10 +172,11 @@ ioCloseSocket(sock_t ** __restrict s) * * @s = Socket * @arg = Server role = listen backlog queue and Client role = peer address + * @timeout = Socket timeout in sec (default -1 infinit) * return: -1 error or 0 ok */ int -ioUpSocket(sock_t * __restrict s, void *arg) +ioUpSocket(sock_t * __restrict s, void *arg, int timeout) { int ret = 0; sockaddr_t *peer = (sockaddr_t*) arg; @@ -172,6 +184,11 @@ ioUpSocket(sock_t * __restrict s, void *arg) if (!s || !arg) return -1; + else { + s->sock_timeout.tv_sec = timeout; + s->sock_timeout.tv_nsec = (timeout < 1) ? timeout : 0; + schedPolling(s->sock_root, &s->sock_timeout, NULL); + } switch (s->sock_role) { case IO_SOCK_ROLE_CLIENT: @@ -202,13 +219,42 @@ ioUpSocket(sock_t * __restrict s, void *arg) return ret; } +static void * +io_acceptSocket(sched_task_t *task) +{ + schedReadSelf(task); + taskExit(task, NULL); +} + +/* + * ioLoopSocket() - Start socket scheduler + * + * @s = Socket + * return: -1 error or return result from scheduler + */ +int +ioLoopSocket(sock_t * __restrict s) +{ + schedRead(s->sock_root, io_acceptSocket, s, s->sock_fd, NULL, 0); + return schedRun(s->sock_root, &s->sock_kill); +} + static void thrCliClean(void *arg) { sock_cli_t *cli = (sock_cli_t*) arg; + sock_t *s = (sock_t*) cli->cli_parent; - close(cli->cli_fd); + if (s->sock_type == SOCK_STREAM) { + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + } AIT_FREE_VAL(&cli->cli_buf); + + pthread_mutex_lock(&s->sock_mtx); + TAILQ_REMOVE(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + e_free(cli); } @@ -223,7 +269,7 @@ io_thrCliWrapper(void *arg) ret = cli->cli_func(cli); pthread_cleanup_pop(42); - pthread_exit(NULL); + pthread_exit(ret); } /* @@ -251,7 +297,9 @@ ioAcceptSocket(sock_t * __restrict s, sock_cb_t f, voi pfd->events = POLLIN | POLLPRI; do { if (poll(pfd, 1, -1) < 1 || - pfd->revents & (POLLNVAL | POLLHUP || POLLERR)) { + pfd->revents & (POLLNVAL | POLLHUP | POLLERR)) { + if (errno == EINTR) + continue; LOGERR; return -1; } else @@ -274,7 +322,8 @@ ioAcceptSocket(sock_t * __restrict s, sock_cb_t f, voi cli = e_malloc(sizeof(sock_cli_t)); if (!cli) { io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); - close(c); + if (s->sock_type == SOCK_STREAM) + close(c); return -1; } else memset(cli, 0, sizeof(sock_cli_t)); @@ -288,13 +337,16 @@ ioAcceptSocket(sock_t * __restrict s, sock_cb_t f, voi if (pthread_create(&cli->cli_tid, NULL, io_thrCliWrapper, cli) == -1) { LOGERR; - close(c); + if (s->sock_type == SOCK_STREAM) + close(c); AIT_FREE_VAL(&cli->cli_buf); e_free(cli); return -1; } else { pthread_detach(cli->cli_tid); + pthread_mutex_lock(&s->sock_mtx); TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); } } while (42);