File:  [ELWIX - Embedded LightWeight unIX -] / embedaddon / libevent / evrpc.c
Revision 1.1.1.1 (vendor branch): download - view: text, annotated - select for diffs - revision graph
Tue Feb 21 23:02:54 2012 UTC (12 years, 3 months ago) by misho
Branches: libevent, MAIN
CVS tags: v1_4_14bp0, v1_4_14b, HEAD
libevent

    1: /*
    2:  * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
    3:  * All rights reserved.
    4:  *
    5:  * Redistribution and use in source and binary forms, with or without
    6:  * modification, are permitted provided that the following conditions
    7:  * are met:
    8:  * 1. Redistributions of source code must retain the above copyright
    9:  *    notice, this list of conditions and the following disclaimer.
   10:  * 2. Redistributions in binary form must reproduce the above copyright
   11:  *    notice, this list of conditions and the following disclaimer in the
   12:  *    documentation and/or other materials provided with the distribution.
   13:  * 3. The name of the author may not be used to endorse or promote products
   14:  *    derived from this software without specific prior written permission.
   15:  *
   16:  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
   17:  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   18:  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
   19:  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
   20:  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
   21:  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
   22:  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
   23:  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
   24:  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
   25:  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
   26:  */
   27: #ifdef HAVE_CONFIG_H
   28: #include "config.h"
   29: #endif
   30: 
   31: #ifdef WIN32
   32: #define WIN32_LEAN_AND_MEAN
   33: #include <winsock2.h>
   34: #include <windows.h>
   35: #undef WIN32_LEAN_AND_MEAN
   36: #endif
   37: 
   38: #include <sys/types.h>
   39: #ifndef WIN32
   40: #include <sys/socket.h>
   41: #endif
   42: #ifdef HAVE_SYS_TIME_H
   43: #include <sys/time.h>
   44: #endif
   45: #include <sys/queue.h>
   46: #include <stdio.h>
   47: #include <stdlib.h>
   48: #ifndef WIN32
   49: #include <unistd.h>
   50: #endif
   51: #include <errno.h>
   52: #include <signal.h>
   53: #include <string.h>
   54: #include <assert.h>
   55: 
   56: #include "event.h"
   57: #include "evrpc.h"
   58: #include "evrpc-internal.h"
   59: #include "evhttp.h"
   60: #include "evutil.h"
   61: #include "log.h"
   62: 
   63: struct evrpc_base *
   64: evrpc_init(struct evhttp *http_server)
   65: {
   66: 	struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
   67: 	if (base == NULL)
   68: 		return (NULL);
   69: 
   70: 	/* we rely on the tagging sub system */
   71: 	evtag_init();
   72: 
   73: 	TAILQ_INIT(&base->registered_rpcs);
   74: 	TAILQ_INIT(&base->input_hooks);
   75: 	TAILQ_INIT(&base->output_hooks);
   76: 	base->http_server = http_server;
   77: 
   78: 	return (base);
   79: }
   80: 
   81: void
   82: evrpc_free(struct evrpc_base *base)
   83: {
   84: 	struct evrpc *rpc;
   85: 	struct evrpc_hook *hook;
   86: 
   87: 	while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
   88: 		assert(evrpc_unregister_rpc(base, rpc->uri));
   89: 	}
   90: 	while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
   91: 		assert(evrpc_remove_hook(base, EVRPC_INPUT, hook));
   92: 	}
   93: 	while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
   94: 		assert(evrpc_remove_hook(base, EVRPC_OUTPUT, hook));
   95: 	}
   96: 	free(base);
   97: }
   98: 
   99: void *
  100: evrpc_add_hook(void *vbase,
  101:     enum EVRPC_HOOK_TYPE hook_type,
  102:     int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
  103:     void *cb_arg)
  104: {
  105: 	struct _evrpc_hooks *base = vbase;
  106: 	struct evrpc_hook_list *head = NULL;
  107: 	struct evrpc_hook *hook = NULL;
  108: 	switch (hook_type) {
  109: 	case EVRPC_INPUT:
  110: 		head = &base->in_hooks;
  111: 		break;
  112: 	case EVRPC_OUTPUT:
  113: 		head = &base->out_hooks;
  114: 		break;
  115: 	default:
  116: 		assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
  117: 	}
  118: 
  119: 	hook = calloc(1, sizeof(struct evrpc_hook));
  120: 	assert(hook != NULL);
  121: 	
  122: 	hook->process = cb;
  123: 	hook->process_arg = cb_arg;
  124: 	TAILQ_INSERT_TAIL(head, hook, next);
  125: 
  126: 	return (hook);
  127: }
  128: 
  129: static int
  130: evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
  131: {
  132: 	struct evrpc_hook *hook = NULL;
  133: 	TAILQ_FOREACH(hook, head, next) {
  134: 		if (hook == handle) {
  135: 			TAILQ_REMOVE(head, hook, next);
  136: 			free(hook);
  137: 			return (1);
  138: 		}
  139: 	}
  140: 
  141: 	return (0);
  142: }
  143: 
  144: /*
  145:  * remove the hook specified by the handle
  146:  */
  147: 
  148: int
  149: evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
  150: {
  151: 	struct _evrpc_hooks *base = vbase;
  152: 	struct evrpc_hook_list *head = NULL;
  153: 	switch (hook_type) {
  154: 	case EVRPC_INPUT:
  155: 		head = &base->in_hooks;
  156: 		break;
  157: 	case EVRPC_OUTPUT:
  158: 		head = &base->out_hooks;
  159: 		break;
  160: 	default:
  161: 		assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
  162: 	}
  163: 
  164: 	return (evrpc_remove_hook_internal(head, handle));
  165: }
  166: 
  167: static int
  168: evrpc_process_hooks(struct evrpc_hook_list *head,
  169:     struct evhttp_request *req, struct evbuffer *evbuf)
  170: {
  171: 	struct evrpc_hook *hook;
  172: 	TAILQ_FOREACH(hook, head, next) {
  173: 		if (hook->process(req, evbuf, hook->process_arg) == -1)
  174: 			return (-1);
  175: 	}
  176: 
  177: 	return (0);
  178: }
  179: 
  180: static void evrpc_pool_schedule(struct evrpc_pool *pool);
  181: static void evrpc_request_cb(struct evhttp_request *, void *);
  182: void evrpc_request_done(struct evrpc_req_generic*);
  183: 
  184: /*
  185:  * Registers a new RPC with the HTTP server.   The evrpc object is expected
  186:  * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
  187:  * calls this function.
  188:  */
  189: 
  190: static char *
  191: evrpc_construct_uri(const char *uri)
  192: {
  193: 	char *constructed_uri;
  194: 	int constructed_uri_len;
  195: 
  196: 	constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
  197: 	if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
  198: 		event_err(1, "%s: failed to register rpc at %s",
  199: 		    __func__, uri);
  200: 	memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
  201: 	memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
  202: 	constructed_uri[constructed_uri_len - 1] = '\0';
  203: 
  204: 	return (constructed_uri);
  205: }
  206: 
  207: int
  208: evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
  209:     void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
  210: {
  211: 	char *constructed_uri = evrpc_construct_uri(rpc->uri);
  212: 
  213: 	rpc->base = base;
  214: 	rpc->cb = cb;
  215: 	rpc->cb_arg = cb_arg;
  216: 
  217: 	TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
  218: 
  219: 	evhttp_set_cb(base->http_server,
  220: 	    constructed_uri,
  221: 	    evrpc_request_cb,
  222: 	    rpc);
  223: 	
  224: 	free(constructed_uri);
  225: 
  226: 	return (0);
  227: }
  228: 
  229: int
  230: evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
  231: {
  232: 	char *registered_uri = NULL;
  233: 	struct evrpc *rpc;
  234: 
  235: 	/* find the right rpc; linear search might be slow */
  236: 	TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
  237: 		if (strcmp(rpc->uri, name) == 0)
  238: 			break;
  239: 	}
  240: 	if (rpc == NULL) {
  241: 		/* We did not find an RPC with this name */
  242: 		return (-1);
  243: 	}
  244: 	TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
  245: 	
  246: 	free((char *)rpc->uri);
  247: 	free(rpc);
  248: 
  249:         registered_uri = evrpc_construct_uri(name);
  250: 
  251: 	/* remove the http server callback */
  252: 	assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
  253: 
  254: 	free(registered_uri);
  255: 	return (0);
  256: }
  257: 
  258: static void
  259: evrpc_request_cb(struct evhttp_request *req, void *arg)
  260: {
  261: 	struct evrpc *rpc = arg;
  262: 	struct evrpc_req_generic *rpc_state = NULL;
  263: 
  264: 	/* let's verify the outside parameters */
  265: 	if (req->type != EVHTTP_REQ_POST ||
  266: 	    EVBUFFER_LENGTH(req->input_buffer) <= 0)
  267: 		goto error;
  268: 
  269: 	/*
  270: 	 * we might want to allow hooks to suspend the processing,
  271: 	 * but at the moment, we assume that they just act as simple
  272: 	 * filters.
  273: 	 */
  274: 	if (evrpc_process_hooks(&rpc->base->input_hooks,
  275: 		req, req->input_buffer) == -1)
  276: 		goto error;
  277: 
  278: 	rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
  279: 	if (rpc_state == NULL)
  280: 		goto error;
  281: 
  282: 	/* let's check that we can parse the request */
  283: 	rpc_state->request = rpc->request_new();
  284: 	if (rpc_state->request == NULL)
  285: 		goto error;
  286: 
  287: 	rpc_state->rpc = rpc;
  288: 
  289: 	if (rpc->request_unmarshal(
  290: 		    rpc_state->request, req->input_buffer) == -1) {
  291: 		/* we failed to parse the request; that's a bummer */
  292: 		goto error;
  293: 	}
  294: 
  295: 	/* at this point, we have a well formed request, prepare the reply */
  296: 
  297: 	rpc_state->reply = rpc->reply_new();
  298: 	if (rpc_state->reply == NULL)
  299: 		goto error;
  300: 
  301: 	rpc_state->http_req = req;
  302: 	rpc_state->done = evrpc_request_done;
  303: 
  304: 	/* give the rpc to the user; they can deal with it */
  305: 	rpc->cb(rpc_state, rpc->cb_arg);
  306: 
  307: 	return;
  308: 
  309: error:
  310: 	evrpc_reqstate_free(rpc_state);
  311: 	evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
  312: 	return;
  313: }
  314: 
  315: void
  316: evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
  317: {
  318: 	/* clean up all memory */
  319: 	if (rpc_state != NULL) {
  320: 		struct evrpc *rpc = rpc_state->rpc;
  321: 
  322: 		if (rpc_state->request != NULL)
  323: 			rpc->request_free(rpc_state->request);
  324: 		if (rpc_state->reply != NULL)
  325: 			rpc->reply_free(rpc_state->reply);
  326: 		free(rpc_state);
  327: 	}
  328: }
  329: 
  330: void
  331: evrpc_request_done(struct evrpc_req_generic* rpc_state)
  332: {
  333: 	struct evhttp_request *req = rpc_state->http_req;
  334: 	struct evrpc *rpc = rpc_state->rpc;
  335: 	struct evbuffer* data = NULL;
  336: 
  337: 	if (rpc->reply_complete(rpc_state->reply) == -1) {
  338: 		/* the reply was not completely filled in.  error out */
  339: 		goto error;
  340: 	}
  341: 
  342: 	if ((data = evbuffer_new()) == NULL) {
  343: 		/* out of memory */
  344: 		goto error;
  345: 	}
  346: 
  347: 	/* serialize the reply */
  348: 	rpc->reply_marshal(data, rpc_state->reply);
  349: 
  350: 	/* do hook based tweaks to the request */
  351: 	if (evrpc_process_hooks(&rpc->base->output_hooks,
  352: 		req, data) == -1)
  353: 		goto error;
  354: 
  355: 	/* on success, we are going to transmit marshaled binary data */
  356: 	if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
  357: 		evhttp_add_header(req->output_headers,
  358: 		    "Content-Type", "application/octet-stream");
  359: 	}
  360: 
  361: 	evhttp_send_reply(req, HTTP_OK, "OK", data);
  362: 
  363: 	evbuffer_free(data);
  364: 
  365: 	evrpc_reqstate_free(rpc_state);
  366: 
  367: 	return;
  368: 
  369: error:
  370: 	if (data != NULL)
  371: 		evbuffer_free(data);
  372: 	evrpc_reqstate_free(rpc_state);
  373: 	evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
  374: 	return;
  375: }
  376: 
  377: /* Client implementation of RPC site */
  378: 
  379: static int evrpc_schedule_request(struct evhttp_connection *connection,
  380:     struct evrpc_request_wrapper *ctx);
  381: 
  382: struct evrpc_pool *
  383: evrpc_pool_new(struct event_base *base)
  384: {
  385: 	struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
  386: 	if (pool == NULL)
  387: 		return (NULL);
  388: 
  389: 	TAILQ_INIT(&pool->connections);
  390: 	TAILQ_INIT(&pool->requests);
  391: 
  392: 	TAILQ_INIT(&pool->input_hooks);
  393: 	TAILQ_INIT(&pool->output_hooks);
  394: 
  395: 	pool->base = base;
  396: 	pool->timeout = -1;
  397: 
  398: 	return (pool);
  399: }
  400: 
  401: static void
  402: evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
  403: {
  404: 	free(request->name);
  405: 	free(request);
  406: }
  407: 
  408: void
  409: evrpc_pool_free(struct evrpc_pool *pool)
  410: {
  411: 	struct evhttp_connection *connection;
  412: 	struct evrpc_request_wrapper *request;
  413: 	struct evrpc_hook *hook;
  414: 
  415: 	while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
  416: 		TAILQ_REMOVE(&pool->requests, request, next);
  417: 		/* if this gets more complicated we need our own function */
  418: 		evrpc_request_wrapper_free(request);
  419: 	}
  420: 
  421: 	while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
  422: 		TAILQ_REMOVE(&pool->connections, connection, next);
  423: 		evhttp_connection_free(connection);
  424: 	}
  425: 
  426: 	while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
  427: 		assert(evrpc_remove_hook(pool, EVRPC_INPUT, hook));
  428: 	}
  429: 
  430: 	while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
  431: 		assert(evrpc_remove_hook(pool, EVRPC_OUTPUT, hook));
  432: 	}
  433: 
  434: 	free(pool);
  435: }
  436: 
  437: /*
  438:  * Add a connection to the RPC pool.   A request scheduled on the pool
  439:  * may use any available connection.
  440:  */
  441: 
  442: void
  443: evrpc_pool_add_connection(struct evrpc_pool *pool,
  444:     struct evhttp_connection *connection) {
  445: 	assert(connection->http_server == NULL);
  446: 	TAILQ_INSERT_TAIL(&pool->connections, connection, next);
  447: 
  448: 	/*
  449: 	 * associate an event base with this connection
  450: 	 */
  451: 	if (pool->base != NULL)
  452: 		evhttp_connection_set_base(connection, pool->base);
  453: 
  454: 	/* 
  455: 	 * unless a timeout was specifically set for a connection,
  456: 	 * the connection inherits the timeout from the pool.
  457: 	 */
  458: 	if (connection->timeout == -1)
  459: 		connection->timeout = pool->timeout;
  460: 
  461: 	/* 
  462: 	 * if we have any requests pending, schedule them with the new
  463: 	 * connections.
  464: 	 */
  465: 
  466: 	if (TAILQ_FIRST(&pool->requests) != NULL) {
  467: 		struct evrpc_request_wrapper *request = 
  468: 		    TAILQ_FIRST(&pool->requests);
  469: 		TAILQ_REMOVE(&pool->requests, request, next);
  470: 		evrpc_schedule_request(connection, request);
  471: 	}
  472: }
  473: 
  474: void
  475: evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
  476: {
  477: 	struct evhttp_connection *evcon;
  478: 	TAILQ_FOREACH(evcon, &pool->connections, next) {
  479: 		evcon->timeout = timeout_in_secs;
  480: 	}
  481: 	pool->timeout = timeout_in_secs;
  482: }
  483: 
  484: 
  485: static void evrpc_reply_done(struct evhttp_request *, void *);
  486: static void evrpc_request_timeout(int, short, void *);
  487: 
  488: /*
  489:  * Finds a connection object associated with the pool that is currently
  490:  * idle and can be used to make a request.
  491:  */
  492: static struct evhttp_connection *
  493: evrpc_pool_find_connection(struct evrpc_pool *pool)
  494: {
  495: 	struct evhttp_connection *connection;
  496: 	TAILQ_FOREACH(connection, &pool->connections, next) {
  497: 		if (TAILQ_FIRST(&connection->requests) == NULL)
  498: 			return (connection);
  499: 	}
  500: 
  501: 	return (NULL);
  502: }
  503: 
  504: /*
  505:  * We assume that the ctx is no longer queued on the pool.
  506:  */
  507: static int
  508: evrpc_schedule_request(struct evhttp_connection *connection,
  509:     struct evrpc_request_wrapper *ctx)
  510: {
  511: 	struct evhttp_request *req = NULL;
  512: 	struct evrpc_pool *pool = ctx->pool;
  513: 	struct evrpc_status status;
  514: 	char *uri = NULL;
  515: 	int res = 0;
  516: 
  517: 	if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
  518: 		goto error;
  519: 
  520: 	/* serialize the request data into the output buffer */
  521: 	ctx->request_marshal(req->output_buffer, ctx->request);
  522: 
  523: 	uri = evrpc_construct_uri(ctx->name);
  524: 	if (uri == NULL)
  525: 		goto error;
  526: 
  527: 	/* we need to know the connection that we might have to abort */
  528: 	ctx->evcon = connection;
  529: 
  530: 	/* apply hooks to the outgoing request */
  531: 	if (evrpc_process_hooks(&pool->output_hooks,
  532: 		req, req->output_buffer) == -1)
  533: 		goto error;
  534: 
  535: 	if (pool->timeout > 0) {
  536: 		/* 
  537: 		 * a timeout after which the whole rpc is going to be aborted.
  538: 		 */
  539: 		struct timeval tv;
  540: 		evutil_timerclear(&tv);
  541: 		tv.tv_sec = pool->timeout;
  542: 		evtimer_add(&ctx->ev_timeout, &tv);
  543: 	}
  544: 
  545: 	/* start the request over the connection */
  546: 	res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
  547: 	free(uri);
  548: 
  549: 	if (res == -1)
  550: 		goto error;
  551: 
  552: 	return (0);
  553: 
  554: error:
  555: 	memset(&status, 0, sizeof(status));
  556: 	status.error = EVRPC_STATUS_ERR_UNSTARTED;
  557: 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
  558: 	evrpc_request_wrapper_free(ctx);
  559: 	return (-1);
  560: }
  561: 
  562: int
  563: evrpc_make_request(struct evrpc_request_wrapper *ctx)
  564: {
  565: 	struct evrpc_pool *pool = ctx->pool;
  566: 
  567: 	/* initialize the event structure for this rpc */
  568: 	evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
  569: 	if (pool->base != NULL)
  570: 		event_base_set(pool->base, &ctx->ev_timeout);
  571: 
  572: 	/* we better have some available connections on the pool */
  573: 	assert(TAILQ_FIRST(&pool->connections) != NULL);
  574: 
  575: 	/* 
  576: 	 * if no connection is available, we queue the request on the pool,
  577: 	 * the next time a connection is empty, the rpc will be send on that.
  578: 	 */
  579: 	TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
  580: 
  581: 	evrpc_pool_schedule(pool);
  582: 
  583: 	return (0);
  584: }
  585: 
  586: static void
  587: evrpc_reply_done(struct evhttp_request *req, void *arg)
  588: {
  589: 	struct evrpc_request_wrapper *ctx = arg;
  590: 	struct evrpc_pool *pool = ctx->pool;
  591: 	struct evrpc_status status;
  592: 	int res = -1;
  593: 	
  594: 	/* cancel any timeout we might have scheduled */
  595: 	event_del(&ctx->ev_timeout);
  596: 
  597: 	memset(&status, 0, sizeof(status));
  598: 	status.http_req = req;
  599: 
  600: 	/* we need to get the reply now */
  601: 	if (req != NULL) {
  602: 		/* apply hooks to the incoming request */
  603: 		if (evrpc_process_hooks(&pool->input_hooks,
  604: 			req, req->input_buffer) == -1) {
  605: 			status.error = EVRPC_STATUS_ERR_HOOKABORTED;
  606: 			res = -1;
  607: 		} else {
  608: 			res = ctx->reply_unmarshal(ctx->reply,
  609: 			    req->input_buffer);
  610: 			if (res == -1) {
  611: 				status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
  612: 			}
  613: 		}
  614: 	} else {
  615: 		status.error = EVRPC_STATUS_ERR_TIMEOUT;
  616: 	}
  617: 
  618: 	if (res == -1) {
  619: 		/* clear everything that we might have written previously */
  620: 		ctx->reply_clear(ctx->reply);
  621: 	}
  622: 
  623: 	(*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
  624: 	
  625: 	evrpc_request_wrapper_free(ctx);
  626: 
  627: 	/* the http layer owns the request structure */
  628: 
  629: 	/* see if we can schedule another request */
  630: 	evrpc_pool_schedule(pool);
  631: }
  632: 
  633: static void
  634: evrpc_pool_schedule(struct evrpc_pool *pool)
  635: {
  636: 	struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
  637: 	struct evhttp_connection *evcon;
  638: 
  639: 	/* if no requests are pending, we have no work */
  640: 	if (ctx == NULL)
  641: 		return;
  642: 
  643: 	if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
  644: 		TAILQ_REMOVE(&pool->requests, ctx, next);
  645: 		evrpc_schedule_request(evcon, ctx);
  646: 	}
  647: }
  648: 
  649: static void
  650: evrpc_request_timeout(int fd, short what, void *arg)
  651: {
  652: 	struct evrpc_request_wrapper *ctx = arg;
  653: 	struct evhttp_connection *evcon = ctx->evcon;
  654: 	assert(evcon != NULL);
  655: 
  656: 	evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
  657: }

FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>