/*
 * Main implementation file for interface to Forwarding Plane Manager.
 *
 * Copyright (C) 2012 by Open Source Routing.
 * Copyright (C) 2012 by Internet Systems Consortium, Inc. ("ISC")
 *
 * This file is part of GNU Zebra.
 *
 * GNU Zebra is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License as published by the
 * Free Software Foundation; either version 2, or (at your option) any
 * later version.
 *
 * GNU Zebra is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Zebra; see the file COPYING. If not, write to the Free
 * Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA
 * 02111-1307, USA.
 */

#include <zebra.h>

#include "log.h"
#include "stream.h"
#include "thread.h"
#include "network.h"
#include "command.h"

#include "zebra/rib.h"

#include "fpm/fpm.h"
#include "zebra_fpm.h"
#include "zebra_fpm_private.h"

/*
 * Interval, in seconds, at which we attempt to connect to the FPM.
 */
#define ZFPM_CONNECT_RETRY_IVL 5

/*
 * Sizes of outgoing and incoming stream buffers for writing/reading
 * FPM messages.
 */
#define ZFPM_OBUF_SIZE (2 * FPM_MAX_MSG_LEN)
#define ZFPM_IBUF_SIZE (FPM_MAX_MSG_LEN)

/*
 * The maximum number of times the FPM socket write callback can call
 * 'write' before it yields.
 */
#define ZFPM_MAX_WRITES_PER_RUN 10

/*
 * Interval over which we collect statistics.
 */
#define ZFPM_STATS_IVL_SECS 10

/*
 * Structure that holds state for iterating over all route_node
 * structures that are candidates for being communicated to the FPM.
 */
typedef struct zfpm_rnodes_iter_t_
{
  rib_tables_iter_t tables_iter;
  route_table_iter_t iter;
} zfpm_rnodes_iter_t;

/*
 * Statistics.
 */
typedef struct zfpm_stats_t_ {
  unsigned long connect_calls;
  unsigned long connect_no_sock;

  unsigned long read_cb_calls;

  unsigned long write_cb_calls;
  unsigned long write_calls;
  unsigned long partial_writes;
  unsigned long max_writes_hit;
  unsigned long t_write_yields;

  unsigned long nop_deletes_skipped;
  unsigned long route_adds;
  unsigned long route_dels;

  unsigned long updates_triggered;
  unsigned long redundant_triggers;
  unsigned long non_fpm_table_triggers;

  unsigned long dests_del_after_update;

  unsigned long t_conn_down_starts;
  unsigned long t_conn_down_dests_processed;
  unsigned long t_conn_down_yields;
  unsigned long t_conn_down_finishes;

  unsigned long t_conn_up_starts;
  unsigned long t_conn_up_dests_processed;
  unsigned long t_conn_up_yields;
  unsigned long t_conn_up_aborts;
  unsigned long t_conn_up_finishes;

} zfpm_stats_t;

/*
 * States for the FPM state machine.
 */
typedef enum {

  /*
   * In this state we are not yet ready to connect to the FPM. This
   * can happen when this module is disabled, or if we're cleaning up
   * after a connection has gone down.
   */
  ZFPM_STATE_IDLE,

  /*
   * Ready to talk to the FPM and periodically trying to connect to
   * it.
   */
  ZFPM_STATE_ACTIVE,

  /*
   * In the middle of bringing up a TCP connection. Specifically,
   * waiting for a connect() call to complete asynchronously.
   */
  ZFPM_STATE_CONNECTING,

  /*
   * TCP connection to the FPM is up.
   */
  ZFPM_STATE_ESTABLISHED

} zfpm_state_t;

/*
 * Globals.
 */
typedef struct zfpm_glob_t_
{

  /*
   * True if the FPM module has been enabled.
   */
  int enabled;

  struct thread_master *master;

  zfpm_state_t state;

  /*
   * Port on which the FPM is running.
   */
  int fpm_port;

  /*
   * List of rib_dest_t structures to be processed. Entries are
   * linked through the fpm_q_entries field embedded in rib_dest_t.
   */
  TAILQ_HEAD (zfpm_dest_q, rib_dest_t_) dest_q;

  /*
   * Stream socket to the FPM.
   */
  int sock;

  /*
   * Buffers for messages to/from the FPM.
   */
  struct stream *obuf;
  struct stream *ibuf;

  /*
   * Threads for I/O.
   */
  struct thread *t_connect;
  struct thread *t_write;
  struct thread *t_read;

  /*
   * Thread to clean up after the TCP connection to the FPM goes down,
   * and the state that belongs to it.
   */
  struct thread *t_conn_down;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_down_state;

  /*
   * Thread to take actions once the TCP conn to the FPM comes up, and
   * the state that belongs to it.
   */
  struct thread *t_conn_up;

  struct {
    zfpm_rnodes_iter_t iter;
  } t_conn_up_state;

  unsigned long connect_calls;
  time_t last_connect_call_time;

  /*
   * Stats from the start of the current statistics interval up to
   * now. These are the counters we typically update in the code.
   */
  zfpm_stats_t stats;

  /*
   * Statistics that were gathered in the last collection interval.
   */
  zfpm_stats_t last_ivl_stats;

  /*
   * Cumulative stats from the last clear to the start of the current
   * statistics interval.
   */
  zfpm_stats_t cumulative_stats;

  /*
   * Stats interval timer.
   */
  struct thread *t_stats;

  /*
   * If non-zero, the last time when statistics were cleared.
   */
  time_t last_stats_clear_time;

} zfpm_glob_t;

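/*
 * All state for this module is kept in a single global instance,
 * accessed through the zfpm_g pointer.
 */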
static zfpm_glob_t zfpm_glob_space;
static zfpm_glob_t *zfpm_g = &zfpm_glob_space;

static int zfpm_read_cb (struct thread *thread);
static int zfpm_write_cb (struct thread *thread);

static void zfpm_set_state (zfpm_state_t state, const char *reason);
static void zfpm_start_connect_timer (const char *reason);
static void zfpm_start_stats_timer (void);

/*
 * zfpm_thread_should_yield
 */
static inline int
zfpm_thread_should_yield (struct thread *t)
{
  return thread_should_yield (t);
}

/*
 * zfpm_state_to_str
 */
static const char *
zfpm_state_to_str (zfpm_state_t state)
{
  switch (state)
    {

    case ZFPM_STATE_IDLE:
      return "idle";

    case ZFPM_STATE_ACTIVE:
      return "active";

    case ZFPM_STATE_CONNECTING:
      return "connecting";

    case ZFPM_STATE_ESTABLISHED:
      return "established";

    default:
      return "unknown";
    }
}

/*
 * zfpm_get_time
 */
static time_t
zfpm_get_time (void)
{
  struct timeval tv;

  if (quagga_gettime (QUAGGA_CLK_MONOTONIC, &tv) < 0)
    zlog_warn ("FPM: quagga_gettime failed!!");

  return tv.tv_sec;
}

/*
 * zfpm_get_elapsed_time
 *
 * Returns the time elapsed (in seconds) since the given time.
 */
static time_t
zfpm_get_elapsed_time (time_t reference)
{
  time_t now;

  now = zfpm_get_time ();

  if (now < reference)
    {
      assert (0);
      return 0;
    }

  return now - reference;
}

/*
 * zfpm_is_table_for_fpm
 *
 * Returns TRUE if the given table is to be communicated to the
 * FPM.
 */
static inline int
zfpm_is_table_for_fpm (struct route_table *table)
{
  rib_table_info_t *info;

  info = rib_table_info (table);

  /*
   * We only send the unicast tables in the main instance to the FPM
   * at this point.
   */
  if (info->zvrf->vrf_id != 0)
    return 0;

  if (info->safi != SAFI_UNICAST)
    return 0;

  return 1;
}

/*
 * zfpm_rnodes_iter_init
 */
static inline void
zfpm_rnodes_iter_init (zfpm_rnodes_iter_t *iter)
{
  memset (iter, 0, sizeof (*iter));
  rib_tables_iter_init (&iter->tables_iter);

  /*
   * This is a hack, but it makes implementing 'next' easier by
   * ensuring that route_table_iter_next() will return NULL the first
   * time we call it.
   */
  route_table_iter_init (&iter->iter, NULL);
  route_table_iter_cleanup (&iter->iter);
}

/*
 * zfpm_rnodes_iter_next
 */
static inline struct route_node *
zfpm_rnodes_iter_next (zfpm_rnodes_iter_t *iter)
{
  struct route_node *rn;
  struct route_table *table;

  while (1)
    {
      rn = route_table_iter_next (&iter->iter);
      if (rn)
        return rn;

      /*
       * We've made our way through this table, go to the next one.
       */
      route_table_iter_cleanup (&iter->iter);

      while ((table = rib_tables_iter_next (&iter->tables_iter)))
        {
          if (zfpm_is_table_for_fpm (table))
            break;
        }

      if (!table)
        return NULL;

      route_table_iter_init (&iter->iter, table);
    }

  return NULL;
}

/*
 * zfpm_rnodes_iter_pause
 */
static inline void
zfpm_rnodes_iter_pause (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_pause (&iter->iter);
}

/*
 * zfpm_rnodes_iter_cleanup
 */
static inline void
zfpm_rnodes_iter_cleanup (zfpm_rnodes_iter_t *iter)
{
  route_table_iter_cleanup (&iter->iter);
  rib_tables_iter_cleanup (&iter->tables_iter);
}

/*
 * zfpm_stats_init
 *
 * Initialize a statistics block.
 */
static inline void
zfpm_stats_init (zfpm_stats_t *stats)
{
  memset (stats, 0, sizeof (*stats));
}

/*
 * zfpm_stats_reset
 */
static inline void
zfpm_stats_reset (zfpm_stats_t *stats)
{
  zfpm_stats_init (stats);
}

/*
 * zfpm_stats_copy
 */
static inline void
zfpm_stats_copy (const zfpm_stats_t *src, zfpm_stats_t *dest)
{
  memcpy (dest, src, sizeof (*dest));
}

/*
 * zfpm_stats_compose
 *
 * Total up the statistics in two stats structures ('s1' and 's2') and
 * return the result in the third argument, 'result'. Note that the
 * pointer 'result' may be the same as 's1' or 's2'.
 *
 * For simplicity, the implementation below assumes that the stats
 * structure is composed entirely of counters. This can easily be
 * changed when necessary.
 */
static void
zfpm_stats_compose (const zfpm_stats_t *s1, const zfpm_stats_t *s2,
                    zfpm_stats_t *result)
{
  const unsigned long *p1, *p2;
  unsigned long *result_p;
  int i, num_counters;

  p1 = (const unsigned long *) s1;
  p2 = (const unsigned long *) s2;
  result_p = (unsigned long *) result;

  num_counters = (sizeof (zfpm_stats_t) / sizeof (unsigned long));

  for (i = 0; i < num_counters; i++)
    {
      result_p[i] = p1[i] + p2[i];
    }
}

/*
 * zfpm_read_on
 */
static inline void
zfpm_read_on (void)
{
  assert (!zfpm_g->t_read);
  assert (zfpm_g->sock >= 0);

  THREAD_READ_ON (zfpm_g->master, zfpm_g->t_read, zfpm_read_cb, 0,
                  zfpm_g->sock);
}

/*
 * zfpm_write_on
 */
static inline void
zfpm_write_on (void)
{
  assert (!zfpm_g->t_write);
  assert (zfpm_g->sock >= 0);

  THREAD_WRITE_ON (zfpm_g->master, zfpm_g->t_write, zfpm_write_cb, 0,
                   zfpm_g->sock);
}

/*
 * zfpm_read_off
 */
static inline void
zfpm_read_off (void)
{
  THREAD_READ_OFF (zfpm_g->t_read);
}

/*
 * zfpm_write_off
 */
static inline void
zfpm_write_off (void)
{
  THREAD_WRITE_OFF (zfpm_g->t_write);
}

/*
 * zfpm_conn_up_thread_cb
 *
 * Callback for actions to be taken when the connection to the FPM
 * comes up.
 */
static int
zfpm_conn_up_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->t_conn_up);
  zfpm_g->t_conn_up = NULL;

  iter = &zfpm_g->t_conn_up_state.iter;

  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    {
      zfpm_debug ("Connection not up anymore, conn_up thread aborting");
      zfpm_g->stats.t_conn_up_aborts++;
      goto done;
    }

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          zfpm_g->stats.t_conn_up_dests_processed++;
          zfpm_trigger_update (rnode, NULL);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_up_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                                 zfpm_conn_up_thread_cb,
                                                 0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_up_finishes++;

 done:
  zfpm_rnodes_iter_cleanup (iter);
  return 0;
}

/*
 * zfpm_connection_up
 *
 * Called when the connection to the FPM comes up.
 */
static void
zfpm_connection_up (const char *detail)
{
  assert (zfpm_g->sock >= 0);
  zfpm_read_on ();
  zfpm_write_on ();
  zfpm_set_state (ZFPM_STATE_ESTABLISHED, detail);

  /*
   * Start thread to push existing routes to the FPM.
   */
  assert (!zfpm_g->t_conn_up);

  zfpm_rnodes_iter_init (&zfpm_g->t_conn_up_state.iter);

  zfpm_debug ("Starting conn_up thread");
  zfpm_g->t_conn_up = thread_add_background (zfpm_g->master,
                                             zfpm_conn_up_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_up_starts++;
}

/*
 * zfpm_connect_check
 *
 * Check if an asynchronous connect() to the FPM is complete.
 */
static void
zfpm_connect_check (void)
{
  int status;
  socklen_t slen;
  int ret;

  zfpm_read_off ();
  zfpm_write_off ();

  slen = sizeof (status);
  ret = getsockopt (zfpm_g->sock, SOL_SOCKET, SO_ERROR, (void *) &status,
                    &slen);

  if (ret >= 0 && status == 0)
    {
      zfpm_connection_up ("async connect complete");
      return;
    }

  /*
   * getsockopt() failed or indicated an error on the socket.
   */
  close (zfpm_g->sock);
  zfpm_g->sock = -1;

  zfpm_start_connect_timer ("getsockopt() after async connect failed");
  return;
}

/*
 * zfpm_conn_down_thread_cb
 *
 * Callback that is invoked to clean up state after the TCP connection
 * to the FPM goes down.
 */
static int
zfpm_conn_down_thread_cb (struct thread *thread)
{
  struct route_node *rnode;
  zfpm_rnodes_iter_t *iter;
  rib_dest_t *dest;

  assert (zfpm_g->state == ZFPM_STATE_IDLE);

  assert (zfpm_g->t_conn_down);
  zfpm_g->t_conn_down = NULL;

  iter = &zfpm_g->t_conn_down_state.iter;

  while ((rnode = zfpm_rnodes_iter_next (iter)))
    {
      dest = rib_dest_from_rnode (rnode);

      if (dest)
        {
          if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
            {
              TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);
            }

          UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
          UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);

          zfpm_g->stats.t_conn_down_dests_processed++;

          /*
           * Check if the dest should be deleted.
           */
          rib_gc_dest (rnode);
        }

      /*
       * Yield if need be.
       */
      if (!zfpm_thread_should_yield (thread))
        continue;

      zfpm_g->stats.t_conn_down_yields++;
      zfpm_rnodes_iter_pause (iter);
      zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                                   zfpm_conn_down_thread_cb,
                                                   0, 0);
      return 0;
    }

  zfpm_g->stats.t_conn_down_finishes++;
  zfpm_rnodes_iter_cleanup (iter);

  /*
   * Start the process of connecting to the FPM again.
   */
  zfpm_start_connect_timer ("cleanup complete");
  return 0;
}

/*
 * zfpm_connection_down
 *
 * Called when the connection to the FPM has gone down.
 */
static void
zfpm_connection_down (const char *detail)
{
  if (!detail)
    detail = "unknown";

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);

  zlog_info ("connection to the FPM has gone down: %s", detail);

  zfpm_read_off ();
  zfpm_write_off ();

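  /*
   * Discard any input or output that is still buffered; it is no
   * longer meaningful now that the connection is gone.
   */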
  stream_reset (zfpm_g->ibuf);
  stream_reset (zfpm_g->obuf);

  if (zfpm_g->sock >= 0)
    {
      close (zfpm_g->sock);
      zfpm_g->sock = -1;
    }

  /*
   * Start thread to clean up state after the connection goes down.
   */
  assert (!zfpm_g->t_conn_down);
  zfpm_debug ("Starting conn_down thread");
  zfpm_rnodes_iter_init (&zfpm_g->t_conn_down_state.iter);
  zfpm_g->t_conn_down = thread_add_background (zfpm_g->master,
                                               zfpm_conn_down_thread_cb, 0, 0);
  zfpm_g->stats.t_conn_down_starts++;

  zfpm_set_state (ZFPM_STATE_IDLE, detail);
}

/*
 * zfpm_read_cb
 */
static int
zfpm_read_cb (struct thread *thread)
{
  size_t already;
  struct stream *ibuf;
  uint16_t msg_len;
  fpm_msg_hdr_t *hdr;

  zfpm_g->stats.read_cb_calls++;
  assert (zfpm_g->t_read);
  zfpm_g->t_read = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  ibuf = zfpm_g->ibuf;

  already = stream_get_endp (ibuf);
  if (already < FPM_MSG_HDR_LEN)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, FPM_MSG_HDR_LEN - already);
      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("closed socket in read");
          return 0;
        }

      if (nbyte != (ssize_t) (FPM_MSG_HDR_LEN - already))
        goto done;

      already = FPM_MSG_HDR_LEN;
    }

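  /*
   * At this point we have the full message header in the input
   * buffer; validate it and determine the total message length.
   */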
  stream_set_getp (ibuf, 0);

  hdr = (fpm_msg_hdr_t *) stream_pnt (ibuf);

  if (!fpm_msg_hdr_ok (hdr))
    {
      zfpm_connection_down ("invalid message header");
      return 0;
    }

  msg_len = fpm_msg_len (hdr);

  /*
   * Read out the rest of the packet.
   */
  if (already < msg_len)
    {
      ssize_t nbyte;

      nbyte = stream_read_try (ibuf, zfpm_g->sock, msg_len - already);

      if (nbyte == 0 || nbyte == -1)
        {
          zfpm_connection_down ("failed to read message");
          return 0;
        }

      if (nbyte != (ssize_t) (msg_len - already))
        goto done;
    }

  zfpm_debug ("Read out a full fpm message");

  /*
   * Just throw it away for now.
   */
  stream_reset (ibuf);

 done:
  zfpm_read_on ();
  return 0;
}

/*
 * zfpm_writes_pending
 *
 * Returns TRUE if we may have something to write to the FPM.
 */
static int
zfpm_writes_pending (void)
{

  /*
   * Check if there is any data in the outbound buffer that has not
   * been written to the socket yet.
   */
  if (stream_get_endp (zfpm_g->obuf) - stream_get_getp (zfpm_g->obuf))
    return 1;

  /*
   * Check if there are any prefixes on the outbound queue.
   */
  if (!TAILQ_EMPTY (&zfpm_g->dest_q))
    return 1;

  return 0;
}

/*
 * zfpm_encode_route
 *
 * Encode a message to the FPM with information about the given route.
 *
 * Returns the number of bytes written to the buffer. 0 or a negative
 * value indicates an error.
 */
static inline int
zfpm_encode_route (rib_dest_t *dest, struct rib *rib, char *in_buf,
                   size_t in_buf_len)
{
#ifndef HAVE_NETLINK
  return 0;
#else

  int cmd;

  cmd = rib ? RTM_NEWROUTE : RTM_DELROUTE;

  return zfpm_netlink_encode_route (cmd, dest, rib, in_buf, in_buf_len);

#endif /* HAVE_NETLINK */
}

/*
 * zfpm_route_for_update
 *
 * Returns the rib that is to be sent to the FPM for a given dest.
 */
static struct rib *
zfpm_route_for_update (rib_dest_t *dest)
{
  struct rib *rib;

  RIB_DEST_FOREACH_ROUTE (dest, rib)
    {
      if (!CHECK_FLAG (rib->status, RIB_ENTRY_SELECTED_FIB))
        continue;

      return rib;
    }

  /*
   * We have no route for this destination.
   */
  return NULL;
}

/*
 * zfpm_build_updates
 *
 * Process the outgoing queue and write messages to the outbound
 * buffer.
 */
static void
zfpm_build_updates (void)
{
  struct stream *s;
  rib_dest_t *dest;
  unsigned char *buf, *data, *buf_end;
  size_t msg_len;
  size_t data_len;
  fpm_msg_hdr_t *hdr;
  struct rib *rib;
  int is_add, write_msg;

  s = zfpm_g->obuf;

  assert (stream_empty (s));

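  /*
   * Dequeue dests one at a time, encoding at most one message for
   * each, until the output buffer cannot hold another maximum-sized
   * message or the queue is empty.
   */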
  do {

    /*
     * Make sure there is enough space to write another message.
     */
    if (STREAM_WRITEABLE (s) < FPM_MAX_MSG_LEN)
      break;

    buf = STREAM_DATA (s) + stream_get_endp (s);
    buf_end = buf + STREAM_WRITEABLE (s);

    dest = TAILQ_FIRST (&zfpm_g->dest_q);
    if (!dest)
      break;

    assert (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM));

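    /*
     * Each message begins with the fixed-size FPM header defined in
     * fpm/fpm.h: a protocol version, a message type, and the total
     * message length (header included) in network byte order.
     */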
    hdr = (fpm_msg_hdr_t *) buf;
    hdr->version = FPM_PROTO_VERSION;
    hdr->msg_type = FPM_MSG_TYPE_NETLINK;

    data = fpm_msg_data (hdr);

    rib = zfpm_route_for_update (dest);
    is_add = rib ? 1 : 0;

    write_msg = 1;

    /*
     * If this is a route deletion, and we have not sent the route to
     * the FPM previously, skip it.
     */
    if (!is_add && !CHECK_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM))
      {
        write_msg = 0;
        zfpm_g->stats.nop_deletes_skipped++;
      }

    if (write_msg)
      {
        data_len = zfpm_encode_route (dest, rib, (char *) data, buf_end - data);

        assert (data_len);
        if (data_len)
          {
            msg_len = fpm_data_len_to_msg_len (data_len);
            hdr->msg_len = htons (msg_len);
            stream_forward_endp (s, msg_len);

            if (is_add)
              zfpm_g->stats.route_adds++;
            else
              zfpm_g->stats.route_dels++;
          }
      }

    /*
     * Remove the dest from the queue, and reset the flag.
     */
    UNSET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
    TAILQ_REMOVE (&zfpm_g->dest_q, dest, fpm_q_entries);

    if (is_add)
      {
        SET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }
    else
      {
        UNSET_FLAG (dest->flags, RIB_DEST_SENT_TO_FPM);
      }

    /*
     * Delete the destination if necessary.
     */
    if (rib_gc_dest (dest->rnode))
      zfpm_g->stats.dests_del_after_update++;

  } while (1);

}

/*
 * zfpm_write_cb
 */
static int
zfpm_write_cb (struct thread *thread)
{
  struct stream *s;
  int num_writes;

  zfpm_g->stats.write_cb_calls++;
  assert (zfpm_g->t_write);
  zfpm_g->t_write = NULL;

  /*
   * Check if async connect is now done.
   */
  if (zfpm_g->state == ZFPM_STATE_CONNECTING)
    {
      zfpm_connect_check ();
      return 0;
    }

  assert (zfpm_g->state == ZFPM_STATE_ESTABLISHED);
  assert (zfpm_g->sock >= 0);

  num_writes = 0;

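  /*
   * Write out as much buffered data as the socket will accept,
   * refilling the output buffer from the dest queue as it drains. The
   * number of write() calls per invocation is bounded, and we also
   * yield if we have been running too long, so that other zebra work
   * is not starved.
   */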
  do
    {
      int bytes_to_write, bytes_written;

      s = zfpm_g->obuf;

      /*
       * If the stream is empty, try to fill it up with data.
       */
      if (stream_empty (s))
        {
          zfpm_build_updates ();
        }

      bytes_to_write = stream_get_endp (s) - stream_get_getp (s);
      if (!bytes_to_write)
        break;

      bytes_written = write (zfpm_g->sock, STREAM_PNT (s), bytes_to_write);
      zfpm_g->stats.write_calls++;
      num_writes++;

      if (bytes_written < 0)
        {
          if (ERRNO_IO_RETRY (errno))
            break;

          zfpm_connection_down ("failed to write to socket");
          return 0;
        }

      if (bytes_written != bytes_to_write)
        {

          /*
           * Partial write.
           */
          stream_forward_getp (s, bytes_written);
          zfpm_g->stats.partial_writes++;
          break;
        }

      /*
       * We've written out the entire contents of the stream.
       */
      stream_reset (s);

      if (num_writes >= ZFPM_MAX_WRITES_PER_RUN)
        {
          zfpm_g->stats.max_writes_hit++;
          break;
        }

      if (zfpm_thread_should_yield (thread))
        {
          zfpm_g->stats.t_write_yields++;
          break;
        }
    } while (1);

  if (zfpm_writes_pending ())
    zfpm_write_on ();

  return 0;
}

/*
 * zfpm_connect_cb
 */
static int
zfpm_connect_cb (struct thread *t)
{
  int sock, ret;
  struct sockaddr_in serv;

  assert (zfpm_g->t_connect);
  zfpm_g->t_connect = NULL;
  assert (zfpm_g->state == ZFPM_STATE_ACTIVE);

  sock = socket (AF_INET, SOCK_STREAM, 0);
  if (sock < 0)
    {
      zfpm_debug ("Failed to create socket for connect(): %s", strerror (errno));
      zfpm_g->stats.connect_no_sock++;
      return 0;
    }

  set_nonblocking (sock);

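  /*
   * Since the socket is non-blocking, the connect() call below may
   * return -1 with errno set to EINPROGRESS. In that case the
   * completion of the connect is detected later by the read/write
   * callbacks, which invoke zfpm_connect_check().
   */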
  /* Set up the address of the FPM server to connect to. */
  memset (&serv, 0, sizeof (serv));
  serv.sin_family = AF_INET;
  serv.sin_port = htons (zfpm_g->fpm_port);
#ifdef HAVE_STRUCT_SOCKADDR_IN_SIN_LEN
  serv.sin_len = sizeof (struct sockaddr_in);
#endif /* HAVE_STRUCT_SOCKADDR_IN_SIN_LEN */
  serv.sin_addr.s_addr = htonl (INADDR_LOOPBACK);

  /*
   * Connect to the FPM.
   */
  zfpm_g->connect_calls++;
  zfpm_g->stats.connect_calls++;
  zfpm_g->last_connect_call_time = zfpm_get_time ();

  ret = connect (sock, (struct sockaddr *) &serv, sizeof (serv));
  if (ret >= 0)
    {
      zfpm_g->sock = sock;
      zfpm_connection_up ("connect succeeded");
      return 1;
    }

  if (errno == EINPROGRESS)
    {
      zfpm_g->sock = sock;
      zfpm_read_on ();
      zfpm_write_on ();
      zfpm_set_state (ZFPM_STATE_CONNECTING, "async connect in progress");
      return 0;
    }

  zlog_info ("can't connect to FPM %d: %s", sock, safe_strerror (errno));
  close (sock);

  /*
   * Restart timer for retrying connection.
   */
  zfpm_start_connect_timer ("connect() failed");
  return 0;
}

/*
 * zfpm_set_state
 *
 * Move state machine into the given state.
 */
static void
zfpm_set_state (zfpm_state_t state, const char *reason)
{
  zfpm_state_t cur_state = zfpm_g->state;

  if (!reason)
    reason = "Unknown";

  if (state == cur_state)
    return;

  zfpm_debug ("beginning state transition %s -> %s. Reason: %s",
              zfpm_state_to_str (cur_state), zfpm_state_to_str (state),
              reason);

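  /*
   * Sanity-check that this is a transition we expect to make.
   */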
  switch (state)
    {

    case ZFPM_STATE_IDLE:
      assert (cur_state == ZFPM_STATE_ESTABLISHED);
      break;

    case ZFPM_STATE_ACTIVE:
      assert (cur_state == ZFPM_STATE_IDLE ||
              cur_state == ZFPM_STATE_CONNECTING);
      assert (zfpm_g->t_connect);
      break;

    case ZFPM_STATE_CONNECTING:
      assert (zfpm_g->sock >= 0);
      assert (cur_state == ZFPM_STATE_ACTIVE);
      assert (zfpm_g->t_read);
      assert (zfpm_g->t_write);
      break;

    case ZFPM_STATE_ESTABLISHED:
      assert (cur_state == ZFPM_STATE_ACTIVE ||
              cur_state == ZFPM_STATE_CONNECTING);
      assert (zfpm_g->sock >= 0);
      assert (zfpm_g->t_read);
      assert (zfpm_g->t_write);
      break;
    }

  zfpm_g->state = state;
}

/*
 * zfpm_calc_connect_delay
 *
 * Returns the number of seconds after which we should attempt to
 * reconnect to the FPM.
 */
static long
zfpm_calc_connect_delay (void)
{
  time_t elapsed;

  /*
   * Return 0 if this is our first attempt to connect.
   */
  if (zfpm_g->connect_calls == 0)
    {
      return 0;
    }

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_connect_call_time);

  if (elapsed > ZFPM_CONNECT_RETRY_IVL)
    {
      return 0;
    }

  return ZFPM_CONNECT_RETRY_IVL - elapsed;
}

/*
 * zfpm_start_connect_timer
 */
static void
zfpm_start_connect_timer (const char *reason)
{
  long delay_secs;

  assert (!zfpm_g->t_connect);
  assert (zfpm_g->sock < 0);

  assert (zfpm_g->state == ZFPM_STATE_IDLE ||
          zfpm_g->state == ZFPM_STATE_ACTIVE ||
          zfpm_g->state == ZFPM_STATE_CONNECTING);

  delay_secs = zfpm_calc_connect_delay ();
  zfpm_debug ("scheduling connect in %ld seconds", delay_secs);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_connect, zfpm_connect_cb, 0,
                   delay_secs);
  zfpm_set_state (ZFPM_STATE_ACTIVE, reason);
}

/*
 * zfpm_is_enabled
 *
 * Returns TRUE if the zebra FPM module has been enabled.
 */
static inline int
zfpm_is_enabled (void)
{
  return zfpm_g->enabled;
}

/*
 * zfpm_conn_is_up
 *
 * Returns TRUE if the connection to the FPM is up.
 */
static inline int
zfpm_conn_is_up (void)
{
  if (zfpm_g->state != ZFPM_STATE_ESTABLISHED)
    return 0;

  assert (zfpm_g->sock >= 0);

  return 1;
}

/*
 * zfpm_trigger_update
 *
 * The zebra code invokes this function to indicate that we should
 * send an update to the FPM about the given route_node.
 */
void
zfpm_trigger_update (struct route_node *rn, const char *reason)
{
  rib_dest_t *dest;
  char buf[PREFIX_STRLEN];

  /*
   * Ignore if the connection is down. We will update the FPM about
   * all destinations once the connection comes up.
   */
  if (!zfpm_conn_is_up ())
    return;

  dest = rib_dest_from_rnode (rn);

  /*
   * Ignore the trigger if the dest is not in a table that we would
   * send to the FPM.
   */
  if (!zfpm_is_table_for_fpm (rib_dest_table (dest)))
    {
      zfpm_g->stats.non_fpm_table_triggers++;
      return;
    }

  if (CHECK_FLAG (dest->flags, RIB_DEST_UPDATE_FPM))
    {
      zfpm_g->stats.redundant_triggers++;
      return;
    }

  if (reason)
    {
      zfpm_debug ("%s triggering update to FPM - Reason: %s",
                  prefix2str (&rn->p, buf, sizeof (buf)), reason);
    }

  SET_FLAG (dest->flags, RIB_DEST_UPDATE_FPM);
  TAILQ_INSERT_TAIL (&zfpm_g->dest_q, dest, fpm_q_entries);
  zfpm_g->stats.updates_triggered++;

  /*
   * Make sure that writes are enabled.
   */
  if (zfpm_g->t_write)
    return;

  zfpm_write_on ();
}

/*
 * zfpm_stats_timer_cb
 */
static int
zfpm_stats_timer_cb (struct thread *t)
{
  assert (zfpm_g->t_stats);
  zfpm_g->t_stats = NULL;

  /*
   * Remember the stats collected in the last interval for display
   * purposes.
   */
  zfpm_stats_copy (&zfpm_g->stats, &zfpm_g->last_ivl_stats);

  /*
   * Add the current set of stats into the cumulative statistics.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &zfpm_g->cumulative_stats);

  /*
   * Start collecting stats afresh over the next interval.
   */
  zfpm_stats_reset (&zfpm_g->stats);

  zfpm_start_stats_timer ();

  return 0;
}

/*
 * zfpm_stop_stats_timer
 */
static void
zfpm_stop_stats_timer (void)
{
  if (!zfpm_g->t_stats)
    return;

  zfpm_debug ("Stopping existing stats timer");
  THREAD_TIMER_OFF (zfpm_g->t_stats);
}

/*
 * zfpm_start_stats_timer
 */
static void
zfpm_start_stats_timer (void)
{
  assert (!zfpm_g->t_stats);

  THREAD_TIMER_ON (zfpm_g->master, zfpm_g->t_stats, zfpm_stats_timer_cb, 0,
                   ZFPM_STATS_IVL_SECS);
}

/*
 * Helper macro for zfpm_show_stats() below.
 */
#define ZFPM_SHOW_STAT(counter) \
  do { \
    vty_out (vty, "%-40s %10lu %16lu%s", #counter, total_stats.counter, \
             zfpm_g->last_ivl_stats.counter, VTY_NEWLINE); \
  } while (0)

/*
 * zfpm_show_stats
 */
static void
zfpm_show_stats (struct vty *vty)
{
  zfpm_stats_t total_stats;
  time_t elapsed;

  vty_out (vty, "%s%-40s %10s Last %2d secs%s%s", VTY_NEWLINE, "Counter",
           "Total", ZFPM_STATS_IVL_SECS, VTY_NEWLINE, VTY_NEWLINE);

  /*
   * Compute the total stats up to this instant.
   */
  zfpm_stats_compose (&zfpm_g->cumulative_stats, &zfpm_g->stats,
                      &total_stats);

  ZFPM_SHOW_STAT (connect_calls);
  ZFPM_SHOW_STAT (connect_no_sock);
  ZFPM_SHOW_STAT (read_cb_calls);
  ZFPM_SHOW_STAT (write_cb_calls);
  ZFPM_SHOW_STAT (write_calls);
  ZFPM_SHOW_STAT (partial_writes);
  ZFPM_SHOW_STAT (max_writes_hit);
  ZFPM_SHOW_STAT (t_write_yields);
  ZFPM_SHOW_STAT (nop_deletes_skipped);
  ZFPM_SHOW_STAT (route_adds);
  ZFPM_SHOW_STAT (route_dels);
  ZFPM_SHOW_STAT (updates_triggered);
  ZFPM_SHOW_STAT (non_fpm_table_triggers);
  ZFPM_SHOW_STAT (redundant_triggers);
  ZFPM_SHOW_STAT (dests_del_after_update);
  ZFPM_SHOW_STAT (t_conn_down_starts);
  ZFPM_SHOW_STAT (t_conn_down_dests_processed);
  ZFPM_SHOW_STAT (t_conn_down_yields);
  ZFPM_SHOW_STAT (t_conn_down_finishes);
  ZFPM_SHOW_STAT (t_conn_up_starts);
  ZFPM_SHOW_STAT (t_conn_up_dests_processed);
  ZFPM_SHOW_STAT (t_conn_up_yields);
  ZFPM_SHOW_STAT (t_conn_up_aborts);
  ZFPM_SHOW_STAT (t_conn_up_finishes);

  if (!zfpm_g->last_stats_clear_time)
    return;

  elapsed = zfpm_get_elapsed_time (zfpm_g->last_stats_clear_time);

  vty_out (vty, "%sStats were cleared %lu seconds ago%s", VTY_NEWLINE,
           (unsigned long) elapsed, VTY_NEWLINE);
}

/*
 * zfpm_clear_stats
 */
static void
zfpm_clear_stats (struct vty *vty)
{
  if (!zfpm_is_enabled ())
    {
      vty_out (vty, "The FPM module is not enabled...%s", VTY_NEWLINE);
      return;
    }

  zfpm_stats_reset (&zfpm_g->stats);
  zfpm_stats_reset (&zfpm_g->last_ivl_stats);
  zfpm_stats_reset (&zfpm_g->cumulative_stats);

  zfpm_stop_stats_timer ();
  zfpm_start_stats_timer ();

  zfpm_g->last_stats_clear_time = zfpm_get_time ();

  vty_out (vty, "Cleared FPM stats%s", VTY_NEWLINE);
}

/*
 * show_zebra_fpm_stats
 */
DEFUN (show_zebra_fpm_stats,
       show_zebra_fpm_stats_cmd,
       "show zebra fpm stats",
       SHOW_STR
       "Zebra information\n"
       "Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_show_stats (vty);
  return CMD_SUCCESS;
}

/*
 * clear_zebra_fpm_stats
 */
DEFUN (clear_zebra_fpm_stats,
       clear_zebra_fpm_stats_cmd,
       "clear zebra fpm stats",
       CLEAR_STR
       "Zebra information\n"
       "Clear Forwarding Path Manager information\n"
       "Statistics\n")
{
  zfpm_clear_stats (vty);
  return CMD_SUCCESS;
}

/**
 * zfpm_init
 *
 * One-time initialization of the Zebra FPM module.
 *
 * @param[in] port Port at which the FPM is running.
 * @param[in] enable TRUE if the zebra FPM module should be enabled.
 *
 * Returns TRUE on success.
 */
int
zfpm_init (struct thread_master *master, int enable, uint16_t port)
{
  static int initialized = 0;

  if (initialized)
    {
      return 1;
    }

  initialized = 1;

  memset (zfpm_g, 0, sizeof (*zfpm_g));
  zfpm_g->master = master;
  TAILQ_INIT (&zfpm_g->dest_q);
  zfpm_g->sock = -1;
  zfpm_g->state = ZFPM_STATE_IDLE;

  /*
   * Netlink must currently be available for the Zebra-FPM interface
   * to be enabled.
   */
#ifndef HAVE_NETLINK
  enable = 0;
#endif
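  /*
   * (Routes are encoded in netlink format before being sent to the
   * FPM; see zfpm_encode_route() above.)
   */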

  zfpm_g->enabled = enable;

  zfpm_stats_init (&zfpm_g->stats);
  zfpm_stats_init (&zfpm_g->last_ivl_stats);
  zfpm_stats_init (&zfpm_g->cumulative_stats);

  install_element (ENABLE_NODE, &show_zebra_fpm_stats_cmd);
  install_element (ENABLE_NODE, &clear_zebra_fpm_stats_cmd);

  if (!enable)
    {
      return 1;
    }

  if (!port)
    port = FPM_DEFAULT_PORT;

  zfpm_g->fpm_port = port;

  zfpm_g->obuf = stream_new (ZFPM_OBUF_SIZE);
  zfpm_g->ibuf = stream_new (ZFPM_IBUF_SIZE);

  zfpm_start_stats_timer ();
  zfpm_start_connect_timer ("initialized");

  return 1;
}
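
/*
 * Illustrative only: zebra's startup code is expected to call
 * zfpm_init() once, along the lines of the following hypothetical
 * invocation, which enables the module and falls back to
 * FPM_DEFAULT_PORT because the port argument is 0:
 *
 *   zfpm_init (master, 1, 0);
 */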