Diff for /libaitrpc/src/srv.c between versions 1.21.2.5 and 1.23.6.2

version 1.21.2.5, 2013/11/15 09:05:19 version 1.23.6.2, 2014/11/17 23:51:26
Line 12  terms: Line 12  terms:
 All of the documentation and software included in the ELWIX and AITNET  All of the documentation and software included in the ELWIX and AITNET
 Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>  Releases is copyrighted by ELWIX - Sofia/Bulgaria <info@elwix.org>
   
Copyright 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013Copyright 2004 - 2014
         by Michael Pounov <misho@elwix.org>.  All rights reserved.          by Michael Pounov <misho@elwix.org>.  All rights reserved.
   
 Redistribution and use in source and binary forms, with or without  Redistribution and use in source and binary forms, with or without
Line 59  static void *txUDPPacket(sched_task_t *); Line 59  static void *txUDPPacket(sched_task_t *);
   
 /* SOCK_RAW */  /* SOCK_RAW */
   
static sched_task_func_t cbProto[SOCK_RAW + 1][4] = {/* SOCK_BPF */
 static void *rxBPFPacket(sched_task_t *);
 static void *txBPFPacket(sched_task_t *);
 
 static sched_task_func_t cbProto[SOCK_BPF + 1][4] = {
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */          { acceptClients, closeClient, rxPacket, txPacket },     /* SOCK_STREAM */
         { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */          { rxUDPPacket, freeClient, rxUDPPacket, txUDPPacket },  /* SOCK_DGRAM */
        { NULL, NULL, NULL, NULL }                              /* SOCK_RAW */        { NULL, NULL, NULL, NULL },                                /* SOCK_RAW */
         { rxBPFPacket, freeClient, rxBPFPacket, txBPFPacket }   /* SOCK_BPF */
 };  };
   
   /* Global Signal Argument when kqueue support disabled */
   
   static volatile uintptr_t _glSigArg = 0;
   
   
 void  void
 rpc_freeCli(rpc_cli_t * __restrict c)  rpc_freeCli(rpc_cli_t * __restrict c)
 {  {
Line 223  txPacket(sched_task_t *task) Line 232  txPacket(sched_task_t *task)
                         if (ret)                          if (ret)
                                 LOGERR;                                  LOGERR;
                         else                          else
                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond");                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         /* close connection */                          /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);                                          TASK_ARG(task), 0, NULL, 0);
Line 346  rxPacket(sched_task_t *task) Line 355  rxPacket(sched_task_t *task)
                         if (rlen)                          if (rlen)
                                 LOGERR;                                  LOGERR;
                         else                          else
                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond");                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);                                          TASK_ARG(task), 0, NULL, 0);
                         return NULL;                          return NULL;
Line 502  txUDPPacket(sched_task_t *task) Line 511  txUDPPacket(sched_task_t *task)
                         if (ret)                          if (ret)
                                 LOGERR;                                  LOGERR;
                         else                          else
                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond");                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         /* close connection */                          /* close connection */
                         schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT],                           schedEvent(TASK_ROOT(task), cbProto[s->srv_proto][CB_CLOSECLIENT], 
                                         TASK_ARG(task), 0, NULL, 0);                                          TASK_ARG(task), 0, NULL, 0);
Line 575  rxUDPPacket(sched_task_t *task) Line 584  rxUDPPacket(sched_task_t *task)
                         if (rlen)                          if (rlen)
                                 LOGERR;                                  LOGERR;
                         else                          else
                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Server not respond");                                rpc_SetErr(ETIMEDOUT, "Timeout reached! Client not respond");
                         schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT],                           schedEvent(TASK_ROOT(task), cbProto[srv->srv_proto][CB_CLOSECLIENT], 
                                         c, 0, NULL, 0);                                          c, 0, NULL, 0);
                         return NULL;                          return NULL;
