Annotation of embedaddon/strongswan/src/libcharon/plugins/ha/ha_segments.c, revision 1.1.1.1

1.1       misho       1: /*
                      2:  * Copyright (C) 2008 Martin Willi
                      3:  * HSR Hochschule fuer Technik Rapperswil
                      4:  *
                      5:  * This program is free software; you can redistribute it and/or modify it
                      6:  * under the terms of the GNU General Public License as published by the
                      7:  * Free Software Foundation; either version 2 of the License, or (at your
                      8:  * option) any later version.  See <http://www.fsf.org/copyleft/gpl.txt>.
                      9:  *
                     10:  * This program is distributed in the hope that it will be useful, but
                     11:  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
                     12:  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
                     13:  * for more details.
                     14:  */
                     15: 
                     16: #include "ha_segments.h"
                     17: 
                     18: #include <threading/mutex.h>
                     19: #include <threading/condvar.h>
                     20: #include <collections/linked_list.h>
                     21: #include <threading/thread.h>
                     22: #include <processing/jobs/callback_job.h>
                     23: 
                     24: #define DEFAULT_HEARTBEAT_DELAY 1000
                     25: #define DEFAULT_HEARTBEAT_TIMEOUT 2100
                     26: 
                     27: typedef struct private_ha_segments_t private_ha_segments_t;
                     28: 
                     29: /**
                     30:  * Private data of an ha_segments_t object.
                     31:  */
                     32: struct private_ha_segments_t {
                     33: 
                     34:        /**
                     35:         * Public ha_segments_t interface.
                     36:         */
                     37:        ha_segments_t public;
                     38: 
                     39:        /**
                     40:         * communication socket
                     41:         */
                     42:        ha_socket_t *socket;
                     43: 
                     44:        /**
                     45:         * Sync tunnel, if any
                     46:         */
                     47:        ha_tunnel_t *tunnel;
                     48: 
                     49:        /**
                     50:         * Interface to control segments at kernel level
                     51:         */
                     52:        ha_kernel_t *kernel;
                     53: 
                     54:        /**
                     55:         * Mutex to lock segment manipulation
                     56:         */
                     57:        mutex_t *mutex;
                     58: 
                     59:        /**
                     60:         * Condvar to wait for heartbeats
                     61:         */
                     62:        condvar_t *condvar;
                     63: 
                     64:        /**
                     65:         * Total number of ClusterIP segments
                     66:         */
                     67:        u_int count;
                     68: 
                     69:        /**
                     70:         * mask of active segments
                     71:         */
                     72:        segment_mask_t active;
                     73: 
                     74:        /**
                     75:         * Node number
                     76:         */
                     77:        u_int node;
                     78: 
                     79:        /**
                     80:         * Are we checking for heartbeats?
                     81:         */
                     82:        bool heartbeat_active;
                     83: 
                     84:        /**
                     85:         * Interval we send heartbeats
                     86:         */
                     87:        int heartbeat_delay;
                     88: 
                     89:        /**
                     90:         * Timeout for heartbeats received from other node
                     91:         */
                     92:        int heartbeat_timeout;
                     93: 
                     94:        /**
                     95:         * Interval to check for autobalance, 0 to disable
                     96:         */
                     97:        int autobalance;
                     98: };
                     99: 
                    100: /**
                    101:  * Log currently active segments
                    102:  */
                    103: static void log_segments(private_ha_segments_t *this, bool activated,
                    104:                                                 u_int segment)
                    105: {
                    106:        char buf[64] = "none", *pos = buf;
                    107:        int i;
                    108:        bool first = TRUE;
                    109: 
                    110:        for (i = 1; i <= this->count; i++)
                    111:        {
                    112:                if (this->active & SEGMENTS_BIT(i))
                    113:                {
                    114:                        if (first)
                    115:                        {
                    116:                                first = FALSE;
                    117:                        }
                    118:                        else
                    119:                        {
                    120:                                pos += snprintf(pos, buf + sizeof(buf) - pos, ",");
                    121:                        }
                    122:                        pos += snprintf(pos, buf + sizeof(buf) - pos, "%d", i);
                    123:                }
                    124:        }
                    125:        DBG1(DBG_CFG, "HA segment %d %sactivated, now active: %s",
                    126:                 segment, activated ? "" : "de", buf);
                    127: }
                    128: 
                    129: /**
                    130:  * Enable/Disable a specific segment
                    131:  */
                    132: static void enable_disable(private_ha_segments_t *this, u_int segment,
                    133:                                                   bool enable, bool notify)
                    134: {
                    135:        ike_sa_t *ike_sa;
                    136:        enumerator_t *enumerator;
                    137:        ike_sa_state_t old, new;
                    138:        ha_message_t *message = NULL;
                    139:        ha_message_type_t type;
                    140:        bool changes = FALSE;
                    141: 
                    142:        if (segment > this->count)
                    143:        {
                    144:                return;
                    145:        }
                    146: 
                    147:        if (enable)
                    148:        {
                    149:                old = IKE_PASSIVE;
                    150:                new = IKE_ESTABLISHED;
                    151:                type = HA_SEGMENT_TAKE;
                    152:                if (!(this->active & SEGMENTS_BIT(segment)))
                    153:                {
                    154:                        this->active |= SEGMENTS_BIT(segment);
                    155:                        this->kernel->activate(this->kernel, segment);
                    156:                        changes = TRUE;
                    157:                }
                    158:        }
                    159:        else
                    160:        {
                    161:                old = IKE_ESTABLISHED;
                    162:                new = IKE_PASSIVE;
                    163:                type = HA_SEGMENT_DROP;
                    164:                if (this->active & SEGMENTS_BIT(segment))
                    165:                {
                    166:                        this->active &= ~SEGMENTS_BIT(segment);
                    167:                        this->kernel->deactivate(this->kernel, segment);
                    168:                        changes = TRUE;
                    169:                }
                    170:        }
                    171: 
                    172:        if (changes)
                    173:        {
                    174:                enumerator = charon->ike_sa_manager->create_enumerator(
                    175:                                                                                                charon->ike_sa_manager, TRUE);
                    176:                while (enumerator->enumerate(enumerator, &ike_sa))
                    177:                {
                    178:                        if (ike_sa->get_state(ike_sa) != old)
                    179:                        {
                    180:                                continue;
                    181:                        }
                    182:                        if (this->tunnel && this->tunnel->is_sa(this->tunnel, ike_sa))
                    183:                        {
                    184:                                continue;
                    185:                        }
                    186:                        if (this->kernel->get_segment(this->kernel,
                    187:                                                                        ike_sa->get_other_host(ike_sa)) == segment)
                    188:                        {
                    189:                                ike_sa->set_state(ike_sa, new);
                    190:                        }
                    191:                }
                    192:                enumerator->destroy(enumerator);
                    193:                log_segments(this, enable, segment);
                    194:        }
                    195: 
                    196:        if (notify)
                    197:        {
                    198:                message = ha_message_create(type);
                    199:                message->add_attribute(message, HA_SEGMENT, segment);
                    200:                this->socket->push(this->socket, message);
                    201:                message->destroy(message);
                    202:        }
                    203: }
                    204: 
                    205: /**
                    206:  * Enable/Disable all or a specific segment, do locking
                    207:  */
                    208: static void enable_disable_all(private_ha_segments_t *this, u_int segment,
                    209:                                                           bool enable, bool notify)
                    210: {
                    211:        int i;
                    212: 
                    213:        this->mutex->lock(this->mutex);
                    214:        if (segment == 0)
                    215:        {
                    216:                for (i = 1; i <= this->count; i++)
                    217:                {
                    218:                        enable_disable(this, i, enable, notify);
                    219:                }
                    220:        }
                    221:        else
                    222:        {
                    223:                enable_disable(this, segment, enable, notify);
                    224:        }
                    225:        this->mutex->unlock(this->mutex);
                    226: }
                    227: 
                    228: METHOD(ha_segments_t, activate, void,
                    229:        private_ha_segments_t *this, u_int segment, bool notify)
                    230: {
                    231:        enable_disable_all(this, segment, TRUE, notify);
                    232: }
                    233: 
                    234: METHOD(ha_segments_t, deactivate, void,
                    235:        private_ha_segments_t *this, u_int segment, bool notify)
                    236: {
                    237:        enable_disable_all(this, segment, FALSE, notify);
                    238: }
                    239: 
                    240: METHOD(listener_t, alert_hook, bool,
                    241:        private_ha_segments_t *this, ike_sa_t *ike_sa, alert_t alert, va_list args)
                    242: {
                    243:        if (alert == ALERT_SHUTDOWN_SIGNAL)
                    244:        {
                    245:                if (this->heartbeat_active)
                    246:                {
                    247:                        DBG1(DBG_CFG, "HA heartbeat active, dropping all segments");
                    248:                        deactivate(this, 0, TRUE);
                    249:                }
                    250:                else
                    251:                {
                    252:                        DBG1(DBG_CFG, "no HA heartbeat active, closing IKE_SAs");
                    253:                }
                    254:        }
                    255:        return TRUE;
                    256: }
                    257: 
                    258: /**
                    259:  * Monitor heartbeat activity of remote node
                    260:  */
                    261: static job_requeue_t watchdog(private_ha_segments_t *this)
                    262: {
                    263:        bool timeout, oldstate;
                    264: 
                    265:        this->mutex->lock(this->mutex);
                    266:        thread_cleanup_push((void*)this->mutex->unlock, this->mutex);
                    267:        oldstate = thread_cancelability(TRUE);
                    268:        timeout = this->condvar->timed_wait(this->condvar, this->mutex,
                    269:                                                                                this->heartbeat_timeout);
                    270:        thread_cancelability(oldstate);
                    271:        thread_cleanup_pop(TRUE);
                    272:        if (timeout)
                    273:        {
                    274:                DBG1(DBG_CFG, "no heartbeat received, taking all segments");
                    275:                activate(this, 0, TRUE);
                    276:                /* disable heartbeat detection util we get one */
                    277:                this->heartbeat_active = FALSE;
                    278:                return JOB_REQUEUE_NONE;
                    279:        }
                    280:        return JOB_REQUEUE_DIRECT;
                    281: }
                    282: 
                    283: /**
                    284:  * Start the heartbeat detection thread
                    285:  */
                    286: static void start_watchdog(private_ha_segments_t *this)
                    287: {
                    288:        this->heartbeat_active = TRUE;
                    289:        lib->processor->queue_job(lib->processor,
                    290:                (job_t*)callback_job_create_with_prio((callback_job_cb_t)watchdog, this,
                    291:                                NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
                    292: }
                    293: 
                    294: METHOD(ha_segments_t, handle_status, void,
                    295:        private_ha_segments_t *this, segment_mask_t mask)
                    296: {
                    297:        segment_mask_t missing, twice;
                    298:        int i;
                    299: 
                    300:        this->mutex->lock(this->mutex);
                    301: 
                    302:        missing = ~(this->active | mask);
                    303:        twice = this->active & mask;
                    304: 
                    305:        for (i = 1; i <= this->count; i++)
                    306:        {
                    307:                if (missing & SEGMENTS_BIT(i))
                    308:                {
                    309:                        if (this->node == i % 2)
                    310:                        {
                    311:                                DBG1(DBG_CFG, "HA segment %d was not handled, taking", i);
                    312:                                enable_disable(this, i, TRUE, TRUE);
                    313:                        }
                    314:                        else
                    315:                        {
                    316:                                DBG1(DBG_CFG, "HA segment %d was not handled, dropping", i);
                    317:                                enable_disable(this, i, FALSE, TRUE);
                    318:                        }
                    319:                }
                    320:                if (twice & SEGMENTS_BIT(i))
                    321:                {
                    322:                        if (this->node == i % 2)
                    323:                        {
                    324:                                DBG1(DBG_CFG, "HA segment %d was handled twice, taking", i);
                    325:                                enable_disable(this, i, TRUE, TRUE);
                    326:                        }
                    327:                        else
                    328:                        {
                    329:                                DBG1(DBG_CFG, "HA segment %d was handled twice, dropping", i);
                    330:                                enable_disable(this, i, FALSE, TRUE);
                    331:                        }
                    332:                }
                    333:        }
                    334: 
                    335:        this->condvar->signal(this->condvar);
                    336:        this->mutex->unlock(this->mutex);
                    337: 
                    338:        if (!this->heartbeat_active)
                    339:        {
                    340:                DBG1(DBG_CFG, "received heartbeat, reenabling watchdog");
                    341:                start_watchdog(this);
                    342:        }
                    343: }
                    344: 
                    345: /**
                    346:  * Send a status message with our active segments
                    347:  */
                    348: static job_requeue_t send_status(private_ha_segments_t *this)
                    349: {
                    350:        ha_message_t *message;
                    351:        int i;
                    352: 
                    353:        message = ha_message_create(HA_STATUS);
                    354: 
                    355:        this->mutex->lock(this->mutex);
                    356:        for (i = 1; i <= this->count; i++)
                    357:        {
                    358:                if (this->active & SEGMENTS_BIT(i))
                    359:                {
                    360:                        message->add_attribute(message, HA_SEGMENT, i);
                    361:                }
                    362:        }
                    363:        this->mutex->unlock(this->mutex);
                    364: 
                    365:        this->socket->push(this->socket, message);
                    366:        message->destroy(message);
                    367: 
                    368:        /* schedule next invocation */
                    369:        return JOB_RESCHEDULE_MS(this->heartbeat_delay);
                    370: }
                    371: 
                    372: /**
                    373:  * Start the heartbeat sending task
                    374:  */
                    375: static void start_heartbeat(private_ha_segments_t *this)
                    376: {
                    377:        lib->processor->queue_job(lib->processor,
                    378:                (job_t*)callback_job_create_with_prio((callback_job_cb_t)send_status,
                    379:                        this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL));
                    380: }
                    381: 
                    382: /**
                    383:  * Take a segment if we are handling less than half of segments
                    384:  */
                    385: static job_requeue_t autobalance(private_ha_segments_t *this)
                    386: {
                    387:        int i, active = 0;
                    388: 
                    389:        this->mutex->lock(this->mutex);
                    390: 
                    391:        for (i = 1; i <= this->count; i++)
                    392:        {
                    393:                if (this->active & SEGMENTS_BIT(i))
                    394:                {
                    395:                        active++;
                    396:                }
                    397:        }
                    398:        if (active < this->count / 2)
                    399:        {
                    400:                for (i = 1; i <= this->count; i++)
                    401:                {
                    402:                        if (!(this->active & SEGMENTS_BIT(i)))
                    403:                        {
                    404:                                DBG1(DBG_CFG, "autobalancing HA (%d/%d active), taking %d",
                    405:                                         active, this->count, i);
                    406:                                enable_disable(this, i, TRUE, TRUE);
                    407:                                /* we claim only one in each interval */
                    408:                                break;
                    409:                        }
                    410:                }
                    411:        }
                    412: 
                    413:        this->mutex->unlock(this->mutex);
                    414: 
                    415:        return JOB_RESCHEDULE(this->autobalance);
                    416: }
                    417: 
                    418: /**
                    419:  * Schedule autobalancing
                    420:  */
                    421: static void start_autobalance(private_ha_segments_t *this)
                    422: {
                    423:        DBG1(DBG_CFG, "scheduling HA autobalance every %ds", this->autobalance);
                    424:        lib->scheduler->schedule_job(lib->scheduler,
                    425:                (job_t*)callback_job_create_with_prio((callback_job_cb_t)autobalance,
                    426:                        this, NULL, (callback_job_cancel_t)return_false, JOB_PRIO_CRITICAL),
                    427:                this->autobalance);
                    428: }
                    429: 
                    430: METHOD(ha_segments_t, is_active, bool,
                    431:        private_ha_segments_t *this, u_int segment)
                    432: {
                    433:        return (this->active & SEGMENTS_BIT(segment)) != 0;
                    434: }
                    435: 
                    436: METHOD(ha_segments_t, count, u_int,
                    437:        private_ha_segments_t *this)
                    438: {
                    439:        return this->count;
                    440: }
                    441: 
                    442: METHOD(ha_segments_t, destroy, void,
                    443:        private_ha_segments_t *this)
                    444: {
                    445:        this->mutex->destroy(this->mutex);
                    446:        this->condvar->destroy(this->condvar);
                    447:        free(this);
                    448: }
                    449: 
                    450: /**
                    451:  * See header
                    452:  */
                    453: ha_segments_t *ha_segments_create(ha_socket_t *socket, ha_kernel_t *kernel,
                    454:                                                                  ha_tunnel_t *tunnel, u_int count, u_int node,
                    455:                                                                  bool monitor)
                    456: {
                    457:        private_ha_segments_t *this;
                    458: 
                    459:        INIT(this,
                    460:                .public = {
                    461:                        .listener = {
                    462:                                .alert = _alert_hook,
                    463:                        },
                    464:                        .activate = _activate,
                    465:                        .deactivate = _deactivate,
                    466:                        .handle_status = _handle_status,
                    467:                        .is_active = _is_active,
                    468:                        .count = _count,
                    469:                        .destroy = _destroy,
                    470:                },
                    471:                .socket = socket,
                    472:                .tunnel = tunnel,
                    473:                .kernel = kernel,
                    474:                .count = count,
                    475:                .node = node,
                    476:                .mutex = mutex_create(MUTEX_TYPE_DEFAULT),
                    477:                .condvar = condvar_create(CONDVAR_TYPE_DEFAULT),
                    478:                .heartbeat_delay = lib->settings->get_int(lib->settings,
                    479:                                "%s.plugins.ha.heartbeat_delay", DEFAULT_HEARTBEAT_DELAY,
                    480:                                lib->ns),
                    481:                .heartbeat_timeout = lib->settings->get_int(lib->settings,
                    482:                                "%s.plugins.ha.heartbeat_timeout", DEFAULT_HEARTBEAT_TIMEOUT,
                    483:                                lib->ns),
                    484:                .autobalance = lib->settings->get_int(lib->settings,
                    485:                                "%s.plugins.ha.autobalance", 0, lib->ns),
                    486:        );
                    487: 
                    488:        if (monitor)
                    489:        {
                    490:                DBG1(DBG_CFG, "starting HA heartbeat, delay %dms, timeout %dms",
                    491:                         this->heartbeat_delay, this->heartbeat_timeout);
                    492:                start_heartbeat(this);
                    493:                start_watchdog(this);
                    494:        }
                    495:        if (this->autobalance)
                    496:        {
                    497:                start_autobalance(this);
                    498:        }
                    499: 
                    500:        return &this->public;
                    501: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>