--- libaitio/src/sock.c 2013/08/12 23:19:59 1.1.2.2 +++ libaitio/src/sock.c 2013/11/21 15:01:22 1.4.4.3 @@ -3,7 +3,7 @@ * by Michael Pounov * * $Author: misho $ -* $Id: sock.c,v 1.1.2.2 2013/08/12 23:19:59 misho Exp $ +* $Id: sock.c,v 1.4.4.3 2013/11/21 15:01:22 misho Exp $ * ************************************************************************** The ELWIX and AITNET software is distributed under the following @@ -73,11 +73,20 @@ ioInitSocket(int role, int type, int proto, const char } else memset(s, 0, sizeof(sock_t)); + TAILQ_INIT(&s->sock_cli); + s->sock_role = role; s->sock_type = type; s->sock_proto = proto; - e_gethostbyname(addr, port, &s->sock_addr); - AIT_SET_BUFSIZ(&s->sock_buf, 0, buflen ? buflen : BUFSIZ); + if (!e_gethostbyname(addr, port, &s->sock_addr)) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + e_free(s); + return NULL; + } else { + buflen = buflen ? buflen : BUFSIZ; + buflen = E_ALIGN(buflen, 2); /* align buflen length */ + AIT_SET_BUFSIZ(&s->sock_buf, 0, buflen); + } s->sock_fd = socket(s->sock_addr.sa.sa_family, s->sock_type, s->sock_proto); if (s->sock_fd == -1) { @@ -98,6 +107,12 @@ ioInitSocket(int role, int type, int proto, const char e_free(s); return NULL; } + if (setsockopt(s->sock_fd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) { + LOGERR; + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; + } if (bind(s->sock_fd, &s->sock_addr.sa, s->sock_addr.sa.sa_len) == -1) { LOGERR; AIT_FREE_VAL(&s->sock_buf); @@ -105,24 +120,15 @@ ioInitSocket(int role, int type, int proto, const char return NULL; } - switch (s->sock_role) { - case IO_SOCK_TYPE_CLIENT: - break; - case IO_SOCK_TYPE_SERVER: - if (setsockopt(s->sock_fd, SOL_SOCKET, SO_REUSEADDR, &n, sizeof n) == -1) { - LOGERR; - AIT_FREE_VAL(&s->sock_buf); - e_free(s); - return NULL; - } - break; - default: - io_SetErr(EINVAL, "Unsupported socket type"); - AIT_FREE_VAL(&s->sock_buf); - e_free(s); - return NULL; + s->sock_root = schedBegin(); + if (!s->sock_root) { + io_SetErr(sched_GetErrno(), "%s", sched_GetError()); + AIT_FREE_VAL(&s->sock_buf); + e_free(s); + return NULL; } + pthread_mutex_init(&s->sock_mtx, NULL); return s; } @@ -135,11 +141,27 @@ ioInitSocket(int role, int type, int proto, const char void ioCloseSocket(sock_t ** __restrict s) { + sock_cli_t *cli; + if (s && *s) { + pthread_mutex_lock(&(*s)->sock_mtx); + while ((cli = TAILQ_FIRST(&(*s)->sock_cli))) { + TAILQ_REMOVE(&(*s)->sock_cli, cli, cli_node); + shutdown(cli->cli_fd, SHUT_RDWR); + close(cli->cli_fd); + AIT_FREE_VAL(&cli->cli_buf); + e_free(cli); + } + pthread_mutex_unlock(&(*s)->sock_mtx); + shutdown((*s)->sock_fd, SHUT_RDWR); close((*s)->sock_fd); AIT_FREE_VAL(&(*s)->sock_buf); + + schedEnd(&(*s)->sock_root); + + pthread_mutex_destroy(&(*s)->sock_mtx); e_free(*s); *s = NULL; } @@ -150,10 +172,11 @@ ioCloseSocket(sock_t ** __restrict s) * * @s = Socket * @arg = Server role = listen backlog queue and Client role = peer address + * @timeout = Socket timeout in sec (default -1 infinit) * return: -1 error or 0 ok */ int -ioUpSocket(sock_t * __restrict s, void *arg) +ioUpSocket(sock_t * __restrict s, void *arg, int timeout) { int ret = 0; sockaddr_t *peer = (sockaddr_t*) arg; @@ -161,9 +184,14 @@ ioUpSocket(sock_t * __restrict s, void *arg) if (!s || !arg) return -1; + else { + s->sock_timeout.tv_sec = timeout; + s->sock_timeout.tv_nsec = (timeout < 1) ? timeout : 0; + schedPolling(s->sock_root, &s->sock_timeout, NULL); + } switch (s->sock_role) { - case IO_SOCK_TYPE_CLIENT: + case IO_SOCK_ROLE_CLIENT: memcpy(&s->sock_peer, peer, sizeof s->sock_peer); if (connect(s->sock_fd, &s->sock_peer.sa, @@ -172,12 +200,14 @@ ioUpSocket(sock_t * __restrict s, void *arg) return -1; } break; - case IO_SOCK_TYPE_SERVER: - s->sock_backq = backlog; + case IO_SOCK_ROLE_SERVER: + if (s->sock_type == SOCK_STREAM) { + s->sock_backq = backlog; - if (listen(s->sock_fd, s->sock_backq) == -1) { - LOGERR; - return -1; + if (listen(s->sock_fd, s->sock_backq) == -1) { + LOGERR; + return -1; + } } break; default: @@ -187,4 +217,128 @@ ioUpSocket(sock_t * __restrict s, void *arg) fcntl(s->sock_fd, F_SETFL, fcntl(s->sock_fd, F_GETFL) | O_NONBLOCK); return ret; +} + +static void * +io_closeClient(sched_task_t *task) +{ + sock_cli_t *cli = (sock_cli_t*) TASK_ARG(task); + sock_t *s = (sock_t*) cli->cli_parent; + int sock = (int) TASK_DATLEN(task); + + if (s->sock_type == SOCK_STREAM) { + shutdown(sock, SHUT_RDWR); + close(sock); + } + AIT_FREE_VAL(&cli->cli_buf); + + pthread_mutex_lock(&s->sock_mtx); + TAILQ_REMOVE(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + + e_free(cli); + taskExit(task, NULL); +} + +static void * +io_acceptClient(sched_task_t *task) +{ + int c, rlen; + sockaddr_t sa; + socklen_t salen = sizeof sa.ss; + sock_cli_t *cli = NULL; + sock_t *s = (sock_t*) TASK_ARG(task); + + if (s->sock_type == SOCK_STREAM) { + if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } + } else { + if ((rlen = recvfrom(TASK_FD(task), + AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), + MSG_PEEK, &sa.sa, &salen)) == -1) { + LOGERR; + goto end; + } else + c = TASK_FD(task); + } + + cli = e_malloc(sizeof(sock_cli_t)); + if (!cli) { + io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); + if (s->sock_type == SOCK_STREAM) + close(c); + goto end; + } else { + memset(cli, 0, sizeof(sock_cli_t)); + pthread_mutex_lock(&s->sock_mtx); + TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); + pthread_mutex_unlock(&s->sock_mtx); + } + + cli->cli_parent = TASK_ARG(task); + cli->cli_fd = c; + cli->cli_func = TASK_DATA(task); + memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); + AIT_SET_BUFSIZ(&cli->cli_buf, 0, AIT_LEN(&s->sock_buf)); + + schedRead(TASK_ROOT(task), cli->cli_func, cli, cli->cli_fd, TASK_ARG(task), 0); + ioUpdTimerSocket(cli); +end: + schedReadSelf(task); + taskExit(task, NULL); +} + +/* + * ioUpdTimerSocket() - Update timeout of socket + * + * @c = Client socket + * return: none + */ +void +ioUpdTimerSocket(sock_cli_t * __restrict c) +{ + sock_t *s; + + if (!c) + return; + else + s = c->cli_parent; + + schedCancelby(s->sock_root, taskTIMER, CRITERIA_DATLEN, (void*) c->cli_fd, NULL); + schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, NULL, c->cli_fd); +} + +/* + * ioLoopSocket() - Start socket scheduler + * + * @s = Socket + * @rcb = Read callback + * return: -1 error or return result from scheduler + */ +int +ioLoopSocket(sock_t * __restrict s, sched_task_func_t rcb) +{ + schedRead(s->sock_root, io_acceptClient, s, s->sock_fd, rcb, 0); + return schedRun(s->sock_root, &s->sock_kill); +} + +/* + * ioCloseClient() - Close client socket + * + * @c = Client socket + * return: 0 ok or !=0 error + */ +int +ioCloseClient(sock_cli_t * __restrict c) +{ + sock_t *s; + + if (!c) + return -1; + else + s = c->cli_parent; + + return !schedEvent(s->sock_root, io_closeClient, c, 0, NULL, c->cli_fd); }