Apply fast-fail at connect time.
[pgbouncer.git] / src / objects.c
bloba391d5e382efd77482884aff1a2f7bd0ce84c6e8
/*
 * PgBouncer - Lightweight connection pooler for PostgreSQL.
 *
 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
 *
 * Permission to use, copy, modify, and/or distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

/*
 * Herding objects between lists happens here.
 */

23 #include "bouncer.h"
25 /* those items will be allocated as needed, never freed */
26 STATLIST(user_list);
27 STATLIST(database_list);
28 STATLIST(pool_list);
30 Tree user_tree;
33 * client and server objects will be pre-allocated
34 * they are always in either active or free lists
35 * in addition to others.
37 STATLIST(login_client_list);
39 ObjectCache *server_cache;
40 ObjectCache *client_cache;
41 ObjectCache *db_cache;
42 ObjectCache *pool_cache;
43 ObjectCache *user_cache;
44 ObjectCache *iobuf_cache;
47 * libevent may still report events when event_del()
48 * is called from somewhere else. So hide just freed
49 * PgSockets for one loop.
51 static STATLIST(justfree_client_list);
52 static STATLIST(justfree_server_list);
54 /* init autodb idle list */
55 STATLIST(autodatabase_idle_list);
57 /* fast way to get number of active clients */
58 int get_active_client_count(void)
60 return objcache_active_count(client_cache);
63 /* fast way to get number of active servers */
64 int get_active_server_count(void)
66 return objcache_active_count(server_cache);
69 static void construct_client(void *obj)
71 PgSocket *client = obj;
73 memset(client, 0, sizeof(PgSocket));
74 list_init(&client->head);
75 sbuf_init(&client->sbuf, client_proto);
76 client->state = CL_FREE;
79 static void construct_server(void *obj)
81 PgSocket *server = obj;
83 memset(server, 0, sizeof(PgSocket));
84 list_init(&server->head);
85 sbuf_init(&server->sbuf, server_proto);
86 server->state = SV_FREE;
89 /* compare string with PgUser->name, for usage with btree */
90 static int user_node_cmp(long userptr, Node *node)
92 const char *name = (const char *)userptr;
93 PgUser *user = container_of(node, PgUser, tree_node);
94 return strcmp(name, user->name);
97 /* initialization before config loading */
98 void init_objects(void)
100 tree_init(&user_tree, user_node_cmp, NULL);
101 user_cache = objcache_create("user_cache", sizeof(PgUser), 0, NULL);
102 db_cache = objcache_create("db_cache", sizeof(PgDatabase), 0, NULL);
103 pool_cache = objcache_create("pool_cache", sizeof(PgPool), 0, NULL);
105 if (!user_cache || !db_cache || !pool_cache)
106 fatal("cannot create initial caches");
109 static void do_iobuf_reset(void *arg)
111 IOBuf *io = arg;
112 iobuf_reset(io);
115 /* initialization after config loading */
116 void init_caches(void)
118 server_cache = objcache_create("server_cache", sizeof(PgSocket), 0, construct_server);
119 client_cache = objcache_create("client_cache", sizeof(PgSocket), 0, construct_client);
120 iobuf_cache = objcache_create("iobuf_cache", IOBUF_SIZE, 0, do_iobuf_reset);
123 /* state change means moving between lists */
124 void change_client_state(PgSocket *client, SocketState newstate)
126 PgPool *pool = client->pool;
128 /* remove from old location */
129 switch (client->state) {
130 case CL_FREE:
131 break;
132 case CL_JUSTFREE:
133 statlist_remove(&client->head, &justfree_client_list);
134 break;
135 case CL_LOGIN:
136 statlist_remove(&client->head, &login_client_list);
137 break;
138 case CL_WAITING:
139 statlist_remove(&client->head, &pool->waiting_client_list);
140 break;
141 case CL_ACTIVE:
142 statlist_remove(&client->head, &pool->active_client_list);
143 break;
144 case CL_CANCEL:
145 statlist_remove(&client->head, &pool->cancel_req_list);
146 break;
147 default:
148 fatal("bad cur client state: %d", client->state);
151 client->state = newstate;
153 /* put to new location */
154 switch (client->state) {
155 case CL_FREE:
156 obj_free(client_cache, client);
157 break;
158 case CL_JUSTFREE:
159 statlist_append(&client->head, &justfree_client_list);
160 break;
161 case CL_LOGIN:
162 statlist_append(&client->head, &login_client_list);
163 break;
164 case CL_WAITING:
165 statlist_append(&client->head, &pool->waiting_client_list);
166 break;
167 case CL_ACTIVE:
168 statlist_append(&client->head, &pool->active_client_list);
169 break;
170 case CL_CANCEL:
171 statlist_append(&client->head, &pool->cancel_req_list);
172 break;
173 default:
174 fatal("bad new client state: %d", client->state);
178 /* state change means moving between lists */
179 void change_server_state(PgSocket *server, SocketState newstate)
181 PgPool *pool = server->pool;
183 /* remove from old location */
184 switch (server->state) {
185 case SV_FREE:
186 break;
187 case SV_JUSTFREE:
188 statlist_remove(&server->head, &justfree_server_list);
189 break;
190 case SV_LOGIN:
191 statlist_remove(&server->head, &pool->new_server_list);
192 break;
193 case SV_USED:
194 statlist_remove(&server->head, &pool->used_server_list);
195 break;
196 case SV_TESTED:
197 statlist_remove(&server->head, &pool->tested_server_list);
198 break;
199 case SV_IDLE:
200 statlist_remove(&server->head, &pool->idle_server_list);
201 break;
202 case SV_ACTIVE:
203 statlist_remove(&server->head, &pool->active_server_list);
204 break;
205 default:
206 fatal("change_server_state: bad old server state: %d", server->state);
209 server->state = newstate;
211 /* put to new location */
212 switch (server->state) {
213 case SV_FREE:
214 obj_free(server_cache, server);
215 break;
216 case SV_JUSTFREE:
217 statlist_append(&server->head, &justfree_server_list);
218 break;
219 case SV_LOGIN:
220 statlist_append(&server->head, &pool->new_server_list);
221 break;
222 case SV_USED:
223 /* use LIFO */
224 statlist_prepend(&server->head, &pool->used_server_list);
225 break;
226 case SV_TESTED:
227 statlist_append(&server->head, &pool->tested_server_list);
228 break;
229 case SV_IDLE:
230 if (server->close_needed || cf_server_round_robin)
231 /* try to avoid immediate usage then */
232 statlist_append(&server->head, &pool->idle_server_list);
233 else
234 /* otherwise use LIFO */
235 statlist_prepend(&server->head, &pool->idle_server_list);
236 break;
237 case SV_ACTIVE:
238 statlist_append(&server->head, &pool->active_server_list);
239 break;
240 default:
241 fatal("bad server state");
245 /* compare pool names, for use with put_in_order */
246 static int cmp_pool(List *i1, List *i2)
248 PgPool *p1 = container_of(i1, PgPool, head);
249 PgPool *p2 = container_of(i2, PgPool, head);
250 if (p1->db != p2->db)
251 return strcmp(p1->db->name, p2->db->name);
252 if (p1->user != p2->user)
253 return strcmp(p1->user->name, p2->user->name);
254 return 0;
257 /* compare user names, for use with put_in_order */
258 static int cmp_user(List *i1, List *i2)
260 PgUser *u1 = container_of(i1, PgUser, head);
261 PgUser *u2 = container_of(i2, PgUser, head);
262 return strcmp(u1->name, u2->name);
265 /* compare db names, for use with put_in_order */
266 static int cmp_database(List *i1, List *i2)
268 PgDatabase *db1 = container_of(i1, PgDatabase, head);
269 PgDatabase *db2 = container_of(i2, PgDatabase, head);
270 return strcmp(db1->name, db2->name);
273 /* put elem into list in correct pos */
274 static void put_in_order(List *newitem, StatList *list, int (*cmpfn)(List *, List *))
276 int res;
277 List *item;
279 statlist_for_each(item, list) {
280 res = cmpfn(item, newitem);
281 if (res == 0)
282 fatal("put_in_order: found existing elem");
283 else if (res > 0) {
284 statlist_put_before(newitem, list, item);
285 return;
288 statlist_append(newitem, list);
291 /* create new object if new, then return it */
292 PgDatabase *add_database(const char *name)
294 PgDatabase *db = find_database(name);
296 /* create new object if needed */
297 if (db == NULL) {
298 db = obj_alloc(db_cache);
299 if (!db)
300 return NULL;
302 list_init(&db->head);
303 safe_strcpy(db->name, name, sizeof(db->name));
304 put_in_order(&db->head, &database_list, cmp_database);
307 return db;
310 /* register new auto database */
311 PgDatabase *register_auto_database(const char *name)
313 PgDatabase *db;
314 int len;
315 char *cs;
317 if (!cf_autodb_connstr)
318 return NULL;
320 len = strlen(cf_autodb_connstr);
321 cs = malloc(len + 1);
322 if (!cs)
323 return NULL;
324 memcpy(cs, cf_autodb_connstr, len + 1);
325 parse_database((char*)name, cs);
326 free(cs);
328 db = find_database(name);
329 if (db) {
330 db->db_auto = 1;
331 /* do not forget to check pool_size like in config_postprocess */
332 if (db->pool_size < 0)
333 db->pool_size = cf_default_pool_size;
334 if (db->res_pool_size < 0)
335 db->res_pool_size = cf_res_pool_size;
338 return db;
341 /* add or update client users */
342 PgUser *add_user(const char *name, const char *passwd)
344 PgUser *user = find_user(name);
346 if (user == NULL) {
347 user = obj_alloc(user_cache);
348 if (!user)
349 return NULL;
351 list_init(&user->head);
352 list_init(&user->pool_list);
353 safe_strcpy(user->name, name, sizeof(user->name));
354 put_in_order(&user->head, &user_list, cmp_user);
356 tree_insert(&user_tree, (long)user->name, &user->tree_node);
358 safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
359 return user;
362 /* create separate user object for storing server user info */
363 PgUser *force_user(PgDatabase *db, const char *name, const char *passwd)
365 PgUser *user = db->forced_user;
366 if (!user) {
367 user = obj_alloc(user_cache);
368 if (!user)
369 return NULL;
370 list_init(&user->head);
371 list_init(&user->pool_list);
373 safe_strcpy(user->name, name, sizeof(user->name));
374 safe_strcpy(user->passwd, passwd, sizeof(user->passwd));
375 db->forced_user = user;
376 return user;
379 /* find an existing database */
380 PgDatabase *find_database(const char *name)
382 List *item, *tmp;
383 PgDatabase *db;
384 statlist_for_each(item, &database_list) {
385 db = container_of(item, PgDatabase, head);
386 if (strcmp(db->name, name) == 0)
387 return db;
389 /* also trying to find in idle autodatabases list */
390 statlist_for_each_safe(item, &autodatabase_idle_list, tmp) {
391 db = container_of(item, PgDatabase, head);
392 if (strcmp(db->name, name) == 0) {
393 db->inactive_time = 0;
394 statlist_remove(&db->head, &autodatabase_idle_list);
395 put_in_order(&db->head, &database_list, cmp_database);
396 return db;
399 return NULL;
402 /* find existing user */
403 PgUser *find_user(const char *name)
405 PgUser *user = NULL;
406 Node *node;
408 node = tree_search(&user_tree, (long)name);
409 user = node ? container_of(node, PgUser, tree_node) : NULL;
410 return user;
413 /* create new pool object */
414 static PgPool *new_pool(PgDatabase *db, PgUser *user)
416 PgPool *pool;
418 pool = obj_alloc(pool_cache);
419 if (!pool)
420 return NULL;
422 list_init(&pool->head);
423 list_init(&pool->map_head);
425 pool->user = user;
426 pool->db = db;
428 statlist_init(&pool->active_client_list, "active_client_list");
429 statlist_init(&pool->waiting_client_list, "waiting_client_list");
430 statlist_init(&pool->active_server_list, "active_server_list");
431 statlist_init(&pool->idle_server_list, "idle_server_list");
432 statlist_init(&pool->tested_server_list, "tested_server_list");
433 statlist_init(&pool->used_server_list, "used_server_list");
434 statlist_init(&pool->new_server_list, "new_server_list");
435 statlist_init(&pool->cancel_req_list, "cancel_req_list");
437 list_append(&pool->map_head, &user->pool_list);
439 /* keep pools in db/user order to make stats faster */
440 put_in_order(&pool->head, &pool_list, cmp_pool);
442 return pool;
445 /* find pool object, create if needed */
446 PgPool *get_pool(PgDatabase *db, PgUser *user)
448 List *item;
449 PgPool *pool;
451 if (!db || !user)
452 return NULL;
454 list_for_each(item, &user->pool_list) {
455 pool = container_of(item, PgPool, map_head);
456 if (pool->db == db)
457 return pool;
460 return new_pool(db, user);
463 /* deactivate socket and put into wait queue */
464 static void pause_client(PgSocket *client)
466 Assert(client->state == CL_ACTIVE);
468 slog_debug(client, "pause_client");
469 change_client_state(client, CL_WAITING);
470 if (!sbuf_pause(&client->sbuf))
471 disconnect_client(client, true, "pause failed");
474 /* wake client from wait */
475 void activate_client(PgSocket *client)
477 Assert(client->state == CL_WAITING);
479 slog_debug(client, "activate_client");
480 change_client_state(client, CL_ACTIVE);
481 sbuf_continue(&client->sbuf);
485 * Don't let clients queue at all, if there is no working server connection.
487 * It must still allow following cases:
488 * - empty pool on startup
489 * - idle pool where all servers are removed
491 * Current assumptions:
492 * - old server connections will be dropped by query_timeout
493 * - new server connections fail due to server_connect_timeout, or other failure
495 * So here we drop client if all server connections have been dropped
496 * and new one's fail.
498 bool check_fast_fail(PgSocket *client)
500 int cnt;
501 PgPool *pool = client->pool;
503 /* reject if no servers and last connect failed */
504 if (!pool->last_connect_failed)
505 return true;
506 cnt = pool_server_count(pool) - statlist_count(&pool->new_server_list);
507 if (cnt)
508 return true;
509 disconnect_client(client, true, "no working server connection");
511 /* usual relaunch wont work, as there are no waiting clients */
512 launch_new_connection(pool);
514 return false;
517 /* link if found, otherwise put into wait queue */
518 bool find_server(PgSocket *client)
520 PgPool *pool = client->pool;
521 PgSocket *server;
522 bool res;
523 bool varchange = false;
525 Assert(client->state == CL_ACTIVE);
527 if (client->link)
528 return true;
530 /* try to get idle server, if allowed */
531 if (cf_pause_mode == P_PAUSE) {
532 server = NULL;
533 } else {
534 while (1) {
535 server = first_socket(&pool->idle_server_list);
536 if (!server)
537 break;
538 else if (server->close_needed)
539 disconnect_server(server, true, "obsolete connection");
540 else if (!server->ready)
541 disconnect_server(server, true, "idle server got dirty");
542 else
543 break;
546 if (!server && !check_fast_fail(client))
547 return false;
550 Assert(!server || server->state == SV_IDLE);
552 /* send var changes */
553 if (server) {
554 res = varcache_apply(server, client, &varchange);
555 if (!res) {
556 disconnect_server(server, true, "var change failed");
557 server = NULL;
561 /* link or send to waiters list */
562 if (server) {
563 client->link = server;
564 server->link = client;
565 change_server_state(server, SV_ACTIVE);
566 if (varchange) {
567 server->setting_vars = 1;
568 server->ready = 0;
569 res = false; /* don't process client data yet */
570 if (!sbuf_pause(&client->sbuf))
571 disconnect_client(client, true, "pause failed");
572 } else
573 res = true;
574 } else {
575 pause_client(client);
576 res = false;
578 return res;
581 /* pick waiting client */
582 static bool reuse_on_release(PgSocket *server)
584 bool res = true;
585 PgPool *pool = server->pool;
586 PgSocket *client = first_socket(&pool->waiting_client_list);
587 if (client) {
588 activate_client(client);
591 * As the activate_client() does full read loop,
592 * then it may happen that linked client close
593 * couses server close. Report it.
595 if (server->state == SV_FREE || server->state == SV_JUSTFREE)
596 res = false;
598 return res;
601 /* send reset query */
602 static bool reset_on_release(PgSocket *server)
604 bool res;
606 Assert(server->state == SV_TESTED);
608 slog_debug(server, "Resetting: %s", cf_server_reset_query);
609 SEND_generic(res, server, 'Q', "s", cf_server_reset_query);
610 if (!res)
611 disconnect_server(server, false, "reset query failed");
612 return res;
615 static bool life_over(PgSocket *server)
617 PgPool *pool = server->pool;
618 usec_t lifetime_kill_gap = 0;
619 usec_t now = get_cached_time();
620 usec_t age = now - server->connect_time;
621 usec_t last_kill = now - pool->last_lifetime_disconnect;
623 if (age < cf_server_lifetime)
624 return false;
626 if (pool->db->pool_size > 0)
627 lifetime_kill_gap = cf_server_lifetime / pool->db->pool_size;
629 if (last_kill >= lifetime_kill_gap)
630 return true;
632 return false;
635 /* connecting/active -> idle, unlink if needed */
636 bool release_server(PgSocket *server)
638 PgPool *pool = server->pool;
639 SocketState newstate = SV_IDLE;
641 Assert(server->ready);
643 /* remove from old list */
644 switch (server->state) {
645 case SV_ACTIVE:
646 server->link->link = NULL;
647 server->link = NULL;
649 if (*cf_server_reset_query)
650 /* notify reset is required */
651 newstate = SV_TESTED;
652 else if (cf_server_check_delay == 0 && *cf_server_check_query)
654 * deprecated: before reset_query, the check_delay = 0
655 * was used to get same effect. This if() can be removed
656 * after couple of releases.
658 newstate = SV_USED;
659 case SV_USED:
660 case SV_TESTED:
661 break;
662 case SV_LOGIN:
663 pool->last_connect_failed = 0;
664 break;
665 default:
666 fatal("bad server state in release_server (%d)", server->state);
669 /* enforce lifetime immidiately on release */
670 if (server->state != SV_LOGIN && life_over(server)) {
671 disconnect_server(server, true, "server_lifetime");
672 return false;
675 /* enforce close request */
676 if (server->close_needed) {
677 disconnect_server(server, true, "close_needed");
678 return false;
681 Assert(server->link == NULL);
682 slog_noise(server, "release_server: new state=%d", newstate);
683 change_server_state(server, newstate);
685 if (newstate == SV_IDLE)
686 /* immediately process waiters, to give fair chance */
687 return reuse_on_release(server);
688 else if (newstate == SV_TESTED)
689 return reset_on_release(server);
691 return true;
694 /* drop server connection */
695 void disconnect_server(PgSocket *server, bool notify, const char *reason, ...)
697 PgPool *pool = server->pool;
698 PgSocket *client = server->link;
699 static const uint8_t pkt_term[] = {'X', 0,0,0,4};
700 int send_term = 1;
701 usec_t now = get_cached_time();
702 char buf[128];
703 va_list ap;
705 va_start(ap, reason);
706 vsnprintf(buf, sizeof(buf), reason, ap);
707 va_end(ap);
708 reason = buf;
710 if (cf_log_disconnections)
711 slog_info(server, "closing because: %s (age=%llu)", reason,
712 (now - server->connect_time) / USEC);
714 switch (server->state) {
715 case SV_ACTIVE:
716 client = server->link;
717 if (client) {
718 client->link = NULL;
719 server->link = NULL;
720 disconnect_client(client, true, "%s", reason);
722 break;
723 case SV_TESTED:
724 case SV_USED:
725 case SV_IDLE:
726 break;
727 case SV_LOGIN:
729 * usually disconnect means problems in startup phase,
730 * except when sending cancel packet
732 if (!server->ready)
733 pool->last_connect_failed = 1;
734 else
735 send_term = 0;
736 break;
737 default:
738 fatal("disconnect_server: bad server state (%d)", server->state);
741 Assert(server->link == NULL);
743 /* notify server and close connection */
744 if (send_term && notify) {
745 if (!sbuf_answer(&server->sbuf, pkt_term, sizeof(pkt_term)))
746 /* ignore result */
747 notify = false;
750 change_server_state(server, SV_JUSTFREE);
751 if (!sbuf_close(&server->sbuf))
752 log_noise("sbuf_close failed, retry later");
755 /* drop client connection */
756 void disconnect_client(PgSocket *client, bool notify, const char *reason, ...)
758 char buf[128];
759 va_list ap;
760 usec_t now = get_cached_time();
762 va_start(ap, reason);
763 vsnprintf(buf, sizeof(buf), reason, ap);
764 va_end(ap);
765 reason = buf;
767 if (cf_log_disconnections)
768 slog_info(client, "closing because: %s (age=%llu)", reason,
769 (now - client->connect_time) / USEC);
771 switch (client->state) {
772 case CL_ACTIVE:
773 if (client->link) {
774 PgSocket *server = client->link;
775 /* ->ready may be set before all is sent */
776 if (server->ready && sbuf_is_empty(&server->sbuf)) {
777 /* retval does not matter here */
778 release_server(server);
779 } else {
780 server->link = NULL;
781 client->link = NULL;
782 disconnect_server(server, true, "unclean server");
785 case CL_LOGIN:
786 case CL_WAITING:
787 case CL_CANCEL:
788 break;
789 default:
790 fatal("bad client state in disconnect_client: %d", client->state);
793 /* send reason to client */
794 if (notify && reason && client->state != CL_CANCEL) {
796 * don't send Ready pkt here, or client won't notice
797 * closed connection
799 send_pooler_error(client, false, reason);
802 change_client_state(client, CL_JUSTFREE);
803 if (!sbuf_close(&client->sbuf))
804 log_noise("sbuf_close failed, retry later");
807 /* the pool needs new connection, if possible */
808 void launch_new_connection(PgPool *pool)
810 PgSocket *server;
811 int total;
812 const char *unix_dir = cf_unix_socket_dir;
813 bool res;
815 /* allow only small number of connection attempts at a time */
816 if (!statlist_empty(&pool->new_server_list)) {
817 log_debug("launch_new_connection: already progress");
818 return;
821 /* if server bounces, don't retry too fast */
822 if (pool->last_connect_failed) {
823 usec_t now = get_cached_time();
824 if (now - pool->last_connect_time < cf_server_login_retry) {
825 log_debug("launch_new_connection: last failed, wait");
826 return;
830 /* is it allowed to add servers? */
831 total = pool_server_count(pool);
832 if (total >= pool->db->pool_size && pool->welcome_msg_ready) {
833 /* should we use reserve pool? */
834 if (cf_res_pool_timeout && pool->db->res_pool_size) {
835 usec_t now = get_cached_time();
836 PgSocket *c = first_socket(&pool->waiting_client_list);
837 if (c && (now - c->request_time) >= cf_res_pool_timeout) {
838 if (total < pool->db->pool_size + pool->db->res_pool_size) {
839 log_debug("reserve_pool activated");
840 goto allow_new;
844 log_debug("launch_new_connection: pool full (%d >= %d)",
845 total, pool->db->pool_size);
846 return;
849 allow_new:
850 /* get free conn object */
851 server = obj_alloc(server_cache);
852 if (!server) {
853 log_debug("launch_new_connection: no memory");
854 return;
857 /* initialize it */
858 server->pool = pool;
859 server->auth_user = server->pool->user;
860 server->remote_addr = server->pool->db->addr;
861 server->connect_time = get_cached_time();
862 pool->last_connect_time = get_cached_time();
863 change_server_state(server, SV_LOGIN);
865 if (cf_log_connections)
866 slog_info(server, "new connection to server");
868 /* override socket location if requested */
869 if (server->pool->db->unix_socket_dir[0])
870 unix_dir = server->pool->db->unix_socket_dir;
872 /* start connecting */
873 res = sbuf_connect(&server->sbuf, &server->remote_addr, unix_dir,
874 cf_server_connect_timeout / USEC);
875 if (!res)
876 log_noise("failed to launch new connection");
879 /* new client connection attempt */
880 PgSocket * accept_client(int sock,
881 const struct sockaddr_in *addr,
882 bool is_unix)
884 bool res;
885 PgSocket *client;
887 /* get free PgSocket */
888 client = obj_alloc(client_cache);
889 if (!client) {
890 log_warning("cannot allocate client struct");
891 safe_close(sock);
892 return NULL;
895 client->connect_time = client->request_time = get_cached_time();
896 client->query_start = 0;
898 fill_remote_addr(client, sock, is_unix);
899 fill_local_addr(client, sock, is_unix);
901 change_client_state(client, CL_LOGIN);
903 res = sbuf_accept(&client->sbuf, sock, is_unix);
904 if (!res) {
905 if (cf_log_connections)
906 slog_debug(client, "failed connection attempt");
907 return NULL;
910 return client;
913 /* send cached parameters to client to pretend being server */
914 /* client managed to authenticate, send welcome msg and accept queries */
915 bool finish_client_login(PgSocket *client)
917 switch (client->state) {
918 case CL_LOGIN:
919 change_client_state(client, CL_ACTIVE);
920 case CL_ACTIVE:
921 break;
922 default:
923 fatal("bad client state");
926 if (!welcome_client(client)) {
927 log_debug("finish_client_login: no welcome message, pause");
928 client->wait_for_welcome = 1;
929 pause_client(client);
930 if (cf_pause_mode == P_NONE)
931 launch_new_connection(client->pool);
932 return false;
934 client->wait_for_welcome = 0;
936 slog_debug(client, "logged in");
938 return true;
941 /* client->cancel_key has requested client key */
942 void accept_cancel_request(PgSocket *req)
944 List *pitem, *citem;
945 PgPool *pool;
946 PgSocket *server = NULL, *client, *main_client = NULL;
948 Assert(req->state == CL_LOGIN);
950 /* find real client this is for */
951 statlist_for_each(pitem, &pool_list) {
952 pool = container_of(pitem, PgPool, head);
953 statlist_for_each(citem, &pool->active_client_list) {
954 client = container_of(citem, PgSocket, head);
955 if (memcmp(client->cancel_key, req->cancel_key, 8) == 0) {
956 main_client = client;
957 goto found;
961 found:
963 /* wrong key */
964 if (!main_client) {
965 disconnect_client(req, false, "failed cancel request");
966 return;
969 /* not linked client, just drop it then */
970 if (!main_client->link) {
971 bool res;
973 /* let administrative cancel be handled elsewhere */
974 if (main_client->pool->db->admin) {
975 disconnect_client(req, false, "cancel request for console client");
976 admin_handle_cancel(main_client);
977 return;
980 disconnect_client(req, false, "cancel request for idle client");
982 /* notify readiness */
983 SEND_ReadyForQuery(res, main_client);
984 if (!res)
985 disconnect_client(main_client, true, "ReadyForQuery for main_client failed");
986 return;
989 /* drop the connection, if fails, retry later in justfree list */
990 if (!sbuf_close(&req->sbuf))
991 log_noise("sbuf_close failed, retry later");
993 /* remember server key */
994 server = main_client->link;
995 memcpy(req->cancel_key, server->cancel_key, 8);
997 /* attach to target pool */
998 req->pool = pool;
999 change_client_state(req, CL_CANCEL);
1001 /* need fresh connection */
1002 launch_new_connection(pool);
1005 void forward_cancel_request(PgSocket *server)
1007 bool res;
1008 PgSocket *req = first_socket(&server->pool->cancel_req_list);
1010 Assert(req != NULL && req->state == CL_CANCEL);
1011 Assert(server->state == SV_LOGIN);
1013 SEND_CancelRequest(res, server, req->cancel_key);
1015 change_client_state(req, CL_JUSTFREE);
1018 bool use_client_socket(int fd, PgAddr *addr,
1019 const char *dbname, const char *username,
1020 uint64_t ckey, int oldfd, int linkfd,
1021 const char *client_enc, const char *std_string,
1022 const char *datestyle, const char *timezone)
1024 PgSocket *client;
1025 PktBuf tmp;
1027 client = accept_client(fd, NULL, addr->is_unix);
1028 if (client == NULL)
1029 return false;
1030 client->suspended = 1;
1032 if (!set_pool(client, dbname, username))
1033 return false;
1035 change_client_state(client, CL_ACTIVE);
1037 /* store old cancel key */
1038 pktbuf_static(&tmp, client->cancel_key, 8);
1039 pktbuf_put_uint64(&tmp, ckey);
1041 /* store old fds */
1042 client->tmp_sk_oldfd = oldfd;
1043 client->tmp_sk_linkfd = linkfd;
1045 varcache_set(&client->vars, "client_encoding", client_enc);
1046 varcache_set(&client->vars, "standard_conforming_strings", std_string);
1047 varcache_set(&client->vars, "datestyle", datestyle);
1048 varcache_set(&client->vars, "timezone", timezone);
1050 return true;
1053 bool use_server_socket(int fd, PgAddr *addr,
1054 const char *dbname, const char *username,
1055 uint64_t ckey, int oldfd, int linkfd,
1056 const char *client_enc, const char *std_string,
1057 const char *datestyle, const char *timezone)
1059 PgDatabase *db = find_database(dbname);
1060 PgUser *user;
1061 PgPool *pool;
1062 PgSocket *server;
1063 PktBuf tmp;
1064 bool res;
1066 /* if the database not found, it's an auto database -> registering... */
1067 if (!db) {
1068 db = register_auto_database(dbname);
1069 if (!db)
1070 return true;
1073 if (db->forced_user)
1074 user = db->forced_user;
1075 else
1076 user = find_user(username);
1078 pool = get_pool(db, user);
1079 if (!pool)
1080 return false;
1082 server = obj_alloc(server_cache);
1083 if (!server)
1084 return false;
1086 res = sbuf_accept(&server->sbuf, fd, addr->is_unix);
1087 if (!res)
1088 return false;
1090 server->suspended = 1;
1091 server->pool = pool;
1092 server->auth_user = user;
1093 server->connect_time = server->request_time = get_cached_time();
1094 server->query_start = 0;
1096 fill_remote_addr(server, fd, addr->is_unix);
1097 fill_local_addr(server, fd, addr->is_unix);
1099 if (linkfd) {
1100 server->ready = 0;
1101 change_server_state(server, SV_ACTIVE);
1102 } else {
1103 server->ready = 1;
1104 change_server_state(server, SV_IDLE);
1107 /* store old cancel key */
1108 pktbuf_static(&tmp, server->cancel_key, 8);
1109 pktbuf_put_uint64(&tmp, ckey);
1111 /* store old fds */
1112 server->tmp_sk_oldfd = oldfd;
1113 server->tmp_sk_linkfd = linkfd;
1115 varcache_set(&server->vars, "client_encoding", client_enc);
1116 varcache_set(&server->vars, "standard_conforming_strings", std_string);
1117 varcache_set(&server->vars, "datestyle", datestyle);
1118 varcache_set(&server->vars, "timezone", timezone);
1120 return true;
1123 void for_each_server(PgPool *pool, void (*func)(PgSocket *sk))
1125 List *item;
1127 statlist_for_each(item, &pool->idle_server_list)
1128 func(container_of(item, PgSocket, head));
1130 statlist_for_each(item, &pool->used_server_list)
1131 func(container_of(item, PgSocket, head));
1133 statlist_for_each(item, &pool->tested_server_list)
1134 func(container_of(item, PgSocket, head));
1136 statlist_for_each(item, &pool->active_server_list)
1137 func(container_of(item, PgSocket, head));
1139 statlist_for_each(item, &pool->new_server_list)
1140 func(container_of(item, PgSocket, head));
1143 static void tag_dirty(PgSocket *sk)
1145 sk->close_needed = 1;
1148 void tag_database_dirty(PgDatabase *db)
1150 List *item;
1151 PgPool *pool;
1153 statlist_for_each(item, &pool_list) {
1154 pool = container_of(item, PgPool, head);
1155 if (pool->db == db)
1156 for_each_server(pool, tag_dirty);
1160 /* move objects from justfree_* to free_* lists */
1161 void reuse_just_freed_objects(void)
1163 List *tmp, *item;
1164 PgSocket *sk;
1165 bool close_works = true;
1168 * event_del() may fail because of ENOMEM for event handlers
1169 * that need only changes sent to kernel on each loop.
1171 * Keep open sbufs in justfree lists until successful.
1174 statlist_for_each_safe(item, &justfree_client_list, tmp) {
1175 sk = container_of(item, PgSocket, head);
1176 if (sbuf_is_closed(&sk->sbuf))
1177 change_client_state(sk, CL_FREE);
1178 else if (close_works)
1179 close_works = sbuf_close(&sk->sbuf);
1181 statlist_for_each_safe(item, &justfree_server_list, tmp) {
1182 sk = container_of(item, PgSocket, head);
1183 if (sbuf_is_closed(&sk->sbuf))
1184 change_server_state(sk, SV_FREE);
1185 else if (close_works)
1186 close_works = sbuf_close(&sk->sbuf);