File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / quagga / lib / stream.c
Revision 1.1.1.3 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Sun Jul 21 23:54:39 2013 UTC (11 years, 1 month ago) by misho
Branches: quagga, MAIN
CVS tags: v0_99_22p0, v0_99_22, HEAD
0.99.22

    1:   /*
    2:  * Packet interface
    3:  * Copyright (C) 1999 Kunihiro Ishiguro
    4:  *
    5:  * This file is part of GNU Zebra.
    6:  *
    7:  * GNU Zebra is free software; you can redistribute it and/or modify it
    8:  * under the terms of the GNU General Public License as published by the
    9:  * Free Software Foundation; either version 2, or (at your option) any
   10:  * later version.
   11:  *
   12:  * GNU Zebra is distributed in the hope that it will be useful, but
   13:  * WITHOUT ANY WARRANTY; without even the implied warranty of
   14:  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   15:  * General Public License for more details.
   16:  *
   17:  * You should have received a copy of the GNU General Public License
   18:  * along with GNU Zebra; see the file COPYING.  If not, write to the Free
   19:  * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
   20:  * 02111-1307, USA.  
   21:  */
   22: 
   23: #include <stddef.h>
   24: #include <zebra.h>
   25: 
   26: #include "stream.h"
   27: #include "memory.h"
   28: #include "network.h"
   29: #include "prefix.h"
   30: #include "log.h"
   31: 
   32: /* Tests whether a position is valid */ 
   33: #define GETP_VALID(S,G) \
   34:   ((G) <= (S)->endp)
   35: #define PUT_AT_VALID(S,G) GETP_VALID(S,G)
   36: #define ENDP_VALID(S,E) \
   37:   ((E) <= (S)->size)
   38: 
