Annotation of embedaddon/nginx/src/http/modules/ngx_http_upstream_keepalive_module.c, revision 1.1.1.1
1.1 misho 1:
2: /*
3: * Copyright (C) Maxim Dounin
4: * Copyright (C) Nginx, Inc.
5: */
6:
7:
8: #include <ngx_config.h>
9: #include <ngx_core.h>
10: #include <ngx_http.h>
11:
12:
13: typedef struct {
14: ngx_uint_t max_cached;
15:
16: ngx_queue_t cache;
17: ngx_queue_t free;
18:
19: ngx_http_upstream_init_pt original_init_upstream;
20: ngx_http_upstream_init_peer_pt original_init_peer;
21:
22: } ngx_http_upstream_keepalive_srv_conf_t;
23:
24:
25: typedef struct {
26: ngx_http_upstream_keepalive_srv_conf_t *conf;
27:
28: ngx_http_upstream_t *upstream;
29:
30: void *data;
31:
32: ngx_event_get_peer_pt original_get_peer;
33: ngx_event_free_peer_pt original_free_peer;
34:
35: #if (NGX_HTTP_SSL)
36: ngx_event_set_peer_session_pt original_set_session;
37: ngx_event_save_peer_session_pt original_save_session;
38: #endif
39:
40: } ngx_http_upstream_keepalive_peer_data_t;
41:
42:
43: typedef struct {
44: ngx_http_upstream_keepalive_srv_conf_t *conf;
45:
46: ngx_queue_t queue;
47: ngx_connection_t *connection;
48:
49: socklen_t socklen;
50: u_char sockaddr[NGX_SOCKADDRLEN];
51:
52: } ngx_http_upstream_keepalive_cache_t;
53:
54:
55: static ngx_int_t ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
56: ngx_http_upstream_srv_conf_t *us);
57: static ngx_int_t ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc,
58: void *data);
59: static void ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc,
60: void *data, ngx_uint_t state);
61:
62: static void ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev);
63: static void ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev);
64: static void ngx_http_upstream_keepalive_close(ngx_connection_t *c);
65:
66:
67: #if (NGX_HTTP_SSL)
68: static ngx_int_t ngx_http_upstream_keepalive_set_session(
69: ngx_peer_connection_t *pc, void *data);
70: static void ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc,
71: void *data);
72: #endif
73:
74: static void *ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf);
75: static char *ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd,
76: void *conf);
77:
78:
79: static ngx_command_t ngx_http_upstream_keepalive_commands[] = {
80:
81: { ngx_string("keepalive"),
82: NGX_HTTP_UPS_CONF|NGX_CONF_TAKE12,
83: ngx_http_upstream_keepalive,
84: 0,
85: 0,
86: NULL },
87:
88: ngx_null_command
89: };
90:
91:
92: static ngx_http_module_t ngx_http_upstream_keepalive_module_ctx = {
93: NULL, /* preconfiguration */
94: NULL, /* postconfiguration */
95:
96: NULL, /* create main configuration */
97: NULL, /* init main configuration */
98:
99: ngx_http_upstream_keepalive_create_conf, /* create server configuration */
100: NULL, /* merge server configuration */
101:
102: NULL, /* create location configuration */
103: NULL /* merge location configuration */
104: };
105:
106:
107: ngx_module_t ngx_http_upstream_keepalive_module = {
108: NGX_MODULE_V1,
109: &ngx_http_upstream_keepalive_module_ctx, /* module context */
110: ngx_http_upstream_keepalive_commands, /* module directives */
111: NGX_HTTP_MODULE, /* module type */
112: NULL, /* init master */
113: NULL, /* init module */
114: NULL, /* init process */
115: NULL, /* init thread */
116: NULL, /* exit thread */
117: NULL, /* exit process */
118: NULL, /* exit master */
119: NGX_MODULE_V1_PADDING
120: };
121:
122:
123: static ngx_int_t
124: ngx_http_upstream_init_keepalive(ngx_conf_t *cf,
125: ngx_http_upstream_srv_conf_t *us)
126: {
127: ngx_uint_t i;
128: ngx_http_upstream_keepalive_srv_conf_t *kcf;
129: ngx_http_upstream_keepalive_cache_t *cached;
130:
131: ngx_log_debug0(NGX_LOG_DEBUG_HTTP, cf->log, 0,
132: "init keepalive");
133:
134: kcf = ngx_http_conf_upstream_srv_conf(us,
135: ngx_http_upstream_keepalive_module);
136:
137: if (kcf->original_init_upstream(cf, us) != NGX_OK) {
138: return NGX_ERROR;
139: }
140:
141: kcf->original_init_peer = us->peer.init;
142:
143: us->peer.init = ngx_http_upstream_init_keepalive_peer;
144:
145: /* allocate cache items and add to free queue */
146:
147: cached = ngx_pcalloc(cf->pool,
148: sizeof(ngx_http_upstream_keepalive_cache_t) * kcf->max_cached);
149: if (cached == NULL) {
150: return NGX_ERROR;
151: }
152:
153: ngx_queue_init(&kcf->cache);
154: ngx_queue_init(&kcf->free);
155:
156: for (i = 0; i < kcf->max_cached; i++) {
157: ngx_queue_insert_head(&kcf->free, &cached[i].queue);
158: cached[i].conf = kcf;
159: }
160:
161: return NGX_OK;
162: }
163:
164:
165: static ngx_int_t
166: ngx_http_upstream_init_keepalive_peer(ngx_http_request_t *r,
167: ngx_http_upstream_srv_conf_t *us)
168: {
169: ngx_http_upstream_keepalive_peer_data_t *kp;
170: ngx_http_upstream_keepalive_srv_conf_t *kcf;
171:
172: ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
173: "init keepalive peer");
174:
175: kcf = ngx_http_conf_upstream_srv_conf(us,
176: ngx_http_upstream_keepalive_module);
177:
178: kp = ngx_palloc(r->pool, sizeof(ngx_http_upstream_keepalive_peer_data_t));
179: if (kp == NULL) {
180: return NGX_ERROR;
181: }
182:
183: if (kcf->original_init_peer(r, us) != NGX_OK) {
184: return NGX_ERROR;
185: }
186:
187: kp->conf = kcf;
188: kp->upstream = r->upstream;
189: kp->data = r->upstream->peer.data;
190: kp->original_get_peer = r->upstream->peer.get;
191: kp->original_free_peer = r->upstream->peer.free;
192:
193: r->upstream->peer.data = kp;
194: r->upstream->peer.get = ngx_http_upstream_get_keepalive_peer;
195: r->upstream->peer.free = ngx_http_upstream_free_keepalive_peer;
196:
197: #if (NGX_HTTP_SSL)
198: kp->original_set_session = r->upstream->peer.set_session;
199: kp->original_save_session = r->upstream->peer.save_session;
200: r->upstream->peer.set_session = ngx_http_upstream_keepalive_set_session;
201: r->upstream->peer.save_session = ngx_http_upstream_keepalive_save_session;
202: #endif
203:
204: return NGX_OK;
205: }
206:
207:
208: static ngx_int_t
209: ngx_http_upstream_get_keepalive_peer(ngx_peer_connection_t *pc, void *data)
210: {
211: ngx_http_upstream_keepalive_peer_data_t *kp = data;
212: ngx_http_upstream_keepalive_cache_t *item;
213:
214: ngx_int_t rc;
215: ngx_queue_t *q, *cache;
216: ngx_connection_t *c;
217:
218: ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
219: "get keepalive peer");
220:
221: /* ask balancer */
222:
223: rc = kp->original_get_peer(pc, kp->data);
224:
225: if (rc != NGX_OK) {
226: return rc;
227: }
228:
229: /* search cache for suitable connection */
230:
231: cache = &kp->conf->cache;
232:
233: for (q = ngx_queue_head(cache);
234: q != ngx_queue_sentinel(cache);
235: q = ngx_queue_next(q))
236: {
237: item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
238: c = item->connection;
239:
240: if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
241: item->socklen, pc->socklen)
242: == 0)
243: {
244: ngx_queue_remove(q);
245: ngx_queue_insert_head(&kp->conf->free, q);
246:
247: ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
248: "get keepalive peer: using connection %p", c);
249:
250: c->idle = 0;
251: c->log = pc->log;
252: c->read->log = pc->log;
253: c->write->log = pc->log;
254: c->pool->log = pc->log;
255:
256: pc->connection = c;
257: pc->cached = 1;
258:
259: return NGX_DONE;
260: }
261: }
262:
263: return NGX_OK;
264: }
265:
266:
267: static void
268: ngx_http_upstream_free_keepalive_peer(ngx_peer_connection_t *pc, void *data,
269: ngx_uint_t state)
270: {
271: ngx_http_upstream_keepalive_peer_data_t *kp = data;
272: ngx_http_upstream_keepalive_cache_t *item;
273:
274: ngx_queue_t *q;
275: ngx_connection_t *c;
276: ngx_http_upstream_t *u;
277:
278: ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
279: "free keepalive peer");
280:
281: /* cache valid connections */
282:
283: u = kp->upstream;
284: c = pc->connection;
285:
286: if (state & NGX_PEER_FAILED
287: || c == NULL
288: || c->read->eof
289: || c->read->error
290: || c->read->timedout
291: || c->write->error
292: || c->write->timedout)
293: {
294: goto invalid;
295: }
296:
297: if (!u->keepalive) {
298: goto invalid;
299: }
300:
301: if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
302: goto invalid;
303: }
304:
305: ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
306: "free keepalive peer: saving connection %p", c);
307:
308: if (ngx_queue_empty(&kp->conf->free)) {
309:
310: q = ngx_queue_last(&kp->conf->cache);
311: ngx_queue_remove(q);
312:
313: item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
314:
315: ngx_http_upstream_keepalive_close(item->connection);
316:
317: } else {
318: q = ngx_queue_head(&kp->conf->free);
319: ngx_queue_remove(q);
320:
321: item = ngx_queue_data(q, ngx_http_upstream_keepalive_cache_t, queue);
322: }
323:
324: item->connection = c;
325: ngx_queue_insert_head(&kp->conf->cache, q);
326:
327: pc->connection = NULL;
328:
329: if (c->read->timer_set) {
330: ngx_del_timer(c->read);
331: }
332: if (c->write->timer_set) {
333: ngx_del_timer(c->write);
334: }
335:
336: c->write->handler = ngx_http_upstream_keepalive_dummy_handler;
337: c->read->handler = ngx_http_upstream_keepalive_close_handler;
338:
339: c->data = item;
340: c->idle = 1;
341: c->log = ngx_cycle->log;
342: c->read->log = ngx_cycle->log;
343: c->write->log = ngx_cycle->log;
344: c->pool->log = ngx_cycle->log;
345:
346: item->socklen = pc->socklen;
347: ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
348:
349: if (c->read->ready) {
350: ngx_http_upstream_keepalive_close_handler(c->read);
351: }
352:
353: invalid:
354:
355: kp->original_free_peer(pc, kp->data, state);
356: }
357:
358:
359: static void
360: ngx_http_upstream_keepalive_dummy_handler(ngx_event_t *ev)
361: {
362: ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
363: "keepalive dummy handler");
364: }
365:
366:
367: static void
368: ngx_http_upstream_keepalive_close_handler(ngx_event_t *ev)
369: {
370: ngx_http_upstream_keepalive_srv_conf_t *conf;
371: ngx_http_upstream_keepalive_cache_t *item;
372:
373: int n;
374: char buf[1];
375: ngx_connection_t *c;
376:
377: ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ev->log, 0,
378: "keepalive close handler");
379:
380: c = ev->data;
381:
382: if (c->close) {
383: goto close;
384: }
385:
386: n = recv(c->fd, buf, 1, MSG_PEEK);
387:
388: if (n == -1 && ngx_socket_errno == NGX_EAGAIN) {
389: /* stale event */
390:
391: if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
392: goto close;
393: }
394:
395: return;
396: }
397:
398: close:
399:
400: item = c->data;
401: conf = item->conf;
402:
403: ngx_http_upstream_keepalive_close(c);
404:
405: ngx_queue_remove(&item->queue);
406: ngx_queue_insert_head(&conf->free, &item->queue);
407: }
408:
409:
410: static void
411: ngx_http_upstream_keepalive_close(ngx_connection_t *c)
412: {
413:
414: #if (NGX_HTTP_SSL)
415:
416: if (c->ssl) {
417: c->ssl->no_wait_shutdown = 1;
418: c->ssl->no_send_shutdown = 1;
419:
420: if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
421: c->ssl->handler = ngx_http_upstream_keepalive_close;
422: return;
423: }
424: }
425:
426: #endif
427:
428: ngx_destroy_pool(c->pool);
429: ngx_close_connection(c);
430: }
431:
432:
433: #if (NGX_HTTP_SSL)
434:
435: static ngx_int_t
436: ngx_http_upstream_keepalive_set_session(ngx_peer_connection_t *pc, void *data)
437: {
438: ngx_http_upstream_keepalive_peer_data_t *kp = data;
439:
440: return kp->original_set_session(pc, kp->data);
441: }
442:
443:
444: static void
445: ngx_http_upstream_keepalive_save_session(ngx_peer_connection_t *pc, void *data)
446: {
447: ngx_http_upstream_keepalive_peer_data_t *kp = data;
448:
449: kp->original_save_session(pc, kp->data);
450: return;
451: }
452:
453: #endif
454:
455:
456: static void *
457: ngx_http_upstream_keepalive_create_conf(ngx_conf_t *cf)
458: {
459: ngx_http_upstream_keepalive_srv_conf_t *conf;
460:
461: conf = ngx_pcalloc(cf->pool,
462: sizeof(ngx_http_upstream_keepalive_srv_conf_t));
463: if (conf == NULL) {
464: return NULL;
465: }
466:
467: /*
468: * set by ngx_pcalloc():
469: *
470: * conf->original_init_upstream = NULL;
471: * conf->original_init_peer = NULL;
472: */
473:
474: conf->max_cached = 1;
475:
476: return conf;
477: }
478:
479:
480: static char *
481: ngx_http_upstream_keepalive(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
482: {
483: ngx_http_upstream_srv_conf_t *uscf;
484: ngx_http_upstream_keepalive_srv_conf_t *kcf;
485:
486: ngx_int_t n;
487: ngx_str_t *value;
488: ngx_uint_t i;
489:
490: uscf = ngx_http_conf_get_module_srv_conf(cf, ngx_http_upstream_module);
491:
492: kcf = ngx_http_conf_upstream_srv_conf(uscf,
493: ngx_http_upstream_keepalive_module);
494:
495: if (kcf->original_init_upstream) {
496: return "is duplicate";
497: }
498:
499: kcf->original_init_upstream = uscf->peer.init_upstream
500: ? uscf->peer.init_upstream
501: : ngx_http_upstream_init_round_robin;
502:
503: uscf->peer.init_upstream = ngx_http_upstream_init_keepalive;
504:
505: /* read options */
506:
507: value = cf->args->elts;
508:
509: n = ngx_atoi(value[1].data, value[1].len);
510:
511: if (n == NGX_ERROR || n == 0) {
512: ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
513: "invalid value \"%V\" in \"%V\" directive",
514: &value[1], &cmd->name);
515: return NGX_CONF_ERROR;
516: }
517:
518: kcf->max_cached = n;
519:
520: for (i = 2; i < cf->args->nelts; i++) {
521:
522: if (ngx_strcmp(value[i].data, "single") == 0) {
523: ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
524: "the \"single\" parameter is deprecated");
525: continue;
526: }
527:
528: goto invalid;
529: }
530:
531: return NGX_CONF_OK;
532:
533: invalid:
534:
535: ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
536: "invalid parameter \"%V\"", &value[i]);
537:
538: return NGX_CONF_ERROR;
539: }
FreeBSD-CVSweb <freebsd-cvsweb@FreeBSD.org>