Return to ha_segments.c CVS log | Up to [ELWIX - Embedded LightWeight unIX -] / embedaddon / strongswan / src / libcharon / plugins / ha |
1.1 misho 1: /* 2: * Copyright (C) 2008 Martin Willi 3: * HSR Hochschule fuer Technik Rapperswil 4: * 5: * This program is free software; you can redistribute it and/or modify it 6: * under the terms of the GNU General Public License as published by the 7: * Free Software Foundation; either version 2 of the License, or (at your 8: * option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>. 9: * 10: * This program is distributed in the hope that it will be useful, but 11: * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY 12: * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13: * for more details. 14: */ 15: 16: #include "ha_segments.h" 17: 18: #include <threading/mutex.h> 19: #include <threading/condvar.h> 20: #include <collections/linked_list.h> 21: #include <threading/thread.h> 22: #include <processing/jobs/callback_job.h> 23: 24: #define DEFAULT_HEARTBEAT_DELAY 1000 25: #define DEFAULT_HEARTBEAT_TIMEOUT 2100 26: 27: typedef struct private_ha_segments_t private_ha_segments_t; 28: 29: /** 30: * Private data of an ha_segments_t object. 31: */ 32: struct private_ha_segments_t { 33: 34: /** 35: * Public ha_segments_t interface. 36: */ 37: ha_segments_t public; 38: 39: /** 40: * communication socket 41: */ 42: ha_socket_t *socket; 43: 44: /** 45: * Sync tunnel, if any 46: */ 47: ha_tunnel_t *tunnel; 48: 49: /** 50: * Interface to control segments at kernel level 51: */ 52: ha_kernel_t *kernel; 53: 54: /** 55: * Mutex to lock segment manipulation 56: */ 57: mutex_t *mutex; 58: 59: /** 60: * Condvar to wait for heartbeats 61: */ 62: condvar_t *condvar; 63: 64: /** 65: * Total number of ClusterIP segments 66: */ 67: u_int count; 68: 69: /** 70: * mask of active segments 71: */ 72: segment_mask_t active; 73: 74: /** 75: * Node number 76: */ 77: u_int node; 78: 79: /** 80: * Are we checking for heartbeats? 81: */ 82: bool heartbeat_active; 83: 84: /** 85: * Interval we send heartbeats 86: */ 87: int heartbeat_delay; 88: 89: /** 90: * Timeout for heartbeats received from other node 91: */ 92: int heartbeat_timeout; 93: 94: /** 95: * Interval to check for autobalance, 0 to disable 96: */ 97: int autobalance; 98: }; 99: 100: /** 101: * Log currently active segments 102: */ 103: static void log_segments(private_ha_segments_t *this, bool activated, 104: u_int segment) 105: { 106: char buf[64] = "none", *pos = buf; 107: int i; 108: bool first = TRUE; 109: 110: for (i = 1; i <= this->count; i++) 111: { 112: if (this->active & SEGMENTS_BIT(i)) 113: { 114: if (first) 115: { 116: first = FALSE; 117: } 118: else 119: { 120: pos += snprintf(pos, buf + sizeof(buf) - pos, ","); 121: } 122: pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i); 123: } 124: } 125: DBG1(DBG_CFG, "HA segment %d %sactivated, now active: %s", 126: segment, activated ? "" : "de", buf); 127: } 128: 129: /** 130: * Enable/Disable a specific segment 131: */ 132: static void enable_disable(private_ha_segments_t *this, u_int segment, 133: bool enable, bool notify) 134: { 135: ike_sa_t *ike_sa; 136: enumerator_t *enumerator; 137: ike_sa_state_t old, new; 138: ha_message_t *message = NULL; 139: ha_message_type_t type; 140: bool changes = FALSE; 141: 142: if (segment > this->count) 143: { 144: return; 145: } 146: 147: if (enable) 148: { 149: old = IKE_PASSIVE; 150: new = IKE_ESTABLISHED; 151: type = HA_SEGMENT_TAKE; 152: if (!(this->active & SEGMENTS_BIT(segment))) 153: { 154: this->active |= SEGMENTS_BIT(segment); 155: this->kernel->activate(this->kernel, segment); 156: changes = TRUE; 157: } 158: } 159: else 160: { 161: old = IKE_ESTABLISHED; 162: new = IKE_PASSIVE; 163: type = HA_SEGMENT_DROP; 164: if (this->active & SEGMENTS_BIT(segment)) 165: { 166: this->active &= ~SEGMENTS_BIT(segment); 167: this->kernel->deactivate(this->kernel, segment); 168: changes = TRUE; 169: } 170: } 171: 172: if (changes) 173: { 174: enumerator = charon->ike_sa_manager->create_enumerator( 175: charon->ike_sa_manager, TRUE); 176: while (enumerator->enumerate(enumerator, &ike_sa)) 177: { 178: if (ike_sa->get_state(ike_sa) != old) 179: { 180: continue; 181: } 182: if (this->tunnel && this->tunnel->is_sa(this->tunnel, ike_sa)) 183: { 184: continue; 185: } 186: if (this->kernel->get_segment(this->kernel, 187: ike_sa->get_other_host(ike_sa)) == segment) 188: { 189: ike_sa->set_state(ike_sa, new); 190: } 191: } 192: enumerator->destroy(enumerator); 193: log_segments(this, enable, segment); 194: } 195: 196: if (notify) 197: { 198: message = ha_message_create(type); 199: message->add_attribute(message, HA_SEGMENT, segment); 200: this->socket->push(this->socket, message); 201: message->destroy(message); 202: } 203: } 204: 205: /** 206: * Enable/Disable all or a specific segment, do locking 207: */ 208: static void enable_disable_all(private_ha_segments_t *this, u_int segment, 209: bool enable, bool notify) 210: { 211: int i; 212: 213: this->mutex->lock(this->mutex); 214: if (segment == 0) 215: { 216: for (i = 1; i <= this->count; i++) 217: { 218: enable_disable(this, i, enable, notify); 219: } 220: } 221: else 222: { 223: enable_disable(this, segment, enable, notify); 224: } 225: this->mutex->unlock(this->mutex); 226: } 227: 228: METHOD(ha_segments_t, activate, void, 229: private_ha_segments_t *this, u_int segment, bool notify) 230: { 231: enable_disable_all(this, segment, TRUE, notify); 232: } 233: 234: METHOD(ha_segments_t, deactivate, void, 235: private_ha_segments_t *this, u_int segment, bool notify) 236: { 237: enable_disable_all(this, segment, FALSE, notify); 238: } 239: 240: METHOD(listener_t, alert_hook, bool, 241: private_ha_segments_t *this, ike_sa_t *ike_sa, alert_t alert, va_list args) 242: { 243: if (alert == ALERT_SHUTDOWN_SIGNAL) 244: { 245: if (this->heartbeat_active) 246: { 247: DBG1(DBG_CFG, "HA heartbeat active, dropping all segments"); 248: deactivate(this, 0, TRUE); 249: } 250: else 251: { 252: DBG1(DBG_CFG, "no HA heartbeat active, closing IKE_SAs"); 253: } 254: } 255: return TRUE; 256: } 257: 258: /** 259: * Monitor heartbeat activity of remote node 260: */ 261: static job_requeue_t watchdog(private_ha_segments_t *this) 262: { 263: bool timeout, oldstate; 264: 265: this->mutex->lock(this->mutex); 266: thread_cleanup_push((void*)this->mutex->unlock, this->mutex); 267: oldstate = thread_cancelability(TRUE); 268: timeout = this->condvar->timed_wait(this->condvar, this->mutex, 269: this->heartbeat_timeout); 270: thread_cancelability(oldstate); 271: thread_cleanup_pop(TRUE); 272: if (timeout) 273: { 274: DBG1(DBG_CFG, "no heartbeat received, taking all segments"); 275: activate(this, 0, TRUE); 276: /* disable heartbeat detection util we get one */ 277: this->heartbeat_active = FALSE; 278: return JOB_REQUEUE_NONE; 279: } 280: return JOB_REQUEUE_DIRECT; 281: } 282: 283: /** 284: * Start the heartbeat detection thread 285: */ 286: static void start_watchdog(private_ha_segments_t *this) 287: { 288: this->heartbeat_active = TRUE; 289: lib->processor->queue_job(lib->processor, 290: (job_t*)callback_job_create_with_prio((callback_job_cb_t)watchdog, this, 291: NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); 292: } 293: 294: METHOD(ha_segments_t, handle_status, void, 295: private_ha_segments_t *this, segment_mask_t mask) 296: { 297: segment_mask_t missing, twice; 298: int i; 299: 300: this->mutex->lock(this->mutex); 301: 302: missing = ~(this->active | mask); 303: twice = this->active & mask; 304: 305: for (i = 1; i <= this->count; i++) 306: { 307: if (missing & SEGMENTS_BIT(i)) 308: { 309: if (this->node == i % 2) 310: { 311: DBG1(DBG_CFG, "HA segment %d was not handled, taking", i); 312: enable_disable(this, i, TRUE, TRUE); 313: } 314: else 315: { 316: DBG1(DBG_CFG, "HA segment %d was not handled, dropping", i); 317: enable_disable(this, i, FALSE, TRUE); 318: } 319: } 320: if (twice & SEGMENTS_BIT(i)) 321: { 322: if (this->node == i % 2) 323: { 324: DBG1(DBG_CFG, "HA segment %d was handled twice, taking", i); 325: enable_disable(this, i, TRUE, TRUE); 326: } 327: else 328: { 329: DBG1(DBG_CFG, "HA segment %d was handled twice, dropping", i); 330: enable_disable(this, i, FALSE, TRUE); 331: } 332: } 333: } 334: 335: this->condvar->signal(this->condvar); 336: this->mutex->unlock(this->mutex); 337: 338: if (!this->heartbeat_active) 339: { 340: DBG1(DBG_CFG, "received heartbeat, reenabling watchdog"); 341: start_watchdog(this); 342: } 343: } 344: 345: /** 346: * Send a status message with our active segments 347: */ 348: static job_requeue_t send_status(private_ha_segments_t *this) 349: { 350: ha_message_t *message; 351: int i; 352: 353: message = ha_message_create(HA_STATUS); 354: 355: this->mutex->lock(this->mutex); 356: for (i = 1; i <= this->count; i++) 357: { 358: if (this->active & SEGMENTS_BIT(i)) 359: { 360: message->add_attribute(message, HA_SEGMENT, i); 361: } 362: } 363: this->mutex->unlock(this->mutex); 364: 365: this->socket->push(this->socket, message); 366: message->destroy(message); 367: 368: /* schedule next invocation */ 369: return JOB_RESCHEDULE_MS(this->heartbeat_delay); 370: } 371: 372: /** 373: * Start the heartbeat sending task 374: */ 375: static void start_heartbeat(private_ha_segments_t *this) 376: { 377: lib->processor->queue_job(lib->processor, 378: (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_status, 379: this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL)); 380: } 381: 382: /** 383: * Take a segment if we are handling less than half of segments 384: */ 385: static job_requeue_t autobalance(private_ha_segments_t *this) 386: { 387: int i, active = 0; 388: 389: this->mutex->lock(this->mutex); 390: 391: for (i = 1; i <= this->count; i++) 392: { 393: if (this->active & SEGMENTS_BIT(i)) 394: { 395: active++; 396: } 397: } 398: if (active < this->count / 2) 399: { 400: for (i = 1; i <= this->count; i++) 401: { 402: if (!(this->active & SEGMENTS_BIT(i))) 403: { 404: DBG1(DBG_CFG, "autobalancing HA (%d/%d active), taking %d", 405: active, this->count, i); 406: enable_disable(this, i, TRUE, TRUE); 407: /* we claim only one in each interval */ 408: break; 409: } 410: } 411: } 412: 413: this->mutex->unlock(this->mutex); 414: 415: return JOB_RESCHEDULE(this->autobalance); 416: } 417: 418: /** 419: * Schedule autobalancing 420: */ 421: static void start_autobalance(private_ha_segments_t *this) 422: { 423: DBG1(DBG_CFG, "scheduling HA autobalance every %ds", this->autobalance); 424: lib->scheduler->schedule_job(lib->scheduler, 425: (job_t*)callback_job_create_with_prio((callback_job_cb_t)autobalance, 426: this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL), 427: this->autobalance); 428: } 429: 430: METHOD(ha_segments_t, is_active, bool, 431: private_ha_segments_t *this, u_int segment) 432: { 433: return (this->active & SEGMENTS_BIT(segment)) != 0; 434: } 435: 436: METHOD(ha_segments_t, count, u_int, 437: private_ha_segments_t *this) 438: { 439: return this->count; 440: } 441: 442: METHOD(ha_segments_t, destroy, void, 443: private_ha_segments_t *this) 444: { 445: this->mutex->destroy(this->mutex); 446: this->condvar->destroy(this->condvar); 447: free(this); 448: } 449: 450: /** 451: * See header 452: */ 453: ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel, 454: ha_tunnel_t *tunnel, u_int count, u_int node, 455: bool monitor) 456: { 457: private_ha_segments_t *this; 458: 459: INIT(this, 460: .public = { 461: .listener = { 462: .alert = _alert_hook, 463: }, 464: .activate = _activate, 465: .deactivate = _deactivate, 466: .handle_status = _handle_status, 467: .is_active = _is_active, 468: .count = _count, 469: .destroy = _destroy, 470: }, 471: .socket = socket, 472: .tunnel = tunnel, 473: .kernel = kernel, 474: .count = count, 475: .node = node, 476: .mutex = mutex_create(MUTEX_TYPE_DEFAULT), 477: .condvar = condvar_create(CONDVAR_TYPE_DEFAULT), 478: .heartbeat_delay = lib->settings->get_int(lib->settings, 479: "%s.plugins.ha.heartbeat_delay", DEFAULT_HEARTBEAT_DELAY, 480: lib->ns), 481: .heartbeat_timeout = lib->settings->get_int(lib->settings, 482: "%s.plugins.ha.heartbeat_timeout", DEFAULT_HEARTBEAT_TIMEOUT, 483: lib->ns), 484: .autobalance = lib->settings->get_int(lib->settings, 485: "%s.plugins.ha.autobalance", 0, lib->ns), 486: ); 487: 488: if (monitor) 489: { 490: DBG1(DBG_CFG, "starting HA heartbeat, delay %dms, timeout %dms", 491: this->heartbeat_delay, this->heartbeat_timeout); 492: start_heartbeat(this); 493: start_watchdog(this); 494: } 495: if (this->autobalance) 496: { 497: start_autobalance(this); 498: } 499: 500: return &this->public; 501: }