Line 760  end: Line 769  end:
 static void *  static void *
 flushBLOB(sched_task_t *task)  flushBLOB(sched_task_t *task)
 {  {
        rpc_srv_t *srv = TASK_ARG(task);        uintptr_t sigArg = atomic_load_acq_ptr(&_glSigArg);
         rpc_srv_t *srv = sigArg ? (void*) sigArg : TASK_ARG(task);
         rpc_blob_t *b, *tmp;          rpc_blob_t *b, *tmp;
   
         TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {          TAILQ_FOREACH_SAFE(b, &srv->srv_blob.blobs, blob_node, tmp) {
Line 770  flushBLOB(sched_task_t *task) Line 780  flushBLOB(sched_task_t *task)
                 e_free(b);                  e_free(b);
         }          }
   
        schedSignalSelf(task);        if (!schedSignalSelf(task)) {
                 /* disabled kqueue support in libaitsched */
                 struct sigaction sa;
 
                 memset(&sa, 0, sizeof sa);
                 sigemptyset(&sa.sa_mask);
                 sa.sa_handler = (void (*)(int)) flushBLOB;
                 sa.sa_flags = SA_RESTART | SA_RESETHAND;
                 sigaction(SIGFBLOB, &sa, NULL);
         }
 
         return NULL;          return NULL;
 }  }
   
Line 984  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv) Line 1004  rpc_srv_loopBLOBServer(rpc_srv_t * __restrict srv)
                 return -1;                  return -1;
         }          }
   
        schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0);        if (!schedSignal(srv->srv_blob.root, flushBLOB, srv, SIGFBLOB, NULL, 0)) {
                 /* disabled kqueue support in libaitsched */
                 struct sigaction sa;
 
                 atomic_store_rel_ptr(&_glSigArg, (uintptr_t) srv);
 
                 memset(&sa, 0, sizeof sa);
                 sigemptyset(&sa.sa_mask);
                 sa.sa_handler = (void (*)(int)) flushBLOB;
                 sa.sa_flags = SA_RESTART | SA_RESETHAND;
                 sigaction(SIGFBLOB, &sa, NULL);
         }
 
         if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv,           if (!schedRead(srv->srv_blob.root, acceptBLOBClients, srv, 
                                 srv->srv_blob.server.cli_sock, NULL, 0)) {                                  srv->srv_blob.server.cli_sock, NULL, 0)) {
                 rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());                  rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