/* Asserting sanity checks.
 *
 * The following must always be true of the stream elements, both before
 * and after calls to stream functions:
 *
 * getp <= endp <= size
 *
 * Note that after a stream function is called the following may be true:
 * if (getp == endp) then stream is no longer readable
 * if (endp == size) then stream is no longer writeable
 *
 * It is valid to put to anywhere within the size of the stream, but only
 * using stream_put..._at() functions.
 */
   54: #define STREAM_WARN_OFFSETS(S) \
   55:   zlog_warn ("&(struct stream): %p, size: %lu, getp: %lu, endp: %lu\n", \
   56:              (S), \
   57:              (unsigned long) (S)->size, \
   58:              (unsigned long) (S)->getp, \
   59:              (unsigned long) (S)->endp)\
   60: 
   61: #define STREAM_VERIFY_SANE(S) \
   62:   do { \
   63:     if ( !(GETP_VALID(S, (S)->getp)) && ENDP_VALID(S, (S)->endp) ) \
   64:       STREAM_WARN_OFFSETS(S); \
   65:     assert ( GETP_VALID(S, (S)->getp) ); \
   66:     assert ( ENDP_VALID(S, (S)->endp) ); \
   67:   } while (0)
   68: 
   69: #define STREAM_BOUND_WARN(S, WHAT) \
   70:   do { \
   71:     zlog_warn ("%s: Attempt to %s out of bounds", __func__, (WHAT)); \
   72:     STREAM_WARN_OFFSETS(S); \
   73:     assert (0); \
   74:   } while (0)
   75: 
   76: /* XXX: Deprecated macro: do not use */
   77: #define CHECK_SIZE(S, Z) \
   78:   do { \
   79:     if (((S)->endp + (Z)) > (S)->size) \
   80:       { \
   81:         zlog_warn ("CHECK_SIZE: truncating requested size %lu\n", \
   82:                    (unsigned long) (Z)); \
   83:         STREAM_WARN_OFFSETS(S); \
   84:         (Z) = (S)->size - (S)->endp; \
   85:       } \
   86:   } while (0);
   87: 
   88: /* Make stream buffer. */
   89: struct stream *
   90: stream_new (size_t size)
   91: {
   92:   struct stream *s;
   93: 
   94:   assert (size > 0);
   95:   
   96:   if (size == 0)
   97:     {
   98:       zlog_warn ("stream_new(): called with 0 size!");
   99:       return NULL;
  100:     }
  101:   
  102:   s = XCALLOC (MTYPE_STREAM, sizeof (struct stream));
  103: 
  104:   if (s == NULL)
  105:     return s;
  106:   
  107:   if ( (s->data = XMALLOC (MTYPE_STREAM_DATA, size)) == NULL)
  108:     {
  109:       XFREE (MTYPE_STREAM, s);
  110:       return NULL;
  111:     }
  112:   
  113:   s->size = size;
  114:   return s;
  115: }
  116: 
  117: /* Free it now. */
  118: void
  119: stream_free (struct stream *s)
  120: {
  121:   if (!s)
  122:     return;
  123:   
  124:   XFREE (MTYPE_STREAM_DATA, s->data);
  125:   XFREE (MTYPE_STREAM, s);
  126: }
  127: 
  128: struct stream *
  129: stream_copy (struct stream *new, struct stream *src)
  130: {
  131:   STREAM_VERIFY_SANE (src);
  132:   
  133:   assert (new != NULL);
  134:   assert (STREAM_SIZE(new) >= src->endp);
  135: 
  136:   new->endp = src->endp;
  137:   new->getp = src->getp;
  138:   
  139:   memcpy (new->data, src->data, src->endp);
  140:   
  141:   return new;
  142: }
  143: 
  144: struct stream *
  145: stream_dup (struct stream *s)
  146: {
  147:   struct stream *new;
  148: 
  149:   STREAM_VERIFY_SANE (s);
  150: 
  151:   if ( (new = stream_new (s->endp)) == NULL)
  152:     return NULL;
  153: 
  154:   return (stream_copy (new, s));
  155: }
  156: 
  157: size_t
  158: stream_resize (struct stream *s, size_t newsize)
  159: {
  160:   u_char *newdata;
  161:   STREAM_VERIFY_SANE (s);
  162:   
  163:   newdata = XREALLOC (MTYPE_STREAM_DATA, s->data, newsize);
  164:   
  165:   if (newdata == NULL)
  166:     return s->size;
  167:   
  168:   s->data = newdata;
  169:   s->size = newsize;
  170:   
  171:   if (s->endp > s->size)
  172:     s->endp = s->size;
  173:   if (s->getp > s->endp)
  174:     s->getp = s->endp;
  175:   
  176:   STREAM_VERIFY_SANE (s);
  177:   
  178:   return s->size;
  179: }
  180: 
  181: size_t
  182: stream_get_getp (struct stream *s)
  183: {
  184:   STREAM_VERIFY_SANE(s);
  185:   return s->getp;
  186: }
  187: 
  188: size_t
  189: stream_get_endp (struct stream *s)
  190: {
  191:   STREAM_VERIFY_SANE(s);
  192:   return s->endp;
  193: }
  194: 
  195: size_t
  196: stream_get_size (struct stream *s)
  197: {
  198:   STREAM_VERIFY_SANE(s);
  199:   return s->size;
  200: }
  201: 
  202: /* Stream structre' stream pointer related functions.  */
  203: void
  204: stream_set_getp (struct stream *s, size_t pos)
  205: {
  206:   STREAM_VERIFY_SANE(s);
  207:   
  208:   if (!GETP_VALID (s, pos))
  209:     {
  210:       STREAM_BOUND_WARN (s, "set getp");
  211:       pos = s->endp;
  212:     }
  213: 
  214:   s->getp = pos;
  215: }
  216: 
  217: void
  218: stream_set_endp (struct stream *s, size_t pos)
  219: {
  220:   STREAM_VERIFY_SANE(s);
  221: 
  222:   if (!ENDP_VALID(s, pos))
  223:     {
  224:       STREAM_BOUND_WARN (s, "set endp");
  225:       return;
  226:     }
  227: 
  228:   /*
  229:    * Make sure the current read pointer is not beyond the new endp.
  230:    */
  231:   if (s->getp > pos)
  232:     {
  233:       STREAM_BOUND_WARN(s, "set endp");
  234:       return;
  235:     }
  236: 
  237:   s->endp = pos;
  238:   STREAM_VERIFY_SANE(s);
  239: }
  240: 
  241: /* Forward pointer. */
  242: void
  243: stream_forward_getp (struct stream *s, size_t size)
  244: {
  245:   STREAM_VERIFY_SANE(s);
  246:   
  247:   if (!GETP_VALID (s, s->getp + size))
  248:     {
  249:       STREAM_BOUND_WARN (s, "seek getp");
  250:       return;
  251:     }
  252:   
  253:   s->getp += size;
  254: }
  255: 
  256: void
  257: stream_forward_endp (struct stream *s, size_t size)
  258: {
  259:   STREAM_VERIFY_SANE(s);
  260:   
  261:   if (!ENDP_VALID (s, s->endp + size))
  262:     {
  263:       STREAM_BOUND_WARN (s, "seek endp");
  264:       return;
  265:     }
  266:   
  267:   s->endp += size;
  268: }
  269: 
  270: /* Copy from stream to destination. */
  271: void
  272: stream_get (void *dst, struct stream *s, size_t size)
  273: {
  274:   STREAM_VERIFY_SANE(s);
  275:   
  276:   if (STREAM_READABLE(s) < size)
  277:     {
  278:       STREAM_BOUND_WARN (s, "get");
  279:       return;
  280:     }
  281:   
  282:   memcpy (dst, s->data + s->getp, size);
  283:   s->getp += size;
  284: }
  285: 
  286: /* Get next character from the stream. */
  287: u_char
  288: stream_getc (struct stream *s)
  289: {
  290:   u_char c;
  291:   
  292:   STREAM_VERIFY_SANE (s);
  293: 
  294:   if (STREAM_READABLE(s) < sizeof (u_char))
  295:     {
  296:       STREAM_BOUND_WARN (s, "get char");
  297:       return 0;
  298:     }
  299:   c = s->data[s->getp++];
  300:   
  301:   return c;
  302: }
  303: 
  304: /* Get next character from the stream. */
  305: u_char
  306: stream_getc_from (struct stream *s, size_t from)
  307: {
  308:   u_char c;
  309: 
  310:   STREAM_VERIFY_SANE(s);
  311:   
  312:   if (!GETP_VALID (s, from + sizeof (u_char)))
  313:     {
  314:       STREAM_BOUND_WARN (s, "get char");
  315:       return 0;
  316:     }
  317:   
  318:   c = s->data[from];
  319:   
  320:   return c;
  321: }
  322: 
  323: /* Get next word from the stream. */
  324: u_int16_t
  325: stream_getw (struct stream *s)
  326: {
  327:   u_int16_t w;
  328: 
  329:   STREAM_VERIFY_SANE (s);
  330: 
  331:   if (STREAM_READABLE (s) < sizeof (u_int16_t))
  332:     {
  333:       STREAM_BOUND_WARN (s, "get ");
  334:       return 0;
  335:     }
  336:   
  337:   w = s->data[s->getp++] << 8;
  338:   w |= s->data[s->getp++];
  339:   
  340:   return w;
  341: }
  342: 
  343: /* Get next word from the stream. */
  344: u_int16_t
  345: stream_getw_from (struct stream *s, size_t from)
  346: {
  347:   u_int16_t w;
  348: 
  349:   STREAM_VERIFY_SANE(s);
  350:   
  351:   if (!GETP_VALID (s, from + sizeof (u_int16_t)))
  352:     {
  353:       STREAM_BOUND_WARN (s, "get ");
  354:       return 0;
  355:     }
  356:   
  357:   w = s->data[from++] << 8;
  358:   w |= s->data[from];
  359:   
  360:   return w;
  361: }
  362: 
  363: /* Get next long word from the stream. */
  364: u_int32_t
  365: stream_getl_from (struct stream *s, size_t from)
  366: {
  367:   u_int32_t l;
  368: 
  369:   STREAM_VERIFY_SANE(s);
  370:   
  371:   if (!GETP_VALID (s, from + sizeof (u_int32_t)))
  372:     {
  373:       STREAM_BOUND_WARN (s, "get long");
  374:       return 0;
  375:     }
  376:   
  377:   l  = s->data[from++] << 24;
  378:   l |= s->data[from++] << 16;
  379:   l |= s->data[from++] << 8;
  380:   l |= s->data[from];
  381:   
  382:   return l;
  383: }
  384: 
  385: u_int32_t
  386: stream_getl (struct stream *s)
  387: {
  388:   u_int32_t l;
  389: 
  390:   STREAM_VERIFY_SANE(s);
  391:   
  392:   if (STREAM_READABLE (s) < sizeof (u_int32_t))
  393:     {
  394:       STREAM_BOUND_WARN (s, "get long");
  395:       return 0;
  396:     }
  397:   
  398:   l  = s->data[s->getp++] << 24;
  399:   l |= s->data[s->getp++] << 16;
  400:   l |= s->data[s->getp++] << 8;
  401:   l |= s->data[s->getp++];
  402:   
  403:   return l;
  404: }
  405: 
  406: /* Get next quad word from the stream. */
  407: uint64_t
  408: stream_getq_from (struct stream *s, size_t from)
  409: {
  410:   uint64_t q;
  411: 
  412:   STREAM_VERIFY_SANE(s);
  413:   
  414:   if (!GETP_VALID (s, from + sizeof (uint64_t)))
  415:     {
  416:       STREAM_BOUND_WARN (s, "get quad");
  417:       return 0;
  418:     }
  419:   
  420:   q  = ((uint64_t) s->data[from++]) << 56;
  421:   q |= ((uint64_t) s->data[from++]) << 48;
  422:   q |= ((uint64_t) s->data[from++]) << 40;
  423:   q |= ((uint64_t) s->data[from++]) << 32;  
  424:   q |= ((uint64_t) s->data[from++]) << 24;
  425:   q |= ((uint64_t) s->data[from++]) << 16;
  426:   q |= ((uint64_t) s->data[from++]) << 8;
  427:   q |= ((uint64_t) s->data[from++]);
  428:   
  429:   return q;
  430: }
  431: 
  432: uint64_t
  433: stream_getq (struct stream *s)
  434: {
  435:   uint64_t q;
  436: 
  437:   STREAM_VERIFY_SANE(s);
  438:   
  439:   if (STREAM_READABLE (s) < sizeof (uint64_t))
  440:     {
  441:       STREAM_BOUND_WARN (s, "get quad");
  442:       return 0;
  443:     }
  444:   
  445:   q  = ((uint64_t) s->data[s->getp++]) << 56;
  446:   q |= ((uint64_t) s->data[s->getp++]) << 48;
  447:   q |= ((uint64_t) s->data[s->getp++]) << 40;
  448:   q |= ((uint64_t) s->data[s->getp++]) << 32;  
  449:   q |= ((uint64_t) s->data[s->getp++]) << 24;
  450:   q |= ((uint64_t) s->data[s->getp++]) << 16;
  451:   q |= ((uint64_t) s->data[s->getp++]) << 8;
  452:   q |= ((uint64_t) s->data[s->getp++]);
  453:   
  454:   return q;
  455: }
  456: 
  457: /* Get next long word from the stream. */
  458: u_int32_t
  459: stream_get_ipv4 (struct stream *s)
  460: {
  461:   u_int32_t l;
  462: 
  463:   STREAM_VERIFY_SANE(s);
  464:   
  465:   if (STREAM_READABLE (s) < sizeof(u_int32_t))
  466:     {
  467:       STREAM_BOUND_WARN (s, "get ipv4");
  468:       return 0;
  469:     }
  470:   
  471:   memcpy (&l, s->data + s->getp, sizeof(u_int32_t));
  472:   s->getp += sizeof(u_int32_t);
  473: 
  474:   return l;
  475: }
  476: 
  477: /* Copy to source to stream.
  478:  *
  479:  * XXX: This uses CHECK_SIZE and hence has funny semantics -> Size will wrap
  480:  * around. This should be fixed once the stream updates are working.
  481:  *
  482:  * stream_write() is saner
  483:  */
  484: void
  485: stream_put (struct stream *s, const void *src, size_t size)
  486: {
  487: 
  488:   /* XXX: CHECK_SIZE has strange semantics. It should be deprecated */
  489:   CHECK_SIZE(s, size);
  490:   
  491:   STREAM_VERIFY_SANE(s);
  492:   
  493:   if (STREAM_WRITEABLE (s) < size)
  494:     {
  495:       STREAM_BOUND_WARN (s, "put");
  496:       return;
  497:     }
  498:   
  499:   if (src)
  500:     memcpy (s->data + s->endp, src, size);
  501:   else
  502:     memset (s->data + s->endp, 0, size);
  503: 
  504:   s->endp += size;
  505: }
  506: 
  507: /* Put character to the stream. */
  508: int
  509: stream_putc (struct stream *s, u_char c)
  510: {
  511:   STREAM_VERIFY_SANE(s);
  512:   
  513:   if (STREAM_WRITEABLE (s) < sizeof(u_char))
  514:     {
  515:       STREAM_BOUND_WARN (s, "put");
  516:       return 0;
  517:     }
  518:   
  519:   s->data[s->endp++] = c;
  520:   return sizeof (u_char);
  521: }
  522: 
  523: /* Put word to the stream. */
  524: int
  525: stream_putw (struct stream *s, u_int16_t w)
  526: {
  527:   STREAM_VERIFY_SANE (s);
  528: 
  529:   if (STREAM_WRITEABLE (s) < sizeof (u_int16_t))
  530:     {
  531:       STREAM_BOUND_WARN (s, "put");
  532:       return 0;
  533:     }
  534:   
  535:   s->data[s->endp++] = (u_char)(w >>  8);
  536:   s->data[s->endp++] = (u_char) w;
  537: 
  538:   return 2;
  539: }
  540: 
  541: /* Put long word to the stream. */
  542: int
  543: stream_putl (struct stream *s, u_int32_t l)
  544: {
  545:   STREAM_VERIFY_SANE (s);
  546: 
  547:   if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
  548:     {
  549:       STREAM_BOUND_WARN (s, "put");
  550:       return 0;
  551:     }
  552:   
  553:   s->data[s->endp++] = (u_char)(l >> 24);
  554:   s->data[s->endp++] = (u_char)(l >> 16);
  555:   s->data[s->endp++] = (u_char)(l >>  8);
  556:   s->data[s->endp++] = (u_char)l;
  557: 
  558:   return 4;
  559: }
  560: 
  561: /* Put quad word to the stream. */
  562: int
  563: stream_putq (struct stream *s, uint64_t q)
  564: {
  565:   STREAM_VERIFY_SANE (s);
  566: 
  567:   if (STREAM_WRITEABLE (s) < sizeof (uint64_t))
  568:     {
  569:       STREAM_BOUND_WARN (s, "put quad");
  570:       return 0;
  571:     }
  572:   
  573:   s->data[s->endp++] = (u_char)(q >> 56);
  574:   s->data[s->endp++] = (u_char)(q >> 48);
  575:   s->data[s->endp++] = (u_char)(q >> 40);
  576:   s->data[s->endp++] = (u_char)(q >> 32);
  577:   s->data[s->endp++] = (u_char)(q >> 24);
  578:   s->data[s->endp++] = (u_char)(q >> 16);
  579:   s->data[s->endp++] = (u_char)(q >>  8);
  580:   s->data[s->endp++] = (u_char)q;
  581: 
  582:   return 8;
  583: }
  584: 
  585: int
  586: stream_putc_at (struct stream *s, size_t putp, u_char c)
  587: {
  588:   STREAM_VERIFY_SANE(s);
  589:   
  590:   if (!PUT_AT_VALID (s, putp + sizeof (u_char)))
  591:     {
  592:       STREAM_BOUND_WARN (s, "put");
  593:       return 0;
  594:     }
  595:   
  596:   s->data[putp] = c;
  597:   
  598:   return 1;
  599: }
  600: 
  601: int
  602: stream_putw_at (struct stream *s, size_t putp, u_int16_t w)
  603: {
  604:   STREAM_VERIFY_SANE(s);
  605:   
  606:   if (!PUT_AT_VALID (s, putp + sizeof (u_int16_t)))
  607:     {
  608:       STREAM_BOUND_WARN (s, "put");
  609:       return 0;
  610:     }
  611:   
  612:   s->data[putp] = (u_char)(w >>  8);
  613:   s->data[putp + 1] = (u_char) w;
  614:   
  615:   return 2;
  616: }
  617: 
  618: int
  619: stream_putl_at (struct stream *s, size_t putp, u_int32_t l)
  620: {
  621:   STREAM_VERIFY_SANE(s);
  622:   
  623:   if (!PUT_AT_VALID (s, putp + sizeof (u_int32_t)))
  624:     {
  625:       STREAM_BOUND_WARN (s, "put");
  626:       return 0;
  627:     }
  628:   s->data[putp] = (u_char)(l >> 24);
  629:   s->data[putp + 1] = (u_char)(l >> 16);
  630:   s->data[putp + 2] = (u_char)(l >>  8);
  631:   s->data[putp + 3] = (u_char)l;
  632:   
  633:   return 4;
  634: }
  635: 
  636: int
  637: stream_putq_at (struct stream *s, size_t putp, uint64_t q)
  638: {
  639:   STREAM_VERIFY_SANE(s);
  640:   
  641:   if (!PUT_AT_VALID (s, putp + sizeof (uint64_t)))
  642:     {
  643:       STREAM_BOUND_WARN (s, "put");
  644:       return 0;
  645:     }
  646:   s->data[putp] =     (u_char)(q >> 56);
  647:   s->data[putp + 1] = (u_char)(q >> 48);
  648:   s->data[putp + 2] = (u_char)(q >> 40);
  649:   s->data[putp + 3] = (u_char)(q >> 32);
  650:   s->data[putp + 4] = (u_char)(q >> 24);
  651:   s->data[putp + 5] = (u_char)(q >> 16);
  652:   s->data[putp + 6] = (u_char)(q >>  8);
  653:   s->data[putp + 7] = (u_char)q;
  654:   
  655:   return 8;
  656: }
  657: 
  658: /* Put long word to the stream. */
  659: int
  660: stream_put_ipv4 (struct stream *s, u_int32_t l)
  661: {
  662:   STREAM_VERIFY_SANE(s);
  663:   
  664:   if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
  665:     {
  666:       STREAM_BOUND_WARN (s, "put");
  667:       return 0;
  668:     }
  669:   memcpy (s->data + s->endp, &l, sizeof (u_int32_t));
  670:   s->endp += sizeof (u_int32_t);
  671: 
  672:   return sizeof (u_int32_t);
  673: }
  674: 
  675: /* Put long word to the stream. */
  676: int
  677: stream_put_in_addr (struct stream *s, struct in_addr *addr)
  678: {
  679:   STREAM_VERIFY_SANE(s);
  680:   
  681:   if (STREAM_WRITEABLE (s) < sizeof (u_int32_t))
  682:     {
  683:       STREAM_BOUND_WARN (s, "put");
  684:       return 0;
  685:     }
  686: 
  687:   memcpy (s->data + s->endp, addr, sizeof (u_int32_t));
  688:   s->endp += sizeof (u_int32_t);
  689: 
  690:   return sizeof (u_int32_t);
  691: }
  692: 
  693: /* Put prefix by nlri type format. */
  694: int
  695: stream_put_prefix (struct stream *s, struct prefix *p)
  696: {
  697:   size_t psize;
  698:   
  699:   STREAM_VERIFY_SANE(s);
  700:   
  701:   psize = PSIZE (p->prefixlen);
  702:   
  703:   if (STREAM_WRITEABLE (s) < psize)
  704:     {
  705:       STREAM_BOUND_WARN (s, "put");
  706:       return 0;
  707:     }
  708:   
  709:   stream_putc (s, p->prefixlen);
  710:   memcpy (s->data + s->endp, &p->u.prefix, psize);
  711:   s->endp += psize;
  712:   
  713:   return psize;
  714: }
  715: 
  716: /* Read size from fd. */
  717: int
  718: stream_read (struct stream *s, int fd, size_t size)
  719: {
  720:   int nbytes;
  721: 
  722:   STREAM_VERIFY_SANE(s);
  723:   
  724:   if (STREAM_WRITEABLE (s) < size)
  725:     {
  726:       STREAM_BOUND_WARN (s, "put");
  727:       return 0;
  728:     }
  729:   
  730:   nbytes = readn (fd, s->data + s->endp, size);
  731: 
  732:   if (nbytes > 0)
  733:     s->endp += nbytes;
  734:   
  735:   return nbytes;
  736: }
  737: 
  738: /* Read size from fd. */
  739: int
  740: stream_read_unblock (struct stream *s, int fd, size_t size)
  741: {
  742:   int nbytes;
  743:   int val;
  744:   
  745:   STREAM_VERIFY_SANE(s);
  746:   
  747:   if (STREAM_WRITEABLE (s) < size)
  748:     {
  749:       STREAM_BOUND_WARN (s, "put");
  750:       return 0;
  751:     }
  752:   
  753:   val = fcntl (fd, F_GETFL, 0);
  754:   fcntl (fd, F_SETFL, val|O_NONBLOCK);
  755:   nbytes = read (fd, s->data + s->endp, size);
  756:   fcntl (fd, F_SETFL, val);
  757: 
  758:   if (nbytes > 0)
  759:     s->endp += nbytes;
  760:   
  761:   return nbytes;
  762: }
  763: 
  764: ssize_t
  765: stream_read_try(struct stream *s, int fd, size_t size)
  766: {
  767:   ssize_t nbytes;
  768: 
  769:   STREAM_VERIFY_SANE(s);
  770:   
  771:   if (STREAM_WRITEABLE(s) < size)
  772:     {
  773:       STREAM_BOUND_WARN (s, "put");
  774:       /* Fatal (not transient) error, since retrying will not help
  775:          (stream is too small to contain the desired data). */
  776:       return -1;
  777:     }
  778: 
  779:   if ((nbytes = read(fd, s->data + s->endp, size)) >= 0)
  780:     {
  781:       s->endp += nbytes;
  782:       return nbytes;
  783:     }
  784:   /* Error: was it transient (return -2) or fatal (return -1)? */
  785:   if (ERRNO_IO_RETRY(errno))
  786:     return -2;
  787:   zlog_warn("%s: read failed on fd %d: %s", __func__, fd, safe_strerror(errno));
  788:   return -1;
  789: }
  790: 
  791: /* Read up to size bytes into the stream from the fd, using recvmsgfrom
  792:  * whose arguments match the remaining arguments to this function
  793:  */
  794: ssize_t 
  795: stream_recvfrom (struct stream *s, int fd, size_t size, int flags,
  796:                  struct sockaddr *from, socklen_t *fromlen)                     
  797: {
  798:   ssize_t nbytes;
  799: 
  800:   STREAM_VERIFY_SANE(s);
  801:   
  802:   if (STREAM_WRITEABLE(s) < size)
  803:     {
  804:       STREAM_BOUND_WARN (s, "put");
  805:       /* Fatal (not transient) error, since retrying will not help
  806:          (stream is too small to contain the desired data). */
  807:       return -1;
  808:     }
  809: 
  810:   if ((nbytes = recvfrom (fd, s->data + s->endp, size, 
  811:                           flags, from, fromlen)) >= 0)
  812:     {
  813:       s->endp += nbytes;
  814:       return nbytes;
  815:     }
  816:   /* Error: was it transient (return -2) or fatal (return -1)? */
  817:   if (ERRNO_IO_RETRY(errno))
  818:     return -2;
  819:   zlog_warn("%s: read failed on fd %d: %s", __func__, fd, safe_strerror(errno));
  820:   return -1;
  821: }
  822: 
  823: /* Read up to smaller of size or SIZE_REMAIN() bytes to the stream, starting
  824:  * from endp.
  825:  * First iovec will be used to receive the data.
  826:  * Stream need not be empty.
  827:  */
  828: ssize_t
  829: stream_recvmsg (struct stream *s, int fd, struct msghdr *msgh, int flags, 
  830:                 size_t size)
  831: {
  832:   int nbytes;
  833:   struct iovec *iov;
  834:   
  835:   STREAM_VERIFY_SANE(s);
  836:   assert (msgh->msg_iovlen > 0);  
  837:   
  838:   if (STREAM_WRITEABLE (s) < size)
  839:     {
  840:       STREAM_BOUND_WARN (s, "put");
  841:       /* This is a logic error in the calling code: the stream is too small
  842:          to hold the desired data! */
  843:       return -1;
  844:     }
  845:   
  846:   iov = &(msgh->msg_iov[0]);
  847:   iov->iov_base = (s->data + s->endp);
  848:   iov->iov_len = size;
  849:   
  850:   nbytes = recvmsg (fd, msgh, flags);
  851:   
  852:   if (nbytes > 0)
  853:     s->endp += nbytes;
  854:   
  855:   return nbytes;
  856: }
  857:   
  858: /* Write data to buffer. */
  859: size_t
  860: stream_write (struct stream *s, const void *ptr, size_t size)
  861: {
  862: 
  863:   CHECK_SIZE(s, size);
  864: 
  865:   STREAM_VERIFY_SANE(s);
  866:   
  867:   if (STREAM_WRITEABLE (s) < size)
  868:     {
  869:       STREAM_BOUND_WARN (s, "put");
  870:       return 0;
  871:     }
  872:   
  873:   memcpy (s->data + s->endp, ptr, size);
  874:   s->endp += size;
  875: 
  876:   return size;
  877: }
  878: 
  879: /* Return current read pointer. 
  880:  * DEPRECATED!
  881:  * Use stream_get_pnt_to if you must, but decoding streams properly
  882:  * is preferred
  883:  */
  884: u_char *
  885: stream_pnt (struct stream *s)
  886: {
  887:   STREAM_VERIFY_SANE(s);
  888:   return s->data + s->getp;
  889: }
  890: 
  891: /* Check does this stream empty? */
  892: int
  893: stream_empty (struct stream *s)
  894: {
  895:   STREAM_VERIFY_SANE(s);
  896: 
  897:   return (s->endp == 0);
  898: }
  899: 
  900: /* Reset stream. */
  901: void
  902: stream_reset (struct stream *s)
  903: {
  904:   STREAM_VERIFY_SANE (s);
  905: 
  906:   s->getp = s->endp = 0;
  907: }
  908: 
  909: /* Write stream contens to the file discriptor. */
  910: int
  911: stream_flush (struct stream *s, int fd)
  912: {
  913:   int nbytes;
  914:   
  915:   STREAM_VERIFY_SANE(s);
  916:   
  917:   nbytes = write (fd, s->data + s->getp, s->endp - s->getp);
  918:   
  919:   return nbytes;
  920: }
  921: 
  922: /* Stream first in first out queue. */
  923: 
  924: struct stream_fifo *
  925: stream_fifo_new (void)
  926: {
  927:   struct stream_fifo *new;
  928:  
  929:   new = XCALLOC (MTYPE_STREAM_FIFO, sizeof (struct stream_fifo));
  930:   return new;
  931: }
  932: 
  933: /* Add new stream to fifo. */
  934: void
  935: stream_fifo_push (struct stream_fifo *fifo, struct stream *s)
  936: {
  937:   if (fifo->tail)
  938:     fifo->tail->next = s;
  939:   else
  940:     fifo->head = s;
  941:      
  942:   fifo->tail = s;
  943: 
  944:   fifo->count++;
  945: }
  946: 
  947: /* Delete first stream from fifo. */
  948: struct stream *
  949: stream_fifo_pop (struct stream_fifo *fifo)
  950: {
  951:   struct stream *s;
  952:   
  953:   s = fifo->head; 
  954: 
  955:   if (s)
  956:     { 
  957:       fifo->head = s->next;
  958: 
  959:       if (fifo->head == NULL)
  960: 	fifo->tail = NULL;
  961: 
  962:       fifo->count--;
  963:     }
  964: 
  965:   return s; 
  966: }
  967: 
  968: /* Return first fifo entry. */
  969: struct stream *
  970: stream_fifo_head (struct stream_fifo *fifo)
  971: {
  972:   return fifo->head;
  973: }
  974: 
  975: void
  976: stream_fifo_clean (struct stream_fifo *fifo)
  977: {
  978:   struct stream *s;
  979:   struct stream *next;
  980: 
  981:   for (s = fifo->head; s; s = next)
  982:     {
  983:       next = s->next;
  984:       stream_free (s);
  985:     }
  986:   fifo->head = fifo->tail = NULL;
  987:   fifo->count = 0;
  988: }
  989: 
  990: void
  991: stream_fifo_free (struct stream_fifo *fifo)
  992: {
  993:   stream_fifo_clean (fifo);
  994:   XFREE (MTYPE_STREAM_FIFO, fifo);
  995: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>