Annotation of embedaddon/curl/docs/examples/ephiperfifo.c, revision 1.1.1.1

1.1       misho       1: /***************************************************************************
                      2:  *                                  _   _ ____  _
                      3:  *  Project                     ___| | | |  _ \| |
                      4:  *                             / __| | | | |_) | |
                      5:  *                            | (__| |_| |  _ <| |___
                      6:  *                             \___|\___/|_| \_\_____|
                      7:  *
                      8:  * Copyright (C) 1998 - 2019, Daniel Stenberg, <daniel@haxx.se>, et al.
                      9:  *
                     10:  * This software is licensed as described in the file COPYING, which
                     11:  * you should have received as part of this distribution. The terms
                     12:  * are also available at https://curl.haxx.se/docs/copyright.html.
                     13:  *
                     14:  * You may opt to use, copy, modify, merge, publish, distribute and/or sell
                     15:  * copies of the Software, and permit persons to whom the Software is
                     16:  * furnished to do so, under the terms of the COPYING file.
                     17:  *
                     18:  * This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY
                     19:  * KIND, either express or implied.
                     20:  *
                     21:  ***************************************************************************/
                     22: /* <DESC>
                     23:  * multi socket API usage with epoll and timerfd
                     24:  * </DESC>
                     25:  */
                     26: /* Example application source code using the multi socket interface to
                     27:  * download many files at once.
                     28:  *
                     29:  * This example features the same basic functionality as hiperfifo.c does,
                     30:  * but this uses epoll and timerfd instead of libevent.
                     31:  *
                     32:  * Written by Jeff Pohlmeyer, converted to use epoll by Josh Bialkowski
                     33: 
                     34: Requires a Linux system with epoll and timerfd
                     35: 
                     36: When running, the program creates the named pipe "hiper.fifo"
                     37: 
                     38: Whenever there is input into the fifo, the program reads the input as a list
                     39: of URLs and creates some new easy handles to fetch each URL via the
                     40: curl_multi "hiper" API.
                     41: 
                     42: 
                     43: Thus, you can try a single URL:
                     44:   % echo http://www.yahoo.com > hiper.fifo
                     45: 
                     46: Or a whole bunch of them:
                     47:   % cat my-url-list > hiper.fifo
                     48: 
                     49: The fifo buffer is handled almost instantly, so you can even add more URLs
                     50: while the previous requests are still being downloaded.
                     51: 
                     52: Note:
                     53:   For the sake of simplicity, URL length is limited to 1023 characters!
                     54: 
                     55: This is purely a demo app, all retrieved data is simply discarded by the write
                     56: callback.
                     57: 
                     58: */
                     59: 
                     60: #include <errno.h>
                     61: #include <fcntl.h>
                     62: #include <signal.h>
                     63: #include <stdio.h>
                     64: #include <stdlib.h>
                     65: #include <string.h>
                     66: #include <sys/epoll.h>
                     67: #include <sys/stat.h>
                     68: #include <sys/time.h>
                     69: #include <sys/timerfd.h>
                     70: #include <sys/types.h>
                     71: #include <time.h>
                     72: #include <unistd.h>
                     73: 
                     74: #include <curl/curl.h>
                     75: 
                     /* Stream that receives the informational output; switch it to stderr
                      * if you prefer. */
                     #define MSG_OUT stdout
                     77: 
                     78: 
                     79: /* Global information, common to all connections */
                     80: typedef struct _GlobalInfo
                     81: {
                     82:   int epfd;    /* epoll filedescriptor */
                     83:   int tfd;     /* timer filedescriptor */
                     84:   int fifofd;  /* fifo filedescriptor */
                     85:   CURLM *multi;
                     86:   int still_running;
                     87:   FILE *input;
                     88: } GlobalInfo;
                     89: 
                     90: 
                     91: /* Information associated with a specific easy handle */
                     92: typedef struct _ConnInfo
                     93: {
                     94:   CURL *easy;
                     95:   char *url;
                     96:   GlobalInfo *global;
                     97:   char error[CURL_ERROR_SIZE];
                     98: } ConnInfo;
                     99: 
                    100: 
                    101: /* Information associated with a specific socket */
                    102: typedef struct _SockInfo
                    103: {
                    104:   curl_socket_t sockfd;
                    105:   CURL *easy;
                    106:   int action;
                    107:   long timeout;
                    108:   GlobalInfo *global;
                    109: } SockInfo;
                    110: 
                    111: #define mycase(code) \
                    112:   case code: s = __STRING(code)
                    113: 
                    114: /* Die if we get a bad CURLMcode somewhere */
                    115: static void mcode_or_die(const char *where, CURLMcode code)
                    116: {
                    117:   if(CURLM_OK != code) {
                    118:     const char *s;
                    119:     switch(code) {
                    120:       mycase(CURLM_BAD_HANDLE); break;
                    121:       mycase(CURLM_BAD_EASY_HANDLE); break;
                    122:       mycase(CURLM_OUT_OF_MEMORY); break;
                    123:       mycase(CURLM_INTERNAL_ERROR); break;
                    124:       mycase(CURLM_UNKNOWN_OPTION); break;
                    125:       mycase(CURLM_LAST); break;
                    126:       default: s = "CURLM_unknown"; break;
                    127:       mycase(CURLM_BAD_SOCKET);
                    128:       fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
                    129:       /* ignore this error */
                    130:       return;
                    131:     }
                    132:     fprintf(MSG_OUT, "ERROR: %s returns %s\n", where, s);
                    133:     exit(code);
                    134:   }
                    135: }
                    136: 
                    137: static void timer_cb(GlobalInfo* g, int revents);
                    138: 
                    139: /* Update the timer after curl_multi library does it's thing. Curl will
                    140:  * inform us through this callback what it wants the new timeout to be,
                    141:  * after it does some work. */
                    142: static int multi_timer_cb(CURLM *multi, long timeout_ms, GlobalInfo *g)
                    143: {
                    144:   struct itimerspec its;
                    145: 
                    146:   fprintf(MSG_OUT, "multi_timer_cb: Setting timeout to %ld ms\n", timeout_ms);
                    147: 
                    148:   if(timeout_ms > 0) {
                    149:     its.it_interval.tv_sec = 1;
                    150:     its.it_interval.tv_nsec = 0;
                    151:     its.it_value.tv_sec = timeout_ms / 1000;
                    152:     its.it_value.tv_nsec = (timeout_ms % 1000) * 1000 * 1000;
                    153:   }
                    154:   else if(timeout_ms == 0) {
                    155:     /* libcurl wants us to timeout now, however setting both fields of
                    156:      * new_value.it_value to zero disarms the timer. The closest we can
                    157:      * do is to schedule the timer to fire in 1 ns. */
                    158:     its.it_interval.tv_sec = 1;
                    159:     its.it_interval.tv_nsec = 0;
                    160:     its.it_value.tv_sec = 0;
                    161:     its.it_value.tv_nsec = 1;
                    162:   }
                    163:   else {
                    164:     memset(&its, 0, sizeof(struct itimerspec));
                    165:   }
                    166: 
                    167:   timerfd_settime(g->tfd, /*flags=*/0, &its, NULL);
                    168:   return 0;
                    169: }
                    170: 
                    171: 
                    172: /* Check for completed transfers, and remove their easy handles */
                    173: static void check_multi_info(GlobalInfo *g)
                    174: {
                    175:   char *eff_url;
                    176:   CURLMsg *msg;
                    177:   int msgs_left;
                    178:   ConnInfo *conn;
                    179:   CURL *easy;
                    180:   CURLcode res;
                    181: 
                    182:   fprintf(MSG_OUT, "REMAINING: %d\n", g->still_running);
                    183:   while((msg = curl_multi_info_read(g->multi, &msgs_left))) {
                    184:     if(msg->msg == CURLMSG_DONE) {
                    185:       easy = msg->easy_handle;
                    186:       res = msg->data.result;
                    187:       curl_easy_getinfo(easy, CURLINFO_PRIVATE, &conn);
                    188:       curl_easy_getinfo(easy, CURLINFO_EFFECTIVE_URL, &eff_url);
                    189:       fprintf(MSG_OUT, "DONE: %s => (%d) %s\n", eff_url, res, conn->error);
                    190:       curl_multi_remove_handle(g->multi, easy);
                    191:       free(conn->url);
                    192:       curl_easy_cleanup(easy);
                    193:       free(conn);
                    194:     }
                    195:   }
                    196: }
                    197: 
                    198: /* Called by libevent when we get action on a multi socket filedescriptor*/
                    199: static void event_cb(GlobalInfo *g, int fd, int revents)
                    200: {
                    201:   CURLMcode rc;
                    202:   struct itimerspec its;
                    203: 
                    204:   int action = ((revents & EPOLLIN) ? CURL_CSELECT_IN : 0) |
                    205:                ((revents & EPOLLOUT) ? CURL_CSELECT_OUT : 0);
                    206: 
                    207:   rc = curl_multi_socket_action(g->multi, fd, action, &g->still_running);
                    208:   mcode_or_die("event_cb: curl_multi_socket_action", rc);
                    209: 
                    210:   check_multi_info(g);
                    211:   if(g->still_running <= 0) {
                    212:     fprintf(MSG_OUT, "last transfer done, kill timeout\n");
                    213:     memset(&its, 0, sizeof(struct itimerspec));
                    214:     timerfd_settime(g->tfd, 0, &its, NULL);
                    215:   }
                    216: }
                    217: 
                    218: /* Called by main loop when our timeout expires */
                    219: static void timer_cb(GlobalInfo* g, int revents)
                    220: {
                    221:   CURLMcode rc;
                    222:   uint64_t count = 0;
                    223:   ssize_t err = 0;
                    224: 
                    225:   err = read(g->tfd, &count, sizeof(uint64_t));
                    226:   if(err == -1) {
                    227:     /* Note that we may call the timer callback even if the timerfd isn't
                    228:      * readable. It's possible that there are multiple events stored in the
                    229:      * epoll buffer (i.e. the timer may have fired multiple times). The
                    230:      * event count is cleared after the first call so future events in the
                    231:      * epoll buffer will fail to read from the timer. */
                    232:     if(errno == EAGAIN) {
                    233:       fprintf(MSG_OUT, "EAGAIN on tfd %d\n", g->tfd);
                    234:       return;
                    235:     }
                    236:   }
                    237:   if(err != sizeof(uint64_t)) {
                    238:     fprintf(stderr, "read(tfd) == %ld", err);
                    239:     perror("read(tfd)");
                    240:   }
                    241: 
                    242:   rc = curl_multi_socket_action(g->multi,
                    243:                                   CURL_SOCKET_TIMEOUT, 0, &g->still_running);
                    244:   mcode_or_die("timer_cb: curl_multi_socket_action", rc);
                    245:   check_multi_info(g);
                    246: }
                    247: 
                    248: 
                    249: 
                    250: /* Clean up the SockInfo structure */
                    251: static void remsock(SockInfo *f, GlobalInfo* g)
                    252: {
                    253:   if(f) {
                    254:     if(f->sockfd) {
                    255:       if(epoll_ctl(g->epfd, EPOLL_CTL_DEL, f->sockfd, NULL))
                    256:         fprintf(stderr, "EPOLL_CTL_DEL failed for fd: %d : %s\n",
                    257:                 f->sockfd, strerror(errno));
                    258:     }
                    259:     free(f);
                    260:   }
                    261: }
                    262: 
                    263: 
                    264: 
                    265: /* Assign information to a SockInfo structure */
                    266: static void setsock(SockInfo *f, curl_socket_t s, CURL *e, int act,
                    267:                     GlobalInfo *g)
                    268: {
                    269:   struct epoll_event ev;
                    270:   int kind = ((act & CURL_POLL_IN) ? EPOLLIN : 0) |
                    271:              ((act & CURL_POLL_OUT) ? EPOLLOUT : 0);
                    272: 
                    273:   if(f->sockfd) {
                    274:     if(epoll_ctl(g->epfd, EPOLL_CTL_DEL, f->sockfd, NULL))
                    275:       fprintf(stderr, "EPOLL_CTL_DEL failed for fd: %d : %s\n",
                    276:               f->sockfd, strerror(errno));
                    277:   }
                    278: 
                    279:   f->sockfd = s;
                    280:   f->action = act;
                    281:   f->easy = e;
                    282: 
                    283:   ev.events = kind;
                    284:   ev.data.fd = s;
                    285:   if(epoll_ctl(g->epfd, EPOLL_CTL_ADD, s, &ev))
                    286:     fprintf(stderr, "EPOLL_CTL_ADD failed for fd: %d : %s\n",
                    287:             s, strerror(errno));
                    288: }
                    289: 
                    290: 
                    291: 
                    292: /* Initialize a new SockInfo structure */
                    293: static void addsock(curl_socket_t s, CURL *easy, int action, GlobalInfo *g)
                    294: {
                    295:   SockInfo *fdp = (SockInfo*)calloc(sizeof(SockInfo), 1);
                    296: 
                    297:   fdp->global = g;
                    298:   setsock(fdp, s, easy, action, g);
                    299:   curl_multi_assign(g->multi, s, fdp);
                    300: }
                    301: 
                    302: /* CURLMOPT_SOCKETFUNCTION */
                    303: static int sock_cb(CURL *e, curl_socket_t s, int what, void *cbp, void *sockp)
                    304: {
                    305:   GlobalInfo *g = (GlobalInfo*) cbp;
                    306:   SockInfo *fdp = (SockInfo*) sockp;
                    307:   const char *whatstr[]={ "none", "IN", "OUT", "INOUT", "REMOVE" };
                    308: 
                    309:   fprintf(MSG_OUT,
                    310:           "socket callback: s=%d e=%p what=%s ", s, e, whatstr[what]);
                    311:   if(what == CURL_POLL_REMOVE) {
                    312:     fprintf(MSG_OUT, "\n");
                    313:     remsock(fdp, g);
                    314:   }
                    315:   else {
                    316:     if(!fdp) {
                    317:       fprintf(MSG_OUT, "Adding data: %s\n", whatstr[what]);
                    318:       addsock(s, e, what, g);
                    319:     }
                    320:     else {
                    321:       fprintf(MSG_OUT,
                    322:               "Changing action from %s to %s\n",
                    323:               whatstr[fdp->action], whatstr[what]);
                    324:       setsock(fdp, s, e, what, g);
                    325:     }
                    326:   }
                    327:   return 0;
                    328: }
                    329: 
                    330: 
                    331: 
                    /* CURLOPT_WRITEFUNCTION: throw the received data away. Returns the full
                     * byte count (size * nmemb) so the whole chunk counts as consumed. */
                    static size_t write_cb(void *ptr, size_t size, size_t nmemb, void *data)
                    {
                      const size_t consumed = size * nmemb;

                      (void)data;
                      (void)ptr;
                      return consumed;
                    }
                    339: 
                    340: 
                    341: /* CURLOPT_PROGRESSFUNCTION */
                    342: static int prog_cb(void *p, double dltotal, double dlnow, double ult,
                    343:                    double uln)
                    344: {
                    345:   ConnInfo *conn = (ConnInfo *)p;
                    346:   (void)ult;
                    347:   (void)uln;
                    348: 
                    349:   fprintf(MSG_OUT, "Progress: %s (%g/%g)\n", conn->url, dlnow, dltotal);
                    350:   return 0;
                    351: }
                    352: 
                    353: 
                    354: /* Create a new easy handle, and add it to the global curl_multi */
                    355: static void new_conn(char *url, GlobalInfo *g)
                    356: {
                    357:   ConnInfo *conn;
                    358:   CURLMcode rc;
                    359: 
                    360:   conn = (ConnInfo*)calloc(1, sizeof(ConnInfo));
                    361:   conn->error[0]='\0';
                    362: 
                    363:   conn->easy = curl_easy_init();
                    364:   if(!conn->easy) {
                    365:     fprintf(MSG_OUT, "curl_easy_init() failed, exiting!\n");
                    366:     exit(2);
                    367:   }
                    368:   conn->global = g;
                    369:   conn->url = strdup(url);
                    370:   curl_easy_setopt(conn->easy, CURLOPT_URL, conn->url);
                    371:   curl_easy_setopt(conn->easy, CURLOPT_WRITEFUNCTION, write_cb);
                    372:   curl_easy_setopt(conn->easy, CURLOPT_WRITEDATA, conn);
                    373:   curl_easy_setopt(conn->easy, CURLOPT_VERBOSE, 1L);
                    374:   curl_easy_setopt(conn->easy, CURLOPT_ERRORBUFFER, conn->error);
                    375:   curl_easy_setopt(conn->easy, CURLOPT_PRIVATE, conn);
                    376:   curl_easy_setopt(conn->easy, CURLOPT_NOPROGRESS, 0L);
                    377:   curl_easy_setopt(conn->easy, CURLOPT_PROGRESSFUNCTION, prog_cb);
                    378:   curl_easy_setopt(conn->easy, CURLOPT_PROGRESSDATA, conn);
                    379:   curl_easy_setopt(conn->easy, CURLOPT_FOLLOWLOCATION, 1L);
                    380:   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_TIME, 3L);
                    381:   curl_easy_setopt(conn->easy, CURLOPT_LOW_SPEED_LIMIT, 10L);
                    382:   fprintf(MSG_OUT,
                    383:           "Adding easy %p to multi %p (%s)\n", conn->easy, g->multi, url);
                    384:   rc = curl_multi_add_handle(g->multi, conn->easy);
                    385:   mcode_or_die("new_conn: curl_multi_add_handle", rc);
                    386: 
                    387:   /* note that the add_handle() will set a time-out to trigger very soon so
                    388:      that the necessary socket_action() call will be called by this app */
                    389: }
                    390: 
                    391: /* This gets called whenever data is received from the fifo */
                    392: static void fifo_cb(GlobalInfo* g, int revents)
                    393: {
                    394:   char s[1024];
                    395:   long int rv = 0;
                    396:   int n = 0;
                    397: 
                    398:   do {
                    399:     s[0]='\0';
                    400:     rv = fscanf(g->input, "%1023s%n", s, &n);
                    401:     s[n]='\0';
                    402:     if(n && s[0]) {
                    403:       new_conn(s, g); /* if we read a URL, go get it! */
                    404:     }
                    405:     else
                    406:       break;
                    407:   } while(rv != EOF);
                    408: }
                    409: 
                    410: /* Create a named pipe and tell libevent to monitor it */
                    411: static const char *fifo = "hiper.fifo";
                    412: static int init_fifo(GlobalInfo *g)
                    413: {
                    414:   struct stat st;
                    415:   curl_socket_t sockfd;
                    416:   struct epoll_event epev;
                    417: 
                    418:   fprintf(MSG_OUT, "Creating named pipe \"%s\"\n", fifo);
                    419:   if(lstat (fifo, &st) == 0) {
                    420:     if((st.st_mode & S_IFMT) == S_IFREG) {
                    421:       errno = EEXIST;
                    422:       perror("lstat");
                    423:       exit(1);
                    424:     }
                    425:   }
                    426:   unlink(fifo);
                    427:   if(mkfifo (fifo, 0600) == -1) {
                    428:     perror("mkfifo");
                    429:     exit(1);
                    430:   }
                    431:   sockfd = open(fifo, O_RDWR | O_NONBLOCK, 0);
                    432:   if(sockfd == -1) {
                    433:     perror("open");
                    434:     exit(1);
                    435:   }
                    436: 
                    437:   g->fifofd = sockfd;
                    438:   g->input = fdopen(sockfd, "r");
                    439: 
                    440:   epev.events = EPOLLIN;
                    441:   epev.data.fd = sockfd;
                    442:   epoll_ctl(g->epfd, EPOLL_CTL_ADD, sockfd, &epev);
                    443: 
                    444:   fprintf(MSG_OUT, "Now, pipe some URL's into > %s\n", fifo);
                    445:   return 0;
                    446: }
                    447: 
                    448: static void clean_fifo(GlobalInfo *g)
                    449: {
                    450:     epoll_ctl(g->epfd, EPOLL_CTL_DEL, g->fifofd, NULL);
                    451:     fclose(g->input);
                    452:     unlink(fifo);
                    453: }
                    454: 
                    455: 
                    /* Set to 1 by SignalHandler() to ask the main loop to stop. It is
                     * volatile sig_atomic_t because it is written from a signal handler and
                     * read by normal code. */
                    volatile sig_atomic_t g_should_exit_ = 0;

                    /* Signal handler: request termination on SIGINT and ignore every other
                     * signal number. Only stores to the flag, nothing else. */
                    void SignalHandler(int signo)
                    {
                      if(signo == SIGINT) {
                        g_should_exit_ = 1;
                      }
                    }
                    464: 
                    465: int main(int argc, char **argv)
                    466: {
                    467:   GlobalInfo g;
                    468:   struct itimerspec its;
                    469:   struct epoll_event ev;
                    470:   struct epoll_event events[10];
                    471:   (void)argc;
                    472:   (void)argv;
                    473: 
                    474:   g_should_exit_ = 0;
                    475:   signal(SIGINT, SignalHandler);
                    476: 
                    477:   memset(&g, 0, sizeof(GlobalInfo));
                    478:   g.epfd = epoll_create1(EPOLL_CLOEXEC);
                    479:   if(g.epfd == -1) {
                    480:     perror("epoll_create1 failed");
                    481:     exit(1);
                    482:   }
                    483: 
                    484:   g.tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
                    485:   if(g.tfd == -1) {
                    486:     perror("timerfd_create failed");
                    487:     exit(1);
                    488:   }
                    489: 
                    490:   memset(&its, 0, sizeof(struct itimerspec));
                    491:   its.it_interval.tv_sec = 1;
                    492:   its.it_value.tv_sec = 1;
                    493:   timerfd_settime(g.tfd, 0, &its, NULL);
                    494: 
                    495:   ev.events = EPOLLIN;
                    496:   ev.data.fd = g.tfd;
                    497:   epoll_ctl(g.epfd, EPOLL_CTL_ADD, g.tfd, &ev);
                    498: 
                    499:   init_fifo(&g);
                    500:   g.multi = curl_multi_init();
                    501: 
                    502:   /* setup the generic multi interface options we want */
                    503:   curl_multi_setopt(g.multi, CURLMOPT_SOCKETFUNCTION, sock_cb);
                    504:   curl_multi_setopt(g.multi, CURLMOPT_SOCKETDATA, &g);
                    505:   curl_multi_setopt(g.multi, CURLMOPT_TIMERFUNCTION, multi_timer_cb);
                    506:   curl_multi_setopt(g.multi, CURLMOPT_TIMERDATA, &g);
                    507: 
                    508:   /* we don't call any curl_multi_socket*() function yet as we have no handles
                    509:      added! */
                    510: 
                    511:   fprintf(MSG_OUT, "Entering wait loop\n");
                    512:   fflush(MSG_OUT);
                    513:   while(!g_should_exit_) {
                    514:     int idx;
                    515:     int err = epoll_wait(g.epfd, events,
                    516:                          sizeof(events)/sizeof(struct epoll_event), 10000);
                    517:     if(err == -1) {
                    518:       if(errno == EINTR) {
                    519:         fprintf(MSG_OUT, "note: wait interrupted\n");
                    520:         continue;
                    521:       }
                    522:       else {
                    523:         perror("epoll_wait");
                    524:         exit(1);
                    525:       }
                    526:     }
                    527: 
                    528:     for(idx = 0; idx < err; ++idx) {
                    529:       if(events[idx].data.fd == g.fifofd) {
                    530:         fifo_cb(&g, events[idx].events);
                    531:       }
                    532:       else if(events[idx].data.fd == g.tfd) {
                    533:         timer_cb(&g, events[idx].events);
                    534:       }
                    535:       else {
                    536:         event_cb(&g, events[idx].data.fd, events[idx].events);
                    537:       }
                    538:     }
                    539:   }
                    540: 
                    541:   fprintf(MSG_OUT, "Exiting normally.\n");
                    542:   fflush(MSG_OUT);
                    543: 
                    544:   curl_multi_cleanup(g.multi);
                    545:   clean_fifo(&g);
                    546:   return 0;
                    547: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>