Line 1262  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta Line 1294  rpc_srv_execCall(rpc_cli_t * __restrict cli, struct ta
   
         func = AIT_GET_LIKE(&funcname, rpc_callback_t);          func = AIT_GET_LIKE(&funcname, rpc_callback_t);
         return func(cli, rpc, args);          return func(cli, rpc, args);
   }
   
   
   /*
    * rpc_srv_initServer2() - Init & create layer2 RPC Server
    *
    * @InstID = Instance for authentication & recognition
    * @concurentClients = Concurent clients at same time to this server
    * @netBuf = Network buffer length (min:512 bytes), if =0 == BUFSIZ (also meaning max RPC packet)
    * @csIface = Interface name for bind server, if NULL first interface on host
    * return: NULL == error or !=NULL bind and created RPC server instance
    */
   rpc_srv_t *
   rpc_srv_initServer2(u_char InstID, int concurentClients, int netBuf, const char *csIface)
   {
           int n = 1;
           rpc_srv_t *srv = NULL;
           sockaddr_t sa = E_SOCKADDR_INIT;
           char szIface[64], szStr[STRSIZ];
           register int i;
           struct ifreq ifr;
   
           if (!concurentClients) {
                   rpc_SetErr(EINVAL, "Invalid parameters can`t init RPC server");
                   return NULL;
           }
           if (!csIface) {
                   if (e_get1stiface(szIface, sizeof szIface))
                           return NULL;
           } else
                   strlcpy(szIface, csIface, sizeof szIface);
           if (e_getifacebyname(szIface, &sa))
                   return NULL;
   
   #ifdef HAVE_SRANDOMDEV
           srandomdev();
   #else
           time_t tim;
   
           srandom((time(&tim) ^ getpid()));
   #endif
   
           srv = e_malloc(sizeof(rpc_srv_t));
           if (!srv) {
                   LOGERR;
                   return NULL;
           } else
                   memset(srv, 0, sizeof(rpc_srv_t));
   
           srv->srv_proto = SOCK_BPF;
           srv->srv_netbuf = netBuf;
           srv->srv_session.sess_version = RPC_VERSION;
           srv->srv_session.sess_instance = InstID;
   
           srv->srv_server.cli_parent = srv;
           memcpy(&srv->srv_server.cli_sa, &sa, sizeof srv->srv_server.cli_sa);
   
           /* init functions */
           pthread_mutex_init(&srv->srv_funcs.mtx, NULL);
           SLIST_INIT(&srv->srv_funcs);
           AVL_INIT(&srv->srv_funcs);
   
           /* init scheduler */
           srv->srv_root = schedBegin();
           if (!srv->srv_root) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           /* init pool for clients */
           srv->srv_clients = array_Init(concurentClients);
           if (!srv->srv_clients) {
                   rpc_SetErr(elwix_GetErrno(), "%s", elwix_GetError());
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           /* create server handler */
           for (i = 0; i < 10; i++) {
                   memset(szStr, 0, sizeof szStr);
                   snprintf(szStr, sizeof szStr, "/dev/bpf%d", i);
                   srv->srv_server.cli_sock = open(szStr, O_RDWR);
                   if (srv->srv_server.cli_sock > STDERR_FILENO)
                           break;
           }
           if (srv->srv_server.cli_sock < 3) {
                   LOGERR;
                   array_Destroy(&srv->srv_clients);
                   schedEnd(&srv->srv_root);
                   pthread_mutex_destroy(&srv->srv_funcs.mtx);
                   e_free(srv);
                   return NULL;
           }
   
           if (ioctl(srv->srv_server.cli_sock, BIOCIMMEDIATE, &n) == -1) {
                   LOGERR;
                   goto err;
           }
           n = (netBuf < RPC_MIN_BUFSIZ) ? getpagesize() : E_ALIGN(netBuf, 2);
           if (ioctl(srv->srv_server.cli_sock, BIOCSBLEN, &n) == -1) {
                   LOGERR;
                   goto err;
           } else
                   srv->srv_netbuf = n;
   
           memset(&ifr, 0, sizeof ifr);
           strlcpy(ifr.ifr_name, szIface, sizeof ifr.ifr_name);
           if (ioctl(srv->srv_server.cli_sock, BIOCSETIF, &ifr) == -1) {
                   LOGERR;
                   goto err;
           } else
                   fcntl(srv->srv_server.cli_sock, F_SETFL, 
                                   fcntl(srv->srv_server.cli_sock, F_GETFL) | O_NONBLOCK);
   
           rpc_register_srvPing(srv);
   
           return srv;
   err:    /* error condition */
           close(srv->srv_server.cli_sock);
           array_Destroy(&srv->srv_clients);
           schedEnd(&srv->srv_root);
           pthread_mutex_destroy(&srv->srv_funcs.mtx);
           e_free(srv);
           return NULL;
   }
   
   /*
    * rpc_srv_loopServer2() - Execute Main layer2 server loop and wait for clients requests
    *
    * @srv = RPC Server instance
    * return: -1 error or 0 ok, infinite loop ...
    */
   int
   rpc_srv_loopServer2(rpc_srv_t * __restrict srv)
   {
           rpc_cli_t *c;
           register int i;
           rpc_func_t *f;
           struct timespec ts = { RPC_SCHED_POLLING, 0 };
   
           if (!srv) {
                   rpc_SetErr(EINVAL, "Invalid parameter can`t start RPC server");
                   return -1;
           }
   
           if (!schedRead(srv->srv_root, cbProto[srv->srv_proto][CB_ACCEPTCLIENT], srv, 
                                   srv->srv_server.cli_sock, NULL, 0)) {
                   rpc_SetErr(sched_GetErrno(), "%s", sched_GetError());
                   return -1;
           }
   
           schedPolling(srv->srv_root, &ts, NULL);
           /* main rpc loop */
           schedRun(srv->srv_root, &srv->srv_kill);
   
           /* close all clients connections & server socket */
           for (i = 0; i < array_Size(srv->srv_clients); i++) {
                   c = array(srv->srv_clients, i, rpc_cli_t*);
                   if (c) {
                           schedCancelby(srv->srv_root, taskMAX, CRITERIA_ARG, c, NULL);
                           ait_freeVars(&RPC_RETVARS(c));
                           AIT_FREE_VAL(&c->cli_buf);
                   }
                   array_Del(srv->srv_clients, i, 42);
           }
           array_Destroy(&srv->srv_clients);
   
           close(srv->srv_server.cli_sock);
   
           /* detach exported calls */
           RPC_FUNCS_LOCK(&srv->srv_funcs);
           while ((f = SLIST_FIRST(&srv->srv_funcs))) {
                   SLIST_REMOVE_HEAD(&srv->srv_funcs, func_next);
   
                   AIT_FREE_VAL(&f->func_name);
                   e_free(f);
           }
           srv->srv_funcs.avlh_root = NULL;
           RPC_FUNCS_UNLOCK(&srv->srv_funcs);
   
           return 0;
 }  }

Removed from v.1.21.2.5  
changed lines
  Added in v.1.23.6.2


FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>