File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / curl / docs / examples / ephiperfifo.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Wed Jun 3 10:01:15 2020 UTC (4 years, 10 months ago) by misho
Branches: curl, MAIN
CVS tags: v7_70_0p4, HEAD
curl

    1: /***************************************************************************
    2:  *                                  _   _ ____  _
    3:  *  Project                     ___| | | |  _ \| |
    4:  *                             / __| | | | |_) | |
    5:  *                            | (__| |_| |  _ <| |___
    6:  *                             \___|\___/|_| \_\_____|
    7:  *
    8:  * Copyright (C) 1998 - 2019, Daniel Stenberg, <daniel@haxx.se>, et al.
    9:  *
   10:  * This software is licensed as described in the file COPYING, which
   11:  * you should have received as part of this distribution. The terms
   12:  * are also available at https://curl.haxx.se/docs/copyright.html.
   13:  *
   14:  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
   15:  * copies of the Software, and permit persons to whom the Software is
   16:  * furnished to do so, under the terms of the COPYING file.
   17:  *
   18:  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
   19:  * KIND, either express or implied.
   20:  *
   21:  ***************************************************************************/
   22: /* <DESC>
   23:  * multi socket API usage with epoll and timerfd
   24:  * </DESC>
   25:  */
   26: /* Example application source code using the multi socket interface to
   27:  * download many files at once.
   28:  *
   29:  * This example features the same basic functionality as hiperfifo.c does,
   30:  * but this uses epoll and timerfd instead of libevent.
   31:  *
   32:  * Written by Jeff Pohlmeyer, converted to use epoll by Josh Bialkowski
   33: 
   34: Requires a linux system with epoll
   35: 
   36: When running, the program creates the named pipe "hiper.fifo"
   37: 
   38: Whenever there is input into the fifo, the program reads the input as a list
   39: of URL's and creates some new easy handles to fetch each URL via the
   40: curl_multi "hiper" API.
   41: 
   42: 
   43: Thus, you can try a single URL:
   44:   % echo http://www.yahoo.com > hiper.fifo
   45: 
   46: Or a whole bunch of them:
   47:   % cat my-url-list > hiper.fifo
   48: 
   49: The fifo buffer is handled almost instantly, so you can even add more URL's
   50: while the previous requests are still being downloaded.
   51: 
   52: Note:
   53:   For the sake of simplicity, URL length is limited to 1023 char's !
   54: 
   55: This is purely a demo app, all retrieved data is simply discarded by the write
   56: callback.
   57: 
   58: */
   59: 
   60: #include <errno.h>
   61: #include <fcntl.h>
   62: #include <signal.h>
   63: #include <stdio.h>
   64: #include <stdlib.h>
   65: #include <string.h>
   66: #include <sys/epoll.h>
   67: #include <sys/stat.h>
   68: #include <sys/time.h>
   69: #include <sys/timerfd.h>
   70: #include <sys/types.h>
   71: #include <time.h>
   72: #include <unistd.h>
   73: 
   74: #include <curl/curl.h>
   75: 
   76: #define MSG_OUT stdout /* Send info to stdout, change to stderr if you want */
   77: 
   78: 
   79: /* Global information, common to all connections */
   80: typedef struct _GlobalInfo
   81: {
   82:   int epfd;    /* epoll filedescriptor */
   83:   int tfd;     /* timer filedescriptor */
   84:   int fifofd;  /* fifo filedescriptor */
   85:   CURLM *multi;
   86:   int still_running;
   87:   FILE *input;
   88: } GlobalInfo;
   89: 
   90: 
   91: /* Information associated with a specific easy handle */
   92: typedef struct _ConnInfo
   93: {
   94:   CURL *easy;
   95:   char *url;
   96:   GlobalInfo *global;
   97:   char error[CURL_ERROR_SIZE];
   98: } ConnInfo;
   99: 
  100: 
  101: /* Information associated with a specific socket */
  102: typedef struct _SockInfo
  103: {
  104:   curl_socket_t sockfd;
  105:   CURL *easy;
  106:   int action;
  107:   long timeout;
  108:   GlobalInfo *global;
  109: } SockInfo;
  110: 
  111: #define mycase(code) \
  112:   case code: s = __STRING(code)
  113: 
  114: /* Die if we get a bad CURLMcode somewhere */
  115: static void mcode_or_die(const char *where, CURLMcode code)
  116: {
  117:   if(CURLM_OK != code) {
  118:     const char *s;
  119:     switch(code) {
  120:       mycase(CURLM_BAD_HANDLE); break;
  121:       mycase(CURLM_BAD_EASY_HANDLE); break;
  122:       mycase(CURLM_OUT_OF_MEMORY); break;
  123:       mycase(CURLM_INTERNAL_ERROR); break;
  124:       mycase(CURLM_UNKNOWN_OPTION); break;
  125:       mycase(CURLM_LAST); break;
  126:       default: s = "CURLM_unknown"; break;
  127:       mycase(CURLM_BAD_SOCKET);
  128:       fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
  129:       /* ignore this error */
  130:       return;
  131:     }
  132:     fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
  133:     exit(code);
  134:   }
  135: }
  136: 
  137: static void timer_cb(GlobalInfo* g, int revents);
  138: 
  139: /* Update the timer after curl_multi library does it's thing. Curl will
  140:  * inform us through this callback what it wants the new timeout to be,
  141:  * after it does some work. */
  142: static int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo *g)
  143: {
  144:   struct itimerspec its;
  145: 
  146:   fprintf(MSG_OUT, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms);
  147: 
  148:   if(timeout_ms > 0) {
  149:     its.it_interval.tv_sec = 1;
  150:     its.it_interval.tv_nsec = 0;
  151:     its.it_value.tv_sec = timeout_ms / 1000;
  152:     its.it_value.tv_nsec = (timeout_ms % 1000) * 1000 * 1000;
  153:   }
  154:   else if(timeout_ms == 0) {
  155:     /* libcurl wants us to timeout now, however setting both fields of
  156:      * new_value.it_value to zero disarms the timer. The closest we can
  157:      * do is to schedule the timer to fire in 1 ns. */
  158:     its.it_interval.tv_sec = 1;
  159:     its.it_interval.tv_nsec = 0;
  160:     its.it_value.tv_sec = 0;
  161:     its.it_value.tv_nsec = 1;
  162:   }
  163:   else {
  164:     memset(&its, 0, sizeof(struct itimerspec));
  165:   }
  166: 
  167:   timerfd_settime(g->tfd, /*flags=*/0, &its, NULL);
  168:   return 0;
  169: }
  170: 
  171: 
  172: /* Check for completed transfers, and remove their easy handles */
  173: static void check_multi_info(GlobalInfo *g)
  174: {
  175:   char *eff_url;
  176:   CURLMsg *msg;
  177:   int msgs_left;
  178:   ConnInfo *conn;
  179:   CURL *easy;
  180:   CURLcode res;
  181: 
  182:   fprintf(MSG_OUT, "REMAINING: %d\n", g->still_running);
  183:   while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
  184:     if(msg->msg == CURLMSG_DONE) {
  185:       easy = msg->easy_handle;
  186:       res = msg->data.result;
  187:       curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
  188:       curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
  189:       fprintf(MSG_OUT, "DONE: %s => (%d) %s\n", eff_url, res, conn->error);
  190:       curl_multi_remove_handle(g->multi, easy);
  191:       free(conn->url);
  192:       curl_easy_cleanup(easy);
  193:       free(conn);
  194:     }
  195:   }
  196: }
  197: 
  198: /* Called by libevent when we get action on a multi socket filedescriptor*/
  199: static void event_cb(GlobalInfo *g, int fd, int revents)
  200: {
  201:   CURLMcode rc;
  202:   struct itimerspec its;
  203: 
  204:   int action = ((revents & EPOLLIN) ? CURL_CSELECT_IN : 0) |
  205:                ((revents & EPOLLOUT) ? CURL_CSELECT_OUT : 0);
  206: 
  207:   rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
  208:   mcode_or_die("event_cb: curl_multi_socket_action", rc);
  209: 
  210:   check_multi_info(g);
  211:   if(g->still_running <= 0) {
  212:     fprintf(MSG_OUT, "last transfer done, kill timeout\n");
  213:     memset(&its, 0, sizeof(struct itimerspec));
  214:     timerfd_settime(g->tfd, 0, &its, NULL);
  215:   }
  216: }
  217: 
  218: /* Called by main loop when our timeout expires */
  219: static void timer_cb(GlobalInfo* g, int revents)
  220: {
  221:   CURLMcode rc;
  222:   uint64_t count = 0;
  223:   ssize_t err = 0;
  224: 
  225:   err = read(g->tfd, &count, sizeof(uint64_t));
  226:   if(err == -1) {
  227:     /* Note that we may call the timer callback even if the timerfd isn't
  228:      * readable. It's possible that there are multiple events stored in the
  229:      * epoll buffer (i.e. the timer may have fired multiple times). The
  230:      * event count is cleared after the first call so future events in the
  231:      * epoll buffer will fail to read from the timer. */
  232:     if(errno == EAGAIN) {
  233:       fprintf(MSG_OUT, "EAGAIN on tfd %d\n", g->tfd);
  234:       return;
  235:     }
  236:   }
  237:   if(err != sizeof(uint64_t)) {
  238:     fprintf(stderr, "read(tfd) == %ld", err);
  239:     perror("read(tfd)");
  240:   }
  241: 
  242:   rc = curl_multi_socket_action(g->multi,
  243:                                   CURL_SOCKET_TIMEOUT, 0, &g->still_running);
  244:   mcode_or_die("timer_cb: curl_multi_socket_action", rc);
  245:   check_multi_info(g);
  246: }
  247: 
  248: 
  249: 
  250: /* Clean up the SockInfo structure */
  251: static void remsock(SockInfo *f, GlobalInfo* g)
  252: {
  253:   if(f) {
  254:     if(f->sockfd) {
  255:       if(epoll_ctl(g->epfd, EPOLL_CTL_DEL, f->sockfd, NULL))
  256:         fprintf(stderr, "EPOLL_CTL_DEL failed for fd: %d : %s\n",
  257:                 f->sockfd, strerror(errno));
  258:     }
  259:     free(f);
  260:   }
  261: }
  262: 
  263: 
  264: 
  265: /* Assign information to a SockInfo structure */
  266: static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
  267:                     GlobalInfo *g)
  268: {
  269:   struct epoll_event ev;
  270:   int kind = ((act & CURL_POLL_IN) ? EPOLLIN : 0) |
  271:              ((act & CURL_POLL_OUT) ? EPOLLOUT : 0);
  272: 
  273:   if(f->sockfd) {
  274:     if(epoll_ctl(g->epfd, EPOLL_CTL_DEL, f->sockfd, NULL))
  275:       fprintf(stderr, "EPOLL_CTL_DEL failed for fd: %d : %s\n",
  276:               f->sockfd, strerror(errno));
  277:   }
  278: 
  279:   f->sockfd = s;
  280:   f->action = act;
  281:   f->easy = e;
  282: 
  283:   ev.events = kind;
  284:   ev.data.fd = s;
  285:   if(epoll_ctl(g->epfd, EPOLL_CTL_ADD, s, &ev))
  286:     fprintf(stderr, "EPOLL_CTL_ADD failed for fd: %d : %s\n",
  287:             s, strerror(errno));
  288: }
  289: 
  290: 
  291: 
  292: /* Initialize a new SockInfo structure */
  293: static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
  294: {
  295:   SockInfo *fdp = (SockInfo*)calloc(sizeof(SockInfo), 1);
  296: 
  297:   fdp->global = g;
  298:   setsock(fdp, s, easy, action, g);
  299:   curl_multi_assign(g->multi, s, fdp);
  300: }
  301: 
  302: /* CURLMOPT_SOCKETFUNCTION */
  303: static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
  304: {
  305:   GlobalInfo *g = (GlobalInfo*) cbp;
  306:   SockInfo *fdp = (SockInfo*) sockp;
  307:   const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
  308: 
  309:   fprintf(MSG_OUT,
  310:           "socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
  311:   if(what == CURL_POLL_REMOVE) {
  312:     fprintf(MSG_OUT, "\n");
  313:     remsock(fdp, g);
  314:   }
  315:   else {
  316:     if(!fdp) {
  317:       fprintf(MSG_OUT, "Adding data: %s\n", whatstr[what]);
  318:       addsock(s, e, what, g);
  319:     }
  320:     else {
  321:       fprintf(MSG_OUT,
  322:               "Changing action from %s to %s\n",
  323:               whatstr[fdp->action], whatstr[what]);
  324:       setsock(fdp, s, e, what, g);
  325:     }
  326:   }
  327:   return 0;
  328: }
  329: 
  330: 
  331: 
  332: /* CURLOPT_WRITEFUNCTION */
  333: static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
  334: {
  335:   (void)ptr;
  336:   (void)data;
  337:   return size * nmemb;
  338: }
  339: 
  340: 
  341: /* CURLOPT_PROGRESSFUNCTION */
  342: static int prog_cb(void *p, double dltotal, double dlnow, double ult,
  343:                    double uln)
  344: {
  345:   ConnInfo *conn = (ConnInfo *)p;
  346:   (void)ult;
  347:   (void)uln;
  348: 
  349:   fprintf(MSG_OUT, "Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
  350:   return 0;
  351: }
  352: 
  353: 
  354: /* Create a new easy handle, and add it to the global curl_multi */
  355: static void new_conn(char *url, GlobalInfo *g)
  356: {
  357:   ConnInfo *conn;
  358:   CURLMcode rc;
  359: 
  360:   conn = (ConnInfo*)calloc(1, sizeof(ConnInfo));
  361:   conn->error[0]='\0';
  362: 
  363:   conn->easy = curl_easy_init();
  364:   if(!conn->easy) {
  365:     fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
  366:     exit(2);
  367:   }
  368:   conn->global = g;
  369:   conn->url = strdup(url);
  370:   curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
  371:   curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
  372:   curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, conn);
  373:   curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, 1L);
  374:   curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
  375:   curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
  376:   curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, 0L);
  377:   curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
  378:   curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
  379:   curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
  380:   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 3L);
  381:   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 10L);
  382:   fprintf(MSG_OUT,
  383:           "Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
  384:   rc = curl_multi_add_handle(g->multi, conn->easy);
  385:   mcode_or_die("new_conn: curl_multi_add_handle", rc);
  386: 
  387:   /* note that the add_handle() will set a time-out to trigger very soon so
  388:      that the necessary socket_action() call will be called by this app */
  389: }
  390: 
  391: /* This gets called whenever data is received from the fifo */
  392: static void fifo_cb(GlobalInfo* g, int revents)
  393: {
  394:   char s[1024];
  395:   long int rv = 0;
  396:   int n = 0;
  397: 
  398:   do {
  399:     s[0]='\0';
  400:     rv = fscanf(g->input, "%1023s%n", s, &n);
  401:     s[n]='\0';
  402:     if(n && s[0]) {
  403:       new_conn(s, g); /* if we read a URL, go get it! */
  404:     }
  405:     else
  406:       break;
  407:   } while(rv != EOF);
  408: }
  409: 
  410: /* Create a named pipe and tell libevent to monitor it */
  411: static const char *fifo = "hiper.fifo";
  412: static int init_fifo(GlobalInfo *g)
  413: {
  414:   struct stat st;
  415:   curl_socket_t sockfd;
  416:   struct epoll_event epev;
  417: 
  418:   fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo);
  419:   if(lstat (fifo, &st) == 0) {
  420:     if((st.st_mode & S_IFMT) == S_IFREG) {
  421:       errno = EEXIST;
  422:       perror("lstat");
  423:       exit(1);
  424:     }
  425:   }
  426:   unlink(fifo);
  427:   if(mkfifo (fifo, 0600) == -1) {
  428:     perror("mkfifo");
  429:     exit(1);
  430:   }
  431:   sockfd = open(fifo, O_RDWR | O_NONBLOCK, 0);
  432:   if(sockfd == -1) {
  433:     perror("open");
  434:     exit(1);
  435:   }
  436: 
  437:   g->fifofd = sockfd;
  438:   g->input = fdopen(sockfd, "r");
  439: 
  440:   epev.events = EPOLLIN;
  441:   epev.data.fd = sockfd;
  442:   epoll_ctl(g->epfd, EPOLL_CTL_ADD, sockfd, &epev);
  443: 
  444:   fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo);
  445:   return 0;
  446: }
  447: 
  448: static void clean_fifo(GlobalInfo *g)
  449: {
  450:     epoll_ctl(g->epfd, EPOLL_CTL_DEL, g->fifofd, NULL);
  451:     fclose(g->input);
  452:     unlink(fifo);
  453: }
  454: 
  455: 
  456: int g_should_exit_ = 0;
  457: 
  458: void SignalHandler(int signo)
  459: {
  460:   if(signo == SIGINT) {
  461:     g_should_exit_ = 1;
  462:   }
  463: }
  464: 
  465: int main(int argc, char **argv)
  466: {
  467:   GlobalInfo g;
  468:   struct itimerspec its;
  469:   struct epoll_event ev;
  470:   struct epoll_event events[10];
  471:   (void)argc;
  472:   (void)argv;
  473: 
  474:   g_should_exit_ = 0;
  475:   signal(SIGINT, SignalHandler);
  476: 
  477:   memset(&g, 0, sizeof(GlobalInfo));
  478:   g.epfd = epoll_create1(EPOLL_CLOEXEC);
  479:   if(g.epfd == -1) {
  480:     perror("epoll_create1 failed");
  481:     exit(1);
  482:   }
  483: 
  484:   g.tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
  485:   if(g.tfd == -1) {
  486:     perror("timerfd_create failed");
  487:     exit(1);
  488:   }
  489: 
  490:   memset(&its, 0, sizeof(struct itimerspec));
  491:   its.it_interval.tv_sec = 1;
  492:   its.it_value.tv_sec = 1;
  493:   timerfd_settime(g.tfd, 0, &its, NULL);
  494: 
  495:   ev.events = EPOLLIN;
  496:   ev.data.fd = g.tfd;
  497:   epoll_ctl(g.epfd, EPOLL_CTL_ADD, g.tfd, &ev);
  498: 
  499:   init_fifo(&g);
  500:   g.multi = curl_multi_init();
  501: 
  502:   /* setup the generic multi interface options we want */
  503:   curl_multi_setopt(g.multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
  504:   curl_multi_setopt(g.multi, CURLMOPT_SOCKETDATA, &g);
  505:   curl_multi_setopt(g.multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
  506:   curl_multi_setopt(g.multi, CURLMOPT_TIMERDATA, &g);
  507: 
  508:   /* we don't call any curl_multi_socket*() function yet as we have no handles
  509:      added! */
  510: 
  511:   fprintf(MSG_OUT, "Entering wait loop\n");
  512:   fflush(MSG_OUT);
  513:   while(!g_should_exit_) {
  514:     int idx;
  515:     int err = epoll_wait(g.epfd, events,
  516:                          sizeof(events)/sizeof(struct epoll_event), 10000);
  517:     if(err == -1) {
  518:       if(errno == EINTR) {
  519:         fprintf(MSG_OUT, "note: wait interrupted\n");
  520:         continue;
  521:       }
  522:       else {
  523:         perror("epoll_wait");
  524:         exit(1);
  525:       }
  526:     }
  527: 
  528:     for(idx = 0; idx < err; ++idx) {
  529:       if(events[idx].data.fd == g.fifofd) {
  530:         fifo_cb(&g, events[idx].events);
  531:       }
  532:       else if(events[idx].data.fd == g.tfd) {
  533:         timer_cb(&g, events[idx].events);
  534:       }
  535:       else {
  536:         event_cb(&g, events[idx].data.fd, events[idx].events);
  537:       }
  538:     }
  539:   }
  540: 
  541:   fprintf(MSG_OUT, "Exiting normally.\n");
  542:   fflush(MSG_OUT);
  543: 
  544:   curl_multi_cleanup(g.multi);
  545:   clean_fifo(&g);
  546:   return 0;
  547: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>