Annotation of embedaddon/libevent/evrpc.c, revision 1.1.1.1
1.1 misho 1: /*
2: * Copyright (c) 2000-2004 Niels Provos <provos@citi.umich.edu>
3: * All rights reserved.
4: *
5: * Redistribution and use in source and binary forms, with or without
6: * modification, are permitted provided that the following conditions
7: * are met:
8: * 1. Redistributions of source code must retain the above copyright
9: * notice, this list of conditions and the following disclaimer.
10: * 2. Redistributions in binary form must reproduce the above copyright
11: * notice, this list of conditions and the following disclaimer in the
12: * documentation and/or other materials provided with the distribution.
13: * 3. The name of the author may not be used to endorse or promote products
14: * derived from this software without specific prior written permission.
15: *
16: * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
17: * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
18: * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
19: * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
20: * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
21: * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
22: * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
23: * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24: * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
25: * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26: */
27: #ifdef HAVE_CONFIG_H
28: #include "config.h"
29: #endif
30:
31: #ifdef WIN32
32: #define WIN32_LEAN_AND_MEAN
33: #include <winsock2.h>
34: #include <windows.h>
35: #undef WIN32_LEAN_AND_MEAN
36: #endif
37:
38: #include <sys/types.h>
39: #ifndef WIN32
40: #include <sys/socket.h>
41: #endif
42: #ifdef HAVE_SYS_TIME_H
43: #include <sys/time.h>
44: #endif
45: #include <sys/queue.h>
46: #include <stdio.h>
47: #include <stdlib.h>
48: #ifndef WIN32
49: #include <unistd.h>
50: #endif
51: #include <errno.h>
52: #include <signal.h>
53: #include <string.h>
54: #include <assert.h>
55:
56: #include "event.h"
57: #include "evrpc.h"
58: #include "evrpc-internal.h"
59: #include "evhttp.h"
60: #include "evutil.h"
61: #include "log.h"
62:
63: struct evrpc_base *
64: evrpc_init(struct evhttp *http_server)
65: {
66: struct evrpc_base* base = calloc(1, sizeof(struct evrpc_base));
67: if (base == NULL)
68: return (NULL);
69:
70: /* we rely on the tagging sub system */
71: evtag_init();
72:
73: TAILQ_INIT(&base->registered_rpcs);
74: TAILQ_INIT(&base->input_hooks);
75: TAILQ_INIT(&base->output_hooks);
76: base->http_server = http_server;
77:
78: return (base);
79: }
80:
81: void
82: evrpc_free(struct evrpc_base *base)
83: {
84: struct evrpc *rpc;
85: struct evrpc_hook *hook;
86:
87: while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
88: assert(evrpc_unregister_rpc(base, rpc->uri));
89: }
90: while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
91: assert(evrpc_remove_hook(base, EVRPC_INPUT, hook));
92: }
93: while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
94: assert(evrpc_remove_hook(base, EVRPC_OUTPUT, hook));
95: }
96: free(base);
97: }
98:
99: void *
100: evrpc_add_hook(void *vbase,
101: enum EVRPC_HOOK_TYPE hook_type,
102: int (*cb)(struct evhttp_request *, struct evbuffer *, void *),
103: void *cb_arg)
104: {
105: struct _evrpc_hooks *base = vbase;
106: struct evrpc_hook_list *head = NULL;
107: struct evrpc_hook *hook = NULL;
108: switch (hook_type) {
109: case EVRPC_INPUT:
110: head = &base->in_hooks;
111: break;
112: case EVRPC_OUTPUT:
113: head = &base->out_hooks;
114: break;
115: default:
116: assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
117: }
118:
119: hook = calloc(1, sizeof(struct evrpc_hook));
120: assert(hook != NULL);
121:
122: hook->process = cb;
123: hook->process_arg = cb_arg;
124: TAILQ_INSERT_TAIL(head, hook, next);
125:
126: return (hook);
127: }
128:
129: static int
130: evrpc_remove_hook_internal(struct evrpc_hook_list *head, void *handle)
131: {
132: struct evrpc_hook *hook = NULL;
133: TAILQ_FOREACH(hook, head, next) {
134: if (hook == handle) {
135: TAILQ_REMOVE(head, hook, next);
136: free(hook);
137: return (1);
138: }
139: }
140:
141: return (0);
142: }
143:
144: /*
145: * remove the hook specified by the handle
146: */
147:
148: int
149: evrpc_remove_hook(void *vbase, enum EVRPC_HOOK_TYPE hook_type, void *handle)
150: {
151: struct _evrpc_hooks *base = vbase;
152: struct evrpc_hook_list *head = NULL;
153: switch (hook_type) {
154: case EVRPC_INPUT:
155: head = &base->in_hooks;
156: break;
157: case EVRPC_OUTPUT:
158: head = &base->out_hooks;
159: break;
160: default:
161: assert(hook_type == EVRPC_INPUT || hook_type == EVRPC_OUTPUT);
162: }
163:
164: return (evrpc_remove_hook_internal(head, handle));
165: }
166:
167: static int
168: evrpc_process_hooks(struct evrpc_hook_list *head,
169: struct evhttp_request *req, struct evbuffer *evbuf)
170: {
171: struct evrpc_hook *hook;
172: TAILQ_FOREACH(hook, head, next) {
173: if (hook->process(req, evbuf, hook->process_arg) == -1)
174: return (-1);
175: }
176:
177: return (0);
178: }
179:
180: static void evrpc_pool_schedule(struct evrpc_pool *pool);
181: static void evrpc_request_cb(struct evhttp_request *, void *);
182: void evrpc_request_done(struct evrpc_req_generic*);
183:
184: /*
185: * Registers a new RPC with the HTTP server. The evrpc object is expected
186: * to have been filled in via the EVRPC_REGISTER_OBJECT macro which in turn
187: * calls this function.
188: */
189:
190: static char *
191: evrpc_construct_uri(const char *uri)
192: {
193: char *constructed_uri;
194: int constructed_uri_len;
195:
196: constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
197: if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
198: event_err(1, "%s: failed to register rpc at %s",
199: __func__, uri);
200: memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
201: memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
202: constructed_uri[constructed_uri_len - 1] = '\0';
203:
204: return (constructed_uri);
205: }
206:
207: int
208: evrpc_register_rpc(struct evrpc_base *base, struct evrpc *rpc,
209: void (*cb)(struct evrpc_req_generic *, void *), void *cb_arg)
210: {
211: char *constructed_uri = evrpc_construct_uri(rpc->uri);
212:
213: rpc->base = base;
214: rpc->cb = cb;
215: rpc->cb_arg = cb_arg;
216:
217: TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
218:
219: evhttp_set_cb(base->http_server,
220: constructed_uri,
221: evrpc_request_cb,
222: rpc);
223:
224: free(constructed_uri);
225:
226: return (0);
227: }
228:
229: int
230: evrpc_unregister_rpc(struct evrpc_base *base, const char *name)
231: {
232: char *registered_uri = NULL;
233: struct evrpc *rpc;
234:
235: /* find the right rpc; linear search might be slow */
236: TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
237: if (strcmp(rpc->uri, name) == 0)
238: break;
239: }
240: if (rpc == NULL) {
241: /* We did not find an RPC with this name */
242: return (-1);
243: }
244: TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
245:
246: free((char *)rpc->uri);
247: free(rpc);
248:
249: registered_uri = evrpc_construct_uri(name);
250:
251: /* remove the http server callback */
252: assert(evhttp_del_cb(base->http_server, registered_uri) == 0);
253:
254: free(registered_uri);
255: return (0);
256: }
257:
258: static void
259: evrpc_request_cb(struct evhttp_request *req, void *arg)
260: {
261: struct evrpc *rpc = arg;
262: struct evrpc_req_generic *rpc_state = NULL;
263:
264: /* let's verify the outside parameters */
265: if (req->type != EVHTTP_REQ_POST ||
266: EVBUFFER_LENGTH(req->input_buffer) <= 0)
267: goto error;
268:
269: /*
270: * we might want to allow hooks to suspend the processing,
271: * but at the moment, we assume that they just act as simple
272: * filters.
273: */
274: if (evrpc_process_hooks(&rpc->base->input_hooks,
275: req, req->input_buffer) == -1)
276: goto error;
277:
278: rpc_state = calloc(1, sizeof(struct evrpc_req_generic));
279: if (rpc_state == NULL)
280: goto error;
281:
282: /* let's check that we can parse the request */
283: rpc_state->request = rpc->request_new();
284: if (rpc_state->request == NULL)
285: goto error;
286:
287: rpc_state->rpc = rpc;
288:
289: if (rpc->request_unmarshal(
290: rpc_state->request, req->input_buffer) == -1) {
291: /* we failed to parse the request; that's a bummer */
292: goto error;
293: }
294:
295: /* at this point, we have a well formed request, prepare the reply */
296:
297: rpc_state->reply = rpc->reply_new();
298: if (rpc_state->reply == NULL)
299: goto error;
300:
301: rpc_state->http_req = req;
302: rpc_state->done = evrpc_request_done;
303:
304: /* give the rpc to the user; they can deal with it */
305: rpc->cb(rpc_state, rpc->cb_arg);
306:
307: return;
308:
309: error:
310: evrpc_reqstate_free(rpc_state);
311: evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
312: return;
313: }
314:
315: void
316: evrpc_reqstate_free(struct evrpc_req_generic* rpc_state)
317: {
318: /* clean up all memory */
319: if (rpc_state != NULL) {
320: struct evrpc *rpc = rpc_state->rpc;
321:
322: if (rpc_state->request != NULL)
323: rpc->request_free(rpc_state->request);
324: if (rpc_state->reply != NULL)
325: rpc->reply_free(rpc_state->reply);
326: free(rpc_state);
327: }
328: }
329:
330: void
331: evrpc_request_done(struct evrpc_req_generic* rpc_state)
332: {
333: struct evhttp_request *req = rpc_state->http_req;
334: struct evrpc *rpc = rpc_state->rpc;
335: struct evbuffer* data = NULL;
336:
337: if (rpc->reply_complete(rpc_state->reply) == -1) {
338: /* the reply was not completely filled in. error out */
339: goto error;
340: }
341:
342: if ((data = evbuffer_new()) == NULL) {
343: /* out of memory */
344: goto error;
345: }
346:
347: /* serialize the reply */
348: rpc->reply_marshal(data, rpc_state->reply);
349:
350: /* do hook based tweaks to the request */
351: if (evrpc_process_hooks(&rpc->base->output_hooks,
352: req, data) == -1)
353: goto error;
354:
355: /* on success, we are going to transmit marshaled binary data */
356: if (evhttp_find_header(req->output_headers, "Content-Type") == NULL) {
357: evhttp_add_header(req->output_headers,
358: "Content-Type", "application/octet-stream");
359: }
360:
361: evhttp_send_reply(req, HTTP_OK, "OK", data);
362:
363: evbuffer_free(data);
364:
365: evrpc_reqstate_free(rpc_state);
366:
367: return;
368:
369: error:
370: if (data != NULL)
371: evbuffer_free(data);
372: evrpc_reqstate_free(rpc_state);
373: evhttp_send_error(req, HTTP_SERVUNAVAIL, "Service Error");
374: return;
375: }
376:
/* Client side implementation of RPC */
378:
379: static int evrpc_schedule_request(struct evhttp_connection *connection,
380: struct evrpc_request_wrapper *ctx);
381:
382: struct evrpc_pool *
383: evrpc_pool_new(struct event_base *base)
384: {
385: struct evrpc_pool *pool = calloc(1, sizeof(struct evrpc_pool));
386: if (pool == NULL)
387: return (NULL);
388:
389: TAILQ_INIT(&pool->connections);
390: TAILQ_INIT(&pool->requests);
391:
392: TAILQ_INIT(&pool->input_hooks);
393: TAILQ_INIT(&pool->output_hooks);
394:
395: pool->base = base;
396: pool->timeout = -1;
397:
398: return (pool);
399: }
400:
401: static void
402: evrpc_request_wrapper_free(struct evrpc_request_wrapper *request)
403: {
404: free(request->name);
405: free(request);
406: }
407:
408: void
409: evrpc_pool_free(struct evrpc_pool *pool)
410: {
411: struct evhttp_connection *connection;
412: struct evrpc_request_wrapper *request;
413: struct evrpc_hook *hook;
414:
415: while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
416: TAILQ_REMOVE(&pool->requests, request, next);
417: /* if this gets more complicated we need our own function */
418: evrpc_request_wrapper_free(request);
419: }
420:
421: while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
422: TAILQ_REMOVE(&pool->connections, connection, next);
423: evhttp_connection_free(connection);
424: }
425:
426: while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
427: assert(evrpc_remove_hook(pool, EVRPC_INPUT, hook));
428: }
429:
430: while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
431: assert(evrpc_remove_hook(pool, EVRPC_OUTPUT, hook));
432: }
433:
434: free(pool);
435: }
436:
437: /*
438: * Add a connection to the RPC pool. A request scheduled on the pool
439: * may use any available connection.
440: */
441:
442: void
443: evrpc_pool_add_connection(struct evrpc_pool *pool,
444: struct evhttp_connection *connection) {
445: assert(connection->http_server == NULL);
446: TAILQ_INSERT_TAIL(&pool->connections, connection, next);
447:
448: /*
449: * associate an event base with this connection
450: */
451: if (pool->base != NULL)
452: evhttp_connection_set_base(connection, pool->base);
453:
454: /*
455: * unless a timeout was specifically set for a connection,
456: * the connection inherits the timeout from the pool.
457: */
458: if (connection->timeout == -1)
459: connection->timeout = pool->timeout;
460:
461: /*
462: * if we have any requests pending, schedule them with the new
463: * connections.
464: */
465:
466: if (TAILQ_FIRST(&pool->requests) != NULL) {
467: struct evrpc_request_wrapper *request =
468: TAILQ_FIRST(&pool->requests);
469: TAILQ_REMOVE(&pool->requests, request, next);
470: evrpc_schedule_request(connection, request);
471: }
472: }
473:
474: void
475: evrpc_pool_set_timeout(struct evrpc_pool *pool, int timeout_in_secs)
476: {
477: struct evhttp_connection *evcon;
478: TAILQ_FOREACH(evcon, &pool->connections, next) {
479: evcon->timeout = timeout_in_secs;
480: }
481: pool->timeout = timeout_in_secs;
482: }
483:
484:
485: static void evrpc_reply_done(struct evhttp_request *, void *);
486: static void evrpc_request_timeout(int, short, void *);
487:
488: /*
489: * Finds a connection object associated with the pool that is currently
490: * idle and can be used to make a request.
491: */
492: static struct evhttp_connection *
493: evrpc_pool_find_connection(struct evrpc_pool *pool)
494: {
495: struct evhttp_connection *connection;
496: TAILQ_FOREACH(connection, &pool->connections, next) {
497: if (TAILQ_FIRST(&connection->requests) == NULL)
498: return (connection);
499: }
500:
501: return (NULL);
502: }
503:
504: /*
505: * We assume that the ctx is no longer queued on the pool.
506: */
507: static int
508: evrpc_schedule_request(struct evhttp_connection *connection,
509: struct evrpc_request_wrapper *ctx)
510: {
511: struct evhttp_request *req = NULL;
512: struct evrpc_pool *pool = ctx->pool;
513: struct evrpc_status status;
514: char *uri = NULL;
515: int res = 0;
516:
517: if ((req = evhttp_request_new(evrpc_reply_done, ctx)) == NULL)
518: goto error;
519:
520: /* serialize the request data into the output buffer */
521: ctx->request_marshal(req->output_buffer, ctx->request);
522:
523: uri = evrpc_construct_uri(ctx->name);
524: if (uri == NULL)
525: goto error;
526:
527: /* we need to know the connection that we might have to abort */
528: ctx->evcon = connection;
529:
530: /* apply hooks to the outgoing request */
531: if (evrpc_process_hooks(&pool->output_hooks,
532: req, req->output_buffer) == -1)
533: goto error;
534:
535: if (pool->timeout > 0) {
536: /*
537: * a timeout after which the whole rpc is going to be aborted.
538: */
539: struct timeval tv;
540: evutil_timerclear(&tv);
541: tv.tv_sec = pool->timeout;
542: evtimer_add(&ctx->ev_timeout, &tv);
543: }
544:
545: /* start the request over the connection */
546: res = evhttp_make_request(connection, req, EVHTTP_REQ_POST, uri);
547: free(uri);
548:
549: if (res == -1)
550: goto error;
551:
552: return (0);
553:
554: error:
555: memset(&status, 0, sizeof(status));
556: status.error = EVRPC_STATUS_ERR_UNSTARTED;
557: (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
558: evrpc_request_wrapper_free(ctx);
559: return (-1);
560: }
561:
562: int
563: evrpc_make_request(struct evrpc_request_wrapper *ctx)
564: {
565: struct evrpc_pool *pool = ctx->pool;
566:
567: /* initialize the event structure for this rpc */
568: evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
569: if (pool->base != NULL)
570: event_base_set(pool->base, &ctx->ev_timeout);
571:
572: /* we better have some available connections on the pool */
573: assert(TAILQ_FIRST(&pool->connections) != NULL);
574:
575: /*
576: * if no connection is available, we queue the request on the pool,
577: * the next time a connection is empty, the rpc will be send on that.
578: */
579: TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
580:
581: evrpc_pool_schedule(pool);
582:
583: return (0);
584: }
585:
586: static void
587: evrpc_reply_done(struct evhttp_request *req, void *arg)
588: {
589: struct evrpc_request_wrapper *ctx = arg;
590: struct evrpc_pool *pool = ctx->pool;
591: struct evrpc_status status;
592: int res = -1;
593:
594: /* cancel any timeout we might have scheduled */
595: event_del(&ctx->ev_timeout);
596:
597: memset(&status, 0, sizeof(status));
598: status.http_req = req;
599:
600: /* we need to get the reply now */
601: if (req != NULL) {
602: /* apply hooks to the incoming request */
603: if (evrpc_process_hooks(&pool->input_hooks,
604: req, req->input_buffer) == -1) {
605: status.error = EVRPC_STATUS_ERR_HOOKABORTED;
606: res = -1;
607: } else {
608: res = ctx->reply_unmarshal(ctx->reply,
609: req->input_buffer);
610: if (res == -1) {
611: status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
612: }
613: }
614: } else {
615: status.error = EVRPC_STATUS_ERR_TIMEOUT;
616: }
617:
618: if (res == -1) {
619: /* clear everything that we might have written previously */
620: ctx->reply_clear(ctx->reply);
621: }
622:
623: (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
624:
625: evrpc_request_wrapper_free(ctx);
626:
627: /* the http layer owns the request structure */
628:
629: /* see if we can schedule another request */
630: evrpc_pool_schedule(pool);
631: }
632:
633: static void
634: evrpc_pool_schedule(struct evrpc_pool *pool)
635: {
636: struct evrpc_request_wrapper *ctx = TAILQ_FIRST(&pool->requests);
637: struct evhttp_connection *evcon;
638:
639: /* if no requests are pending, we have no work */
640: if (ctx == NULL)
641: return;
642:
643: if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
644: TAILQ_REMOVE(&pool->requests, ctx, next);
645: evrpc_schedule_request(evcon, ctx);
646: }
647: }
648:
649: static void
650: evrpc_request_timeout(int fd, short what, void *arg)
651: {
652: struct evrpc_request_wrapper *ctx = arg;
653: struct evhttp_connection *evcon = ctx->evcon;
654: assert(evcon != NULL);
655:
656: evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);
657: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>