Annotation of embedaddon/strongswan/src/libcharon/plugins/ha/ha_segments.c, revision 1.1.1.1
1.1 misho 1: /*
2: * Copyright (C) 2008 Martin Willi
3: * HSR Hochschule fuer Technik Rapperswil
4: *
5: * This program is free software; you can redistribute it and/or modify it
6: * under the terms of the GNU General Public License as published by the
7: * Free Software Foundation; either version 2 of the License, or (at your
8: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
9: *
10: * This program is distributed in the hope that it will be useful, but
11: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
12: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
13: * for more details.
14: */
15:
16: #include "ha_segments.h"
17:
18: #include <threading/mutex.h>
19: #include <threading/condvar.h>
20: #include <collections/linked_list.h>
21: #include <threading/thread.h>
22: #include <processing/jobs/callback_job.h>
23:
24: #define DEFAULT_HEARTBEAT_DELAY 1000
25: #define DEFAULT_HEARTBEAT_TIMEOUT 2100
26:
27: typedef struct private_ha_segments_t private_ha_segments_t;
28:
29: /**
30: * Private data of an ha_segments_t object.
31: */
32: struct private_ha_segments_t {
33:
34: /**
35: * Public ha_segments_t interface.
36: */
37: ha_segments_t public;
38:
39: /**
40: * communication socket
41: */
42: ha_socket_t *socket;
43:
44: /**
45: * Sync tunnel, if any
46: */
47: ha_tunnel_t *tunnel;
48:
49: /**
50: * Interface to control segments at kernel level
51: */
52: ha_kernel_t *kernel;
53:
54: /**
55: * Mutex to lock segment manipulation
56: */
57: mutex_t *mutex;
58:
59: /**
60: * Condvar to wait for heartbeats
61: */
62: condvar_t *condvar;
63:
64: /**
65: * Total number of ClusterIP segments
66: */
67: u_int count;
68:
69: /**
70: * mask of active segments
71: */
72: segment_mask_t active;
73:
74: /**
75: * Node number
76: */
77: u_int node;
78:
79: /**
80: * Are we checking for heartbeats?
81: */
82: bool heartbeat_active;
83:
84: /**
85: * Interval we send heartbeats
86: */
87: int heartbeat_delay;
88:
89: /**
90: * Timeout for heartbeats received from other node
91: */
92: int heartbeat_timeout;
93:
94: /**
95: * Interval to check for autobalance, 0 to disable
96: */
97: int autobalance;
98: };
99:
100: /**
101: * Log currently active segments
102: */
103: static void log_segments(private_ha_segments_t *this, bool activated,
104: u_int segment)
105: {
106: char buf[64] = "none", *pos = buf;
107: int i;
108: bool first = TRUE;
109:
110: for (i = 1; i <= this->count; i++)
111: {
112: if (this->active & SEGMENTS_BIT(i))
113: {
114: if (first)
115: {
116: first = FALSE;
117: }
118: else
119: {
120: pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
121: }
122: pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
123: }
124: }
125: DBG1(DBG_CFG, "HA segment %d %sactivated, now active: %s",
126: segment, activated ? "" : "de", buf);
127: }
128:
129: /**
130: * Enable/Disable a specific segment
131: */
132: static void enable_disable(private_ha_segments_t *this, u_int segment,
133: bool enable, bool notify)
134: {
135: ike_sa_t *ike_sa;
136: enumerator_t *enumerator;
137: ike_sa_state_t old, new;
138: ha_message_t *message = NULL;
139: ha_message_type_t type;
140: bool changes = FALSE;
141:
142: if (segment > this->count)
143: {
144: return;
145: }
146:
147: if (enable)
148: {
149: old = IKE_PASSIVE;
150: new = IKE_ESTABLISHED;
151: type = HA_SEGMENT_TAKE;
152: if (!(this->active & SEGMENTS_BIT(segment)))
153: {
154: this->active |= SEGMENTS_BIT(segment);
155: this->kernel->activate(this->kernel, segment);
156: changes = TRUE;
157: }
158: }
159: else
160: {
161: old = IKE_ESTABLISHED;
162: new = IKE_PASSIVE;
163: type = HA_SEGMENT_DROP;
164: if (this->active & SEGMENTS_BIT(segment))
165: {
166: this->active &= ~SEGMENTS_BIT(segment);
167: this->kernel->deactivate(this->kernel, segment);
168: changes = TRUE;
169: }
170: }
171:
172: if (changes)
173: {
174: enumerator = charon->ike_sa_manager->create_enumerator(
175: charon->ike_sa_manager, TRUE);
176: while (enumerator->enumerate(enumerator, &ike_sa))
177: {
178: if (ike_sa->get_state(ike_sa) != old)
179: {
180: continue;
181: }
182: if (this->tunnel && this->tunnel->is_sa(this->tunnel, ike_sa))
183: {
184: continue;
185: }
186: if (this->kernel->get_segment(this->kernel,
187: ike_sa->get_other_host(ike_sa)) == segment)
188: {
189: ike_sa->set_state(ike_sa, new);
190: }
191: }
192: enumerator->destroy(enumerator);
193: log_segments(this, enable, segment);
194: }
195:
196: if (notify)
197: {
198: message = ha_message_create(type);
199: message->add_attribute(message, HA_SEGMENT, segment);
200: this->socket->push(this->socket, message);
201: message->destroy(message);
202: }
203: }
204:
205: /**
206: * Enable/Disable all or a specific segment, do locking
207: */
208: static void enable_disable_all(private_ha_segments_t *this, u_int segment,
209: bool enable, bool notify)
210: {
211: int i;
212:
213: this->mutex->lock(this->mutex);
214: if (segment == 0)
215: {
216: for (i = 1; i <= this->count; i++)
217: {
218: enable_disable(this, i, enable, notify);
219: }
220: }
221: else
222: {
223: enable_disable(this, segment, enable, notify);
224: }
225: this->mutex->unlock(this->mutex);
226: }
227:
228: METHOD(ha_segments_t, activate, void,
229: private_ha_segments_t *this, u_int segment, bool notify)
230: {
231: enable_disable_all(this, segment, TRUE, notify);
232: }
233:
234: METHOD(ha_segments_t, deactivate, void,
235: private_ha_segments_t *this, u_int segment, bool notify)
236: {
237: enable_disable_all(this, segment, FALSE, notify);
238: }
239:
240: METHOD(listener_t, alert_hook, bool,
241: private_ha_segments_t *this, ike_sa_t *ike_sa, alert_t alert, va_list args)
242: {
243: if (alert == ALERT_SHUTDOWN_SIGNAL)
244: {
245: if (this->heartbeat_active)
246: {
247: DBG1(DBG_CFG, "HA heartbeat active, dropping all segments");
248: deactivate(this, 0, TRUE);
249: }
250: else
251: {
252: DBG1(DBG_CFG, "no HA heartbeat active, closing IKE_SAs");
253: }
254: }
255: return TRUE;
256: }
257:
258: /**
259: * Monitor heartbeat activity of remote node
260: */
261: static job_requeue_t watchdog(private_ha_segments_t *this)
262: {
263: bool timeout, oldstate;
264:
265: this->mutex->lock(this->mutex);
266: thread_cleanup_push((void*)this->mutex->unlock, this->mutex);
267: oldstate = thread_cancelability(TRUE);
268: timeout = this->condvar->timed_wait(this->condvar, this->mutex,
269: this->heartbeat_timeout);
270: thread_cancelability(oldstate);
271: thread_cleanup_pop(TRUE);
272: if (timeout)
273: {
274: DBG1(DBG_CFG, "no heartbeat received, taking all segments");
275: activate(this, 0, TRUE);
276: /* disable heartbeat detection util we get one */
277: this->heartbeat_active = FALSE;
278: return JOB_REQUEUE_NONE;
279: }
280: return JOB_REQUEUE_DIRECT;
281: }
282:
283: /**
284: * Start the heartbeat detection thread
285: */
286: static void start_watchdog(private_ha_segments_t *this)
287: {
288: this->heartbeat_active = TRUE;
289: lib->processor->queue_job(lib->processor,
290: (job_t*)callback_job_create_with_prio((callback_job_cb_t)watchdog, this,
291: NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
292: }
293:
294: METHOD(ha_segments_t, handle_status, void,
295: private_ha_segments_t *this, segment_mask_t mask)
296: {
297: segment_mask_t missing, twice;
298: int i;
299:
300: this->mutex->lock(this->mutex);
301:
302: missing = ~(this->active | mask);
303: twice = this->active & mask;
304:
305: for (i = 1; i <= this->count; i++)
306: {
307: if (missing & SEGMENTS_BIT(i))
308: {
309: if (this->node == i % 2)
310: {
311: DBG1(DBG_CFG, "HA segment %d was not handled, taking", i);
312: enable_disable(this, i, TRUE, TRUE);
313: }
314: else
315: {
316: DBG1(DBG_CFG, "HA segment %d was not handled, dropping", i);
317: enable_disable(this, i, FALSE, TRUE);
318: }
319: }
320: if (twice & SEGMENTS_BIT(i))
321: {
322: if (this->node == i % 2)
323: {
324: DBG1(DBG_CFG, "HA segment %d was handled twice, taking", i);
325: enable_disable(this, i, TRUE, TRUE);
326: }
327: else
328: {
329: DBG1(DBG_CFG, "HA segment %d was handled twice, dropping", i);
330: enable_disable(this, i, FALSE, TRUE);
331: }
332: }
333: }
334:
335: this->condvar->signal(this->condvar);
336: this->mutex->unlock(this->mutex);
337:
338: if (!this->heartbeat_active)
339: {
340: DBG1(DBG_CFG, "received heartbeat, reenabling watchdog");
341: start_watchdog(this);
342: }
343: }
344:
345: /**
346: * Send a status message with our active segments
347: */
348: static job_requeue_t send_status(private_ha_segments_t *this)
349: {
350: ha_message_t *message;
351: int i;
352:
353: message = ha_message_create(HA_STATUS);
354:
355: this->mutex->lock(this->mutex);
356: for (i = 1; i <= this->count; i++)
357: {
358: if (this->active & SEGMENTS_BIT(i))
359: {
360: message->add_attribute(message, HA_SEGMENT, i);
361: }
362: }
363: this->mutex->unlock(this->mutex);
364:
365: this->socket->push(this->socket, message);
366: message->destroy(message);
367:
368: /* schedule next invocation */
369: return JOB_RESCHEDULE_MS(this->heartbeat_delay);
370: }
371:
372: /**
373: * Start the heartbeat sending task
374: */
375: static void start_heartbeat(private_ha_segments_t *this)
376: {
377: lib->processor->queue_job(lib->processor,
378: (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_status,
379: this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
380: }
381:
382: /**
383: * Take a segment if we are handling less than half of segments
384: */
385: static job_requeue_t autobalance(private_ha_segments_t *this)
386: {
387: int i, active = 0;
388:
389: this->mutex->lock(this->mutex);
390:
391: for (i = 1; i <= this->count; i++)
392: {
393: if (this->active & SEGMENTS_BIT(i))
394: {
395: active++;
396: }
397: }
398: if (active < this->count / 2)
399: {
400: for (i = 1; i <= this->count; i++)
401: {
402: if (!(this->active & SEGMENTS_BIT(i)))
403: {
404: DBG1(DBG_CFG, "autobalancing HA (%d/%d active), taking %d",
405: active, this->count, i);
406: enable_disable(this, i, TRUE, TRUE);
407: /* we claim only one in each interval */
408: break;
409: }
410: }
411: }
412:
413: this->mutex->unlock(this->mutex);
414:
415: return JOB_RESCHEDULE(this->autobalance);
416: }
417:
418: /**
419: * Schedule autobalancing
420: */
421: static void start_autobalance(private_ha_segments_t *this)
422: {
423: DBG1(DBG_CFG, "scheduling HA autobalance every %ds", this->autobalance);
424: lib->scheduler->schedule_job(lib->scheduler,
425: (job_t*)callback_job_create_with_prio((callback_job_cb_t)autobalance,
426: this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL),
427: this->autobalance);
428: }
429:
430: METHOD(ha_segments_t, is_active, bool,
431: private_ha_segments_t *this, u_int segment)
432: {
433: return (this->active & SEGMENTS_BIT(segment)) != 0;
434: }
435:
436: METHOD(ha_segments_t, count, u_int,
437: private_ha_segments_t *this)
438: {
439: return this->count;
440: }
441:
442: METHOD(ha_segments_t, destroy, void,
443: private_ha_segments_t *this)
444: {
445: this->mutex->destroy(this->mutex);
446: this->condvar->destroy(this->condvar);
447: free(this);
448: }
449:
450: /**
451: * See header
452: */
453: ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
454: ha_tunnel_t *tunnel, u_int count, u_int node,
455: bool monitor)
456: {
457: private_ha_segments_t *this;
458:
459: INIT(this,
460: .public = {
461: .listener = {
462: .alert = _alert_hook,
463: },
464: .activate = _activate,
465: .deactivate = _deactivate,
466: .handle_status = _handle_status,
467: .is_active = _is_active,
468: .count = _count,
469: .destroy = _destroy,
470: },
471: .socket = socket,
472: .tunnel = tunnel,
473: .kernel = kernel,
474: .count = count,
475: .node = node,
476: .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
477: .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
478: .heartbeat_delay = lib->settings->get_int(lib->settings,
479: "%s.plugins.ha.heartbeat_delay", DEFAULT_HEARTBEAT_DELAY,
480: lib->ns),
481: .heartbeat_timeout = lib->settings->get_int(lib->settings,
482: "%s.plugins.ha.heartbeat_timeout", DEFAULT_HEARTBEAT_TIMEOUT,
483: lib->ns),
484: .autobalance = lib->settings->get_int(lib->settings,
485: "%s.plugins.ha.autobalance", 0, lib->ns),
486: );
487:
488: if (monitor)
489: {
490: DBG1(DBG_CFG, "starting HA heartbeat, delay %dms, timeout %dms",
491: this->heartbeat_delay, this->heartbeat_timeout);
492: start_heartbeat(this);
493: start_watchdog(this);
494: }
495: if (this->autobalance)
496: {
497: start_autobalance(this);
498: }
499:
500: return &this->public;
501: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>