Diff for /libaitio/src/sock.c between versions 1.4 and 1.5

version 1.4, 2013/09/02 11:56:32 version 1.5, 2013/11/22 13:49:14
Line 46  SUCH DAMAGE. Line 46  SUCH DAMAGE.
 #include "global.h"  #include "global.h"
   
   
   static void *
   io_closeClient(sched_task_t *task)
   {
           sock_cli_t *cli = (sock_cli_t*) TASK_ARG(task);
           sock_t *s = (sock_t*) cli->cli_parent;
           int stat, sock = (int) TASK_DATLEN(task);
   
           pthread_mutex_lock(&s->sock_mtx);
           TAILQ_REMOVE(&s->sock_cli, cli, cli_node);
           pthread_mutex_unlock(&s->sock_mtx);
   
           schedCancelby(s->sock_root, taskMAX, CRITERIA_ARG, cli, NULL);
   
           if (s->sock_type == SOCK_STREAM) {
                   shutdown(sock, SHUT_RDWR);
                   close(sock);
           }
           AIT_FREE_VAL(&cli->cli_buf[1]);
           AIT_FREE_VAL(&cli->cli_buf[0]);
   
           if (cli->cli_pid > 0) {
                   kill(cli->cli_pid, SIGTERM);
                   while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) {
                           usleep(1000);
                           kill(cli->cli_pid, SIGTERM);
                   }
           }
   
           e_free(cli);
           taskExit(task, NULL);
   }
   
   static void *
   io_acceptClient(sched_task_t *task)
   {
           int c, rlen;
           sockaddr_t sa;
           socklen_t salen = sizeof sa.ss;
           sock_cli_t *cli = NULL;
           sock_t *s = (sock_t*) TASK_ARG(task);
   
           if (s->sock_type == SOCK_STREAM) {
                   if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) {
                           LOGERR;
                           goto end;
                   }
           } else {
                   if ((rlen = recvfrom(TASK_FD(task), 
                                                   AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), 
                                                   MSG_PEEK, &sa.sa, &salen)) == -1) {
                           LOGERR;
                           goto end;
                   } else
                           c = TASK_FD(task);
           }
   
           cli = e_malloc(sizeof(sock_cli_t));
           if (!cli) {
                   io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   if (s->sock_type == SOCK_STREAM)
                           close(c);
                   goto end;
           } else {
                   memset(cli, 0, sizeof(sock_cli_t));
                   pthread_mutex_lock(&s->sock_mtx);
                   TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node);
                   pthread_mutex_unlock(&s->sock_mtx);
           }
   
           cli->cli_parent = TASK_ARG(task);
           cli->cli_fd = c;
           cli->cli_func = TASK_DATA(task);
           memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr);
           AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf));
           AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf));
   
           schedRead(TASK_ROOT(task), cli->cli_func, cli, cli->cli_fd, TASK_ARG(task), 0);
           ioUpdTimerSocket(cli, NULL);
   end:
           schedReadSelf(task);
           taskExit(task, NULL);
   }
   
   static void *
   io_txNet(sched_task_t *task)
   {
           int wlen;
           sock_cli_t *cli = TASK_ARG(task);
           sock_t *s = (sock_t*) cli->cli_parent;
   
           ioUpdTimerSocket(cli, NULL);
   
           if (s->sock_type == SOCK_STREAM)
                   wlen = send(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task), 0);
           else
                   wlen = sendto(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task), 0, 
                                   &cli->cli_addr.sa, cli->cli_addr.sa.sa_len);
           if (wlen < 1)
                   schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, 
                                   (void*) cli->cli_func, cli->cli_fd);
   
           taskExit(task, NULL);
   }
   
   static void *
   io_txPty(sched_task_t *task)
   {
           int wlen;
           sock_cli_t *cli = TASK_ARG(task);
   
           ioUpdTimerSocket(cli, NULL);
   
           wlen = write(TASK_FD(task), TASK_DATA(task), TASK_DATLEN(task));
           if (wlen < 1)
                   schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, 
                                   (void*) cli->cli_func, cli->cli_fd);
   
           taskExit(task, NULL);
   }
   
   static void *
   io_rxNet(sched_task_t *task)
   {
           int rlen;
           sock_cli_t *cli = TASK_ARG(task);
           sock_t *s = (sock_t*) cli->cli_parent;
           sockaddr_t sa;
           socklen_t salen = sizeof sa.ss;
   
           ioUpdTimerSocket(cli, NULL);
   
           if (s->sock_type == SOCK_STREAM)
                   rlen = recv(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), 
                                   AIT_LEN(&cli->cli_buf[0]), 0);
           else {
                   rlen = recvfrom(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[0]), 
                                   AIT_LEN(&cli->cli_buf[0]), 0, &sa.sa, &salen);
                   if (e_addrcmp(&cli->cli_addr, &sa, 42))
                           goto end;
           }
           if (rlen < 1)
                   schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, 
                                   (void*) cli->cli_func, cli->cli_fd);
           else
                   schedEvent(TASK_ROOT(task), io_txPty, cli, cli->cli_pty, 
                                   AIT_GET_BUF(&cli->cli_buf[0]), rlen);
   end:
           schedReadSelf(task);
           taskExit(task, NULL);
   }
   
   static void *
   io_rxPty(sched_task_t *task)
   {
           int rlen;
           sock_cli_t *cli = TASK_ARG(task);
   
           ioUpdTimerSocket(cli, NULL);
   
           rlen = read(TASK_FD(task), AIT_GET_BUF(&cli->cli_buf[1]), AIT_LEN(&cli->cli_buf[1]));
           if (rlen < 1)
                   schedEvent(TASK_ROOT(task), io_closeClient, cli, 0, 
                                   (void*) cli->cli_func, cli->cli_fd);
           else
                   schedEvent(TASK_ROOT(task), io_txNet, cli, cli->cli_fd, 
                                   AIT_GET_BUF(&cli->cli_buf[1]), rlen);
   
           schedReadSelf(task);
           taskExit(task, NULL);
   }
   
   static void *
   io_bridgeClient(sched_task_t *task)
   {
           int c, rlen;
           pid_t pid;
           sockaddr_t sa;
           socklen_t salen = sizeof sa.ss;
           sock_cli_t *cli = NULL;
           sock_t *s = (sock_t*) TASK_ARG(task);
           array_t *args = NULL;
           char **argv = NULL;
   
           if (s->sock_type == SOCK_STREAM) {
                   if ((c = accept(TASK_FD(task), &sa.sa, &salen)) == -1) {
                           LOGERR;
                           goto end;
                   }
           } else {
                   if ((rlen = recvfrom(TASK_FD(task), 
                                                   AIT_GET_BUF(&s->sock_buf), AIT_LEN(&s->sock_buf), 
                                                   MSG_PEEK, &sa.sa, &salen)) == -1) {
                           LOGERR;
                           goto end;
                   } else
                           c = TASK_FD(task);
           }
   
           cli = e_malloc(sizeof(sock_cli_t));
           if (!cli) {
                   io_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   if (s->sock_type == SOCK_STREAM)
                           close(c);
                   goto end;
           } else {
                   memset(cli, 0, sizeof(sock_cli_t));
                   pthread_mutex_lock(&s->sock_mtx);
                   TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node);
                   pthread_mutex_unlock(&s->sock_mtx);
           }
   
           cli->cli_parent = TASK_ARG(task);
           cli->cli_fd = c;
           strlcpy(cli->cli_cmdline, TASK_DATA(task), sizeof cli->cli_cmdline);
           memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr);
           AIT_SET_BUFSIZ(&cli->cli_buf[0], 0, AIT_LEN(&s->sock_buf));
           AIT_SET_BUFSIZ(&cli->cli_buf[1], 0, AIT_LEN(&s->sock_buf));
   
           switch ((pid = ioForkPTY(&cli->cli_pty, cli->cli_name, sizeof cli->cli_name, 
                                   NULL, NULL, NULL))) {
                   case -1:
                           ELIBERR(io);
                           break;
                   case 0:
                           array_Args(cli->cli_cmdline, 0, " \t", &args);
                           argv = array_To(args);
                           array_Destroy(&args);
   
                           printf("Console %s\n", cli->cli_name);
                           execv(*argv, argv);
                           break;
                   default:
                           cli->cli_pid = pid;
   
                           schedRead(TASK_ROOT(task), io_rxPty, cli, cli->cli_pty, 
                                           TASK_ARG(task), 0);
                           schedRead(TASK_ROOT(task), io_rxNet, cli, cli->cli_fd, 
                                           TASK_ARG(task), 0);
                           ioUpdTimerSocket(cli, NULL);
                           break;
           }
   end:
           schedReadSelf(task);
           taskExit(task, NULL);
   }
   
   
 /*  /*
  * ioInitSocket() - Init socket and allocate resources   * ioInitSocket() - Init socket and allocate resources
  *   *
Line 84  ioInitSocket(int role, int type, int proto, const char Line 331  ioInitSocket(int role, int type, int proto, const char
                 return NULL;                  return NULL;
         } else {          } else {
                 buflen = buflen ? buflen : BUFSIZ;                  buflen = buflen ? buflen : BUFSIZ;
                   buflen = E_ALIGN(buflen, 2);    /* align buflen length */
                 AIT_SET_BUFSIZ(&s->sock_buf, 0, buflen);                  AIT_SET_BUFSIZ(&s->sock_buf, 0, buflen);
         }          }
   
