--- libaitio/src/sock.c 2013/08/13 00:17:28 1.2 +++ libaitio/src/sock.c 2013/09/02 11:16:27 1.3 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: sock.c,v 1.2 2013/08/13 00:17:28 misho Exp $ +* $Id: sock.c,v 1.3 2013/09/02 11:16:27 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -73,6 +73,8 @@ ioInitSocket(int role, int type, int proto, const char } else memset(s, 0, sizeof(sock_t)); + TAILQ_INIT(&s->sock_cli); + s->sock_role = role; s->sock_type = type; s->sock_proto = proto; @@ -117,6 +119,7 @@ ioInitSocket(int role, int type, int proto, const char return NULL; } + pthread_mutex_init(&s->sock_mtx, NULL); return s; } @@ -129,11 +132,25 @@ ioInitSocket(int role, int type, int proto, const char void ioCloseSocket(sock_t ** __restrict s) { + sock_cli_t *cli; + if (s && *s) { + pthread_mutex_lock(&(*s)->sock_mtx); + while ((cli = TAILQ_FIRST(&(*s)->sock_cli))) { + TAILQ_REMOVE(&(*s)->sock_cli, cli, cli_node); + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + AIT_FREE_VAL(&cli->cli_buf); + e_free(cli); + } + pthread_mutex_unlock(&(*s)->sock_mtx); + shutdown((*s)->sock_fd, SHUT_RDWR); close((*s)->sock_fd); AIT_FREE_VAL(&(*s)->sock_buf); + + pthread_mutex_destroy(&(*s)->sock_mtx); e_free(*s); *s = NULL; } @@ -183,4 +200,116 @@ ioUpSocket(sock_t * __restrict s, void *arg) fcntl(s->sock_fd, F_SETFL, fcntl(s->sock_fd, F_GETFL) | O_NONBLOCK); return ret; +} + +static void +thrCliClean(void *arg) +{ + sock_cli_t *cli = (sock_cli_t*) arg; + sock_t *s = (sock_t*) cli->cli_parent; + + if (s->sock_type == SOCK_STREAM) { + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + } + AIT_FREE_VAL(&cli->cli_buf); + + pthread_mutex_lock(&s->sock_mtx); + TAILQ_REMOVE(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + + e_free(cli); +} + +static void * +io_thrCliWrapper(void *arg) +{ + void *ret; + sock_cli_t *cli = (sock_cli_t*) arg; + + pthread_cleanup_push(thrCliClean, arg); + + ret = cli->cli_func(cli); + + pthread_cleanup_pop(42); + pthread_exit(ret); +} + +/* + * ioAcceptSocket() - Accept clients + * + * @s = Socket + * @f = callback function for client handling + * @arg = optional argument for callback function + * return: -1 error or 0 ok + */ +int +ioAcceptSocket(sock_t * __restrict s, sock_cb_t f, void *arg) +{ + struct pollfd pfd[1]; + socklen_t salen; + sockaddr_t sa; + int c, rlen; + sock_cli_t *cli; + u_char buf[BUFSIZ] = { [0 ... BUFSIZ - 1] = 0 }; + + if (!s || s->sock_role == IO_SOCK_ROLE_CLIENT || !f) + return -1; + + pfd->fd = s->sock_fd; + pfd->events = POLLIN | POLLPRI; + do { + if (poll(pfd, 1, -1) < 1 || + pfd->revents & (POLLNVAL | POLLHUP | POLLERR)) { + LOGERR; + return -1; + } else + salen = sizeof sa.ss; + + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(s->sock_fd, &sa.sa, &salen)) == -1) { + LOGERR; + return -1; + } + } else { + if ((rlen = recvfrom(s->sock_fd, buf, sizeof buf, MSG_PEEK, + &sa.sa, &salen)) == -1) { + LOGERR; + return -1; + } else + c = s->sock_fd; + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + return -1; + } else + memset(cli, 0, sizeof(sock_cli_t)); + + cli->cli_parent = s; + cli->cli_fd = c; + cli->cli_func = f; + cli->cli_arg = arg; + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf, 0, AIT_LEN(&s->sock_buf)); + + if (pthread_create(&cli->cli_tid, NULL, io_thrCliWrapper, cli) == -1) { + LOGERR; + if (s->sock_type == SOCK_STREAM) + close(c); + AIT_FREE_VAL(&cli->cli_buf); + e_free(cli); + return -1; + } else { + pthread_detach(cli->cli_tid); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + } while (42); + + return 0; }