Line 119  ioInitSocket(int role, int type, int proto, const char Line 367  ioInitSocket(int role, int type, int proto, const char
                 return NULL;                  return NULL;
         }          }
   
           s->sock_root = schedBegin();
           if (!s->sock_root) {
                   io_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   AIT_FREE_VAL(&s->sock_buf);
                   e_free(s);
                   return NULL;
           }
   
         pthread_mutex_init(&s->sock_mtx, NULL);          pthread_mutex_init(&s->sock_mtx, NULL);
         return s;          return s;
 }  }
Line 132  ioInitSocket(int role, int type, int proto, const char Line 388  ioInitSocket(int role, int type, int proto, const char
 void  void
 ioCloseSocket(sock_t ** __restrict s)  ioCloseSocket(sock_t ** __restrict s)
 {  {
        sock_cli_t      *cli;        sock_cli_t *cli;
         int stat;
   
         if (s && *s) {          if (s && *s) {
                 pthread_mutex_lock(&(*s)->sock_mtx);                  pthread_mutex_lock(&(*s)->sock_mtx);
                 while ((cli = TAILQ_FIRST(&(*s)->sock_cli))) {                  while ((cli = TAILQ_FIRST(&(*s)->sock_cli))) {
                         TAILQ_REMOVE(&(*s)->sock_cli, cli, cli_node);                          TAILQ_REMOVE(&(*s)->sock_cli, cli, cli_node);
                        shutdown(cli->cli_fd, SHUT_RDWR);
                        close(cli->cli_fd);                        schedCancelby((*s)->sock_root, taskMAX, CRITERIA_ARG, cli, NULL);
                        AIT_FREE_VAL(&cli->cli_buf);
                         if ((*s)->sock_type == SOCK_STREAM) {
                                 shutdown(cli->cli_fd, SHUT_RDWR);
                                 close(cli->cli_fd);
                         }
                         AIT_FREE_VAL(&cli->cli_buf[1]);
                         AIT_FREE_VAL(&cli->cli_buf[0]);
 
                         if (cli->cli_pid > 0) {
                                 kill(cli->cli_pid, SIGTERM);
                                 while (waitpid(cli->cli_pid, &stat, WNOHANG) > 0) {
                                         usleep(1000);
                                         kill(cli->cli_pid, SIGTERM);
                                 }
                         }
 
                         e_free(cli);                          e_free(cli);
                 }                  }
                 pthread_mutex_unlock(&(*s)->sock_mtx);                  pthread_mutex_unlock(&(*s)->sock_mtx);
Line 150  ioCloseSocket(sock_t ** __restrict s) Line 422  ioCloseSocket(sock_t ** __restrict s)
   
                 AIT_FREE_VAL(&(*s)->sock_buf);                  AIT_FREE_VAL(&(*s)->sock_buf);
   
                   schedEnd(&(*s)->sock_root);
   
                 pthread_mutex_destroy(&(*s)->sock_mtx);                  pthread_mutex_destroy(&(*s)->sock_mtx);
                 e_free(*s);                  e_free(*s);
                 *s = NULL;                  *s = NULL;
Line 161  ioCloseSocket(sock_t ** __restrict s) Line 435  ioCloseSocket(sock_t ** __restrict s)
  *   *
  * @s = Socket   * @s = Socket
  * @arg = Server role = listen backlog queue and Client role = peer address   * @arg = Server role = listen backlog queue and Client role = peer address
    * @timeout = Socket timeout in sec (default -1 infinit)
  * return: -1 error or 0 ok   * return: -1 error or 0 ok
  */   */
 int  int
ioUpSocket(sock_t * __restrict s, void *arg)ioUpSocket(sock_t * __restrict s, void *arg, int timeout)
 {  {
         int ret = 0;          int ret = 0;
         sockaddr_t *peer = (sockaddr_t*) arg;          sockaddr_t *peer = (sockaddr_t*) arg;
Line 172  ioUpSocket(sock_t * __restrict s, void *arg) Line 447  ioUpSocket(sock_t * __restrict s, void *arg)
   
         if (!s || !arg)          if (!s || !arg)
                 return -1;                  return -1;
           else {
                   s->sock_timeout.tv_sec = timeout;
                   s->sock_timeout.tv_nsec = (timeout < 1) ? timeout : 0;
                   schedPolling(s->sock_root, &s->sock_timeout, NULL);
           }
   
         switch (s->sock_role) {          switch (s->sock_role) {
                 case IO_SOCK_ROLE_CLIENT:                  case IO_SOCK_ROLE_CLIENT:
Line 202  ioUpSocket(sock_t * __restrict s, void *arg) Line 482  ioUpSocket(sock_t * __restrict s, void *arg)
         return ret;          return ret;
 }  }
   
static void/*
thrCliClean(void *arg) * ioUpdTimerSocket() - Update timeout of socket
  *
  * @c = Client socket
  * @arg = Optional data argument
  * return:  none
  */
 void
 ioUpdTimerSocket(sock_cli_t * __restrict c, void *arg)
 {  {
        sock_cli_t *cli = (sock_cli_t*) arg;        sock_t *s;
        sock_t *s = (sock_t*) cli->cli_parent; 
   
        if (s->sock_type == SOCK_STREAM) {        if (!c)
                shutdown(cli->cli_fd, SHUT_RDWR);                return;
                close(cli->cli_fd);        else
        }                s = c->cli_parent;
        AIT_FREE_VAL(&cli->cli_buf); 
   
        pthread_mutex_lock(&s->sock_mtx);        schedCancelby(s->sock_root, taskTIMER, CRITERIA_DATLEN, (void*) c->cli_fd, NULL);
        TAILQ_REMOVE(&s->sock_cli, cli, cli_node);        schedTimer(s->sock_root, io_closeClient, c, s->sock_timeout, arg, c->cli_fd);
        pthread_mutex_unlock(&s->sock_mtx); 
 
        e_free(cli); 
 }  }
   
static void */*
io_thrCliWrapper(void *arg) * ioCloseClient() - Close client socket
  *
  * @c = Client socket
  * return: 0 ok or !=0 error
  */
 int
 ioCloseClient(sock_cli_t * __restrict c)
 {  {
        void *ret;        sock_t *s;
        sock_cli_t *cli = (sock_cli_t*) arg; 
   
        pthread_cleanup_push(thrCliClean, arg);        if (!c)
                 return -1;
         else
                 s = c->cli_parent;
   
        ret = cli->cli_func(cli);        return !schedEvent(s->sock_root, io_closeClient, c, 0, NULL, c->cli_fd);
 
        pthread_cleanup_pop(42); 
        pthread_exit(ret); 
 }  }
   
 /*  /*
 * ioAcceptSocket() - Accept clients * ioLoopSocket() - Start socket scheduler
  *   *
  * @s = Socket   * @s = Socket
 * @f = callback function for client handling * @rcb = Read callback
 * @arg = optional argument for callback function * return: -1 error or return result from scheduler
 * return: -1 error or 0 ok 
  */   */
 int  int
ioAcceptSocket(sock_t * __restrict s, sock_cb_t f, void *arg)ioLoopSocket(sock_t * __restrict s, sched_task_func_t rcb)
 {  {
        struct pollfd pfd[1];        if (!s || !rcb || s->sock_kill)
        socklen_t salen; 
        sockaddr_t sa; 
        int c, rlen; 
        sock_cli_t *cli; 
        u_char buf[BUFSIZ] = { [0 ... BUFSIZ - 1] = 0 }; 
 
        if (!s || s->sock_role == IO_SOCK_ROLE_CLIENT || !f) 
                 return -1;                  return -1;
   
        pfd->fd = s->sock_fd;        schedRead(s->sock_root, io_acceptClient, s, s->sock_fd, rcb, 0);
        pfd->events = POLLIN | POLLPRI;        return schedRun(s->sock_root, &s->sock_kill);
        do {}
                if (poll(pfd, 1, -1) < 1 ||  
                                pfd->revents & (POLLNVAL | POLLHUP | POLLERR)) { 
                        if (errno == EINTR) 
                                continue; 
                        LOGERR; 
                        return -1; 
                } else 
                        salen = sizeof sa.ss; 
   
                if (s->sock_type == SOCK_STREAM) {/*
                        if ((c = accept(s->sock_fd, &sa.sa, &salen)) == -1) { * ioBridgeProg2Socket() - Start socket scheduler and bridge program to socket
                                LOGERR; *
                                return -1; * @s = Socket
                        } * @prgname = Program name
                } else { * return: 0 ok or !=0 error
                        if ((rlen = recvfrom(s->sock_fd, buf, sizeof buf, MSG_PEEK,  */
                                                        &sa.sa, &salen)) == -1) {int
                                LOGERR;ioBridgeProg2Socket(sock_t * __restrict s, const char *prgname)
                                return -1;{
                        } else        if (!s || !prgname || s->sock_kill)
                                c = s->sock_fd;                return -1;
                } 
   
                cli = e_malloc(sizeof(sock_cli_t));        schedRead(s->sock_root, io_bridgeClient, s, s->sock_fd, (void*) prgname, 0);
                if (!cli) {        return schedRun(s->sock_root, &s->sock_kill);
                        io_SetErr(elwix_GetErrno(), "%s", elwix_GetError()); 
                        if (s->sock_type == SOCK_STREAM) 
                                close(c); 
                        return -1; 
                } else 
                        memset(cli, 0, sizeof(sock_cli_t)); 
 
                cli->cli_parent = s; 
                cli->cli_fd = c; 
                cli->cli_func = f; 
                cli->cli_arg = arg; 
                memcpy(&cli->cli_addr, &sa, sizeof cli->cli_addr); 
                AIT_SET_BUFSIZ(&cli->cli_buf, 0, AIT_LEN(&s->sock_buf)); 
 
                if (pthread_create(&cli->cli_tid, NULL, io_thrCliWrapper, cli) == -1) { 
                        LOGERR; 
                        if (s->sock_type == SOCK_STREAM) 
                                close(c); 
                        AIT_FREE_VAL(&cli->cli_buf); 
                        e_free(cli); 
                        return -1; 
                } else { 
                        pthread_detach(cli->cli_tid); 
                        pthread_mutex_lock(&s->sock_mtx); 
                        TAILQ_INSERT_TAIL(&s->sock_cli, cli, cli_node); 
                        pthread_mutex_unlock(&s->sock_mtx); 
                } 
        } while (42); 
 
        return 0; 
 }  }

Removed from v.1.4  
changed lines
  Added in v.1.5


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>