2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Herding objects between lists happens here.
25 /* those items will be allocated as needed, never freed */
27 STATLIST(database_list
);
33 * client and server objects will be pre-allocated
34 * they are always in either active or free lists
35 * in addition to others.
37 STATLIST(login_client_list
);
39 ObjectCache
*server_cache
;
40 ObjectCache
*client_cache
;
41 ObjectCache
*db_cache
;
42 ObjectCache
*pool_cache
;
43 ObjectCache
*user_cache
;
44 ObjectCache
*iobuf_cache
;
47 * libevent may still report events when event_del()
48 * is called from somewhere else. So hide just freed
49 * PgSockets for one loop.
51 static STATLIST(justfree_client_list
);
52 static STATLIST(justfree_server_list
);
54 /* init autodb idle list */
55 STATLIST(autodatabase_idle_list
);
57 /* fast way to get number of active clients */
58 int get_active_client_count(void)
60 return objcache_active_count(client_cache
);
63 /* fast way to get number of active servers */
64 int get_active_server_count(void)
66 return objcache_active_count(server_cache
);
69 static void construct_client(void *obj
)
71 PgSocket
*client
= obj
;
73 memset(client
, 0, sizeof(PgSocket
));
74 list_init(&client
->head
);
75 sbuf_init(&client
->sbuf
, client_proto
);
76 client
->state
= CL_FREE
;
79 static void construct_server(void *obj
)
81 PgSocket
*server
= obj
;
83 memset(server
, 0, sizeof(PgSocket
));
84 list_init(&server
->head
);
85 sbuf_init(&server
->sbuf
, server_proto
);
86 server
->state
= SV_FREE
;
89 /* compare string with PgUser->name, for usage with btree */
90 static int user_node_cmp(long userptr
, Node
*node
)
92 const char *name
= (const char *)userptr
;
93 PgUser
*user
= container_of(node
, PgUser
, tree_node
);
94 return strcmp(name
, user
->name
);
97 /* initialization before config loading */
98 void init_objects(void)
100 tree_init(&user_tree
, user_node_cmp
, NULL
);
101 user_cache
= objcache_create("user_cache", sizeof(PgUser
), 0, NULL
);
102 db_cache
= objcache_create("db_cache", sizeof(PgDatabase
), 0, NULL
);
103 pool_cache
= objcache_create("pool_cache", sizeof(PgPool
), 0, NULL
);
105 if (!user_cache
|| !db_cache
|| !pool_cache
)
106 fatal("cannot create initial caches");
109 static void do_iobuf_reset(void *arg
)
115 /* initialization after config loading */
116 void init_caches(void)
118 server_cache
= objcache_create("server_cache", sizeof(PgSocket
), 0, construct_server
);
119 client_cache
= objcache_create("client_cache", sizeof(PgSocket
), 0, construct_client
);
120 iobuf_cache
= objcache_create("iobuf_cache", IOBUF_SIZE
, 0, do_iobuf_reset
);
123 /* state change means moving between lists */
124 void change_client_state(PgSocket
*client
, SocketState newstate
)
126 PgPool
*pool
= client
->pool
;
128 /* remove from old location */
129 switch (client
->state
) {
133 statlist_remove(&client
->head
, &justfree_client_list
);
136 statlist_remove(&client
->head
, &login_client_list
);
139 statlist_remove(&client
->head
, &pool
->waiting_client_list
);
142 statlist_remove(&client
->head
, &pool
->active_client_list
);
145 statlist_remove(&client
->head
, &pool
->cancel_req_list
);
148 fatal("bad cur client state: %d", client
->state
);
151 client
->state
= newstate
;
153 /* put to new location */
154 switch (client
->state
) {
156 obj_free(client_cache
, client
);
159 statlist_append(&client
->head
, &justfree_client_list
);
162 statlist_append(&client
->head
, &login_client_list
);
165 statlist_append(&client
->head
, &pool
->waiting_client_list
);
168 statlist_append(&client
->head
, &pool
->active_client_list
);
171 statlist_append(&client
->head
, &pool
->cancel_req_list
);
174 fatal("bad new client state: %d", client
->state
);
178 /* state change means moving between lists */
179 void change_server_state(PgSocket
*server
, SocketState newstate
)
181 PgPool
*pool
= server
->pool
;
183 /* remove from old location */
184 switch (server
->state
) {
188 statlist_remove(&server
->head
, &justfree_server_list
);
191 statlist_remove(&server
->head
, &pool
->new_server_list
);
194 statlist_remove(&server
->head
, &pool
->used_server_list
);
197 statlist_remove(&server
->head
, &pool
->tested_server_list
);
200 statlist_remove(&server
->head
, &pool
->idle_server_list
);
203 statlist_remove(&server
->head
, &pool
->active_server_list
);
206 fatal("change_server_state: bad old server state: %d", server
->state
);
209 server
->state
= newstate
;
211 /* put to new location */
212 switch (server
->state
) {
214 obj_free(server_cache
, server
);
217 statlist_append(&server
->head
, &justfree_server_list
);
220 statlist_append(&server
->head
, &pool
->new_server_list
);
224 statlist_prepend(&server
->head
, &pool
->used_server_list
);
227 statlist_append(&server
->head
, &pool
->tested_server_list
);
230 if (server
->close_needed
|| cf_server_round_robin
)
231 /* try to avoid immediate usage then */
232 statlist_append(&server
->head
, &pool
->idle_server_list
);
234 /* otherwise use LIFO */
235 statlist_prepend(&server
->head
, &pool
->idle_server_list
);
238 statlist_append(&server
->head
, &pool
->active_server_list
);
241 fatal("bad server state");
245 /* compare pool names, for use with put_in_order */
246 static int cmp_pool(List
*i1
, List
*i2
)
248 PgPool
*p1
= container_of(i1
, PgPool
, head
);
249 PgPool
*p2
= container_of(i2
, PgPool
, head
);
250 if (p1
->db
!= p2
->db
)
251 return strcmp(p1
->db
->name
, p2
->db
->name
);
252 if (p1
->user
!= p2
->user
)
253 return strcmp(p1
->user
->name
, p2
->user
->name
);
257 /* compare user names, for use with put_in_order */
258 static int cmp_user(List
*i1
, List
*i2
)
260 PgUser
*u1
= container_of(i1
, PgUser
, head
);
261 PgUser
*u2
= container_of(i2
, PgUser
, head
);
262 return strcmp(u1
->name
, u2
->name
);
265 /* compare db names, for use with put_in_order */
266 static int cmp_database(List
*i1
, List
*i2
)
268 PgDatabase
*db1
= container_of(i1
, PgDatabase
, head
);
269 PgDatabase
*db2
= container_of(i2
, PgDatabase
, head
);
270 return strcmp(db1
->name
, db2
->name
);
273 /* put elem into list in correct pos */
274 static void put_in_order(List
*newitem
, StatList
*list
, int (*cmpfn
)(List
*, List
*))
279 statlist_for_each(item
, list
) {
280 res
= cmpfn(item
, newitem
);
282 fatal("put_in_order: found existing elem");
284 statlist_put_before(newitem
, list
, item
);
288 statlist_append(newitem
, list
);
291 /* create new object if new, then return it */
292 PgDatabase
*add_database(const char *name
)
294 PgDatabase
*db
= find_database(name
);
296 /* create new object if needed */
298 db
= obj_alloc(db_cache
);
302 list_init(&db
->head
);
303 safe_strcpy(db
->name
, name
, sizeof(db
->name
));
304 put_in_order(&db
->head
, &database_list
, cmp_database
);
310 /* register new auto database */
311 PgDatabase
*register_auto_database(const char *name
)
317 if (!cf_autodb_connstr
)
320 len
= strlen(cf_autodb_connstr
);
321 cs
= malloc(len
+ 1);
324 memcpy(cs
, cf_autodb_connstr
, len
+ 1);
325 parse_database((char*)name
, cs
);
328 db
= find_database(name
);
331 /* do not forget to check pool_size like in config_postprocess */
332 if (db
->pool_size
< 0)
333 db
->pool_size
= cf_default_pool_size
;
334 if (db
->res_pool_size
< 0)
335 db
->res_pool_size
= cf_res_pool_size
;
341 /* add or update client users */
342 PgUser
*add_user(const char *name
, const char *passwd
)
344 PgUser
*user
= find_user(name
);
347 user
= obj_alloc(user_cache
);
351 list_init(&user
->head
);
352 list_init(&user
->pool_list
);
353 safe_strcpy(user
->name
, name
, sizeof(user
->name
));
354 put_in_order(&user
->head
, &user_list
, cmp_user
);
356 tree_insert(&user_tree
, (long)user
->name
, &user
->tree_node
);
358 safe_strcpy(user
->passwd
, passwd
, sizeof(user
->passwd
));
362 /* create separate user object for storing server user info */
363 PgUser
*force_user(PgDatabase
*db
, const char *name
, const char *passwd
)
365 PgUser
*user
= db
->forced_user
;
367 user
= obj_alloc(user_cache
);
370 list_init(&user
->head
);
371 list_init(&user
->pool_list
);
373 safe_strcpy(user
->name
, name
, sizeof(user
->name
));
374 safe_strcpy(user
->passwd
, passwd
, sizeof(user
->passwd
));
375 db
->forced_user
= user
;
379 /* find an existing database */
380 PgDatabase
*find_database(const char *name
)
384 statlist_for_each(item
, &database_list
) {
385 db
= container_of(item
, PgDatabase
, head
);
386 if (strcmp(db
->name
, name
) == 0)
389 /* also trying to find in idle autodatabases list */
390 statlist_for_each_safe(item
, &autodatabase_idle_list
, tmp
) {
391 db
= container_of(item
, PgDatabase
, head
);
392 if (strcmp(db
->name
, name
) == 0) {
393 db
->inactive_time
= 0;
394 statlist_remove(&db
->head
, &autodatabase_idle_list
);
395 put_in_order(&db
->head
, &database_list
, cmp_database
);
402 /* find existing user */
403 PgUser
*find_user(const char *name
)
408 node
= tree_search(&user_tree
, (long)name
);
409 user
= node
? container_of(node
, PgUser
, tree_node
) : NULL
;
413 /* create new pool object */
414 static PgPool
*new_pool(PgDatabase
*db
, PgUser
*user
)
418 pool
= obj_alloc(pool_cache
);
422 list_init(&pool
->head
);
423 list_init(&pool
->map_head
);
428 statlist_init(&pool
->active_client_list
, "active_client_list");
429 statlist_init(&pool
->waiting_client_list
, "waiting_client_list");
430 statlist_init(&pool
->active_server_list
, "active_server_list");
431 statlist_init(&pool
->idle_server_list
, "idle_server_list");
432 statlist_init(&pool
->tested_server_list
, "tested_server_list");
433 statlist_init(&pool
->used_server_list
, "used_server_list");
434 statlist_init(&pool
->new_server_list
, "new_server_list");
435 statlist_init(&pool
->cancel_req_list
, "cancel_req_list");
437 list_append(&pool
->map_head
, &user
->pool_list
);
439 /* keep pools in db/user order to make stats faster */
440 put_in_order(&pool
->head
, &pool_list
, cmp_pool
);
445 /* find pool object, create if needed */
446 PgPool
*get_pool(PgDatabase
*db
, PgUser
*user
)
454 list_for_each(item
, &user
->pool_list
) {
455 pool
= container_of(item
, PgPool
, map_head
);
460 return new_pool(db
, user
);
463 /* deactivate socket and put into wait queue */
464 static void pause_client(PgSocket
*client
)
466 Assert(client
->state
== CL_ACTIVE
);
468 slog_debug(client
, "pause_client");
469 change_client_state(client
, CL_WAITING
);
470 if (!sbuf_pause(&client
->sbuf
))
471 disconnect_client(client
, true, "pause failed");
474 /* wake client from wait */
475 void activate_client(PgSocket
*client
)
477 Assert(client
->state
== CL_WAITING
);
479 slog_debug(client
, "activate_client");
480 change_client_state(client
, CL_ACTIVE
);
481 sbuf_continue(&client
->sbuf
);
485 * Don't let clients queue at all, if there is no working server connection.
487 * It must still allow following cases:
488 * - empty pool on startup
489 * - idle pool where all servers are removed
491 * Current assumptions:
492 * - old server connections will be dropped by query_timeout
493 * - new server connections fail due to server_connect_timeout, or other failure
495 * So here we drop client if all server connections have been dropped
496 * and new one's fail.
498 bool check_fast_fail(PgSocket
*client
)
501 PgPool
*pool
= client
->pool
;
503 /* reject if no servers and last connect failed */
504 if (!pool
->last_connect_failed
)
506 cnt
= pool_server_count(pool
) - statlist_count(&pool
->new_server_list
);
509 disconnect_client(client
, true, "no working server connection");
511 /* usual relaunch wont work, as there are no waiting clients */
512 launch_new_connection(pool
);
517 /* link if found, otherwise put into wait queue */
518 bool find_server(PgSocket
*client
)
520 PgPool
*pool
= client
->pool
;
523 bool varchange
= false;
525 Assert(client
->state
== CL_ACTIVE
);
530 /* try to get idle server, if allowed */
531 if (cf_pause_mode
== P_PAUSE
) {
535 server
= first_socket(&pool
->idle_server_list
);
538 else if (server
->close_needed
)
539 disconnect_server(server
, true, "obsolete connection");
540 else if (!server
->ready
)
541 disconnect_server(server
, true, "idle server got dirty");
546 if (!server
&& !check_fast_fail(client
))
550 Assert(!server
|| server
->state
== SV_IDLE
);
552 /* send var changes */
554 res
= varcache_apply(server
, client
, &varchange
);
556 disconnect_server(server
, true, "var change failed");
561 /* link or send to waiters list */
563 client
->link
= server
;
564 server
->link
= client
;
565 change_server_state(server
, SV_ACTIVE
);
567 server
->setting_vars
= 1;
569 res
= false; /* don't process client data yet */
570 if (!sbuf_pause(&client
->sbuf
))
571 disconnect_client(client
, true, "pause failed");
575 pause_client(client
);
581 /* pick waiting client */
582 static bool reuse_on_release(PgSocket
*server
)
585 PgPool
*pool
= server
->pool
;
586 PgSocket
*client
= first_socket(&pool
->waiting_client_list
);
588 activate_client(client
);
591 * As the activate_client() does full read loop,
592 * then it may happen that linked client close
593 * couses server close. Report it.
595 if (server
->state
== SV_FREE
|| server
->state
== SV_JUSTFREE
)
601 /* send reset query */
602 static bool reset_on_release(PgSocket
*server
)
606 Assert(server
->state
== SV_TESTED
);
608 slog_debug(server
, "Resetting: %s", cf_server_reset_query
);
609 SEND_generic(res
, server
, 'Q', "s", cf_server_reset_query
);
611 disconnect_server(server
, false, "reset query failed");
615 static bool life_over(PgSocket
*server
)
617 PgPool
*pool
= server
->pool
;
618 usec_t lifetime_kill_gap
= 0;
619 usec_t now
= get_cached_time();
620 usec_t age
= now
- server
->connect_time
;
621 usec_t last_kill
= now
- pool
->last_lifetime_disconnect
;
623 if (age
< cf_server_lifetime
)
626 if (pool
->db
->pool_size
> 0)
627 lifetime_kill_gap
= cf_server_lifetime
/ pool
->db
->pool_size
;
629 if (last_kill
>= lifetime_kill_gap
)
635 /* connecting/active -> idle, unlink if needed */
636 bool release_server(PgSocket
*server
)
638 PgPool
*pool
= server
->pool
;
639 SocketState newstate
= SV_IDLE
;
641 Assert(server
->ready
);
643 /* remove from old list */
644 switch (server
->state
) {
646 server
->link
->link
= NULL
;
649 if (*cf_server_reset_query
)
650 /* notify reset is required */
651 newstate
= SV_TESTED
;
652 else if (cf_server_check_delay
== 0 && *cf_server_check_query
)
654 * deprecated: before reset_query, the check_delay = 0
655 * was used to get same effect. This if() can be removed
656 * after couple of releases.
663 pool
->last_connect_failed
= 0;
666 fatal("bad server state in release_server (%d)", server
->state
);
669 /* enforce lifetime immidiately on release */
670 if (server
->state
!= SV_LOGIN
&& life_over(server
)) {
671 disconnect_server(server
, true, "server_lifetime");
675 /* enforce close request */
676 if (server
->close_needed
) {
677 disconnect_server(server
, true, "close_needed");
681 Assert(server
->link
== NULL
);
682 slog_noise(server
, "release_server: new state=%d", newstate
);
683 change_server_state(server
, newstate
);
685 if (newstate
== SV_IDLE
)
686 /* immediately process waiters, to give fair chance */
687 return reuse_on_release(server
);
688 else if (newstate
== SV_TESTED
)
689 return reset_on_release(server
);
694 /* drop server connection */
695 void disconnect_server(PgSocket
*server
, bool notify
, const char *reason
, ...)
697 PgPool
*pool
= server
->pool
;
698 PgSocket
*client
= server
->link
;
699 static const uint8_t pkt_term
[] = {'X', 0,0,0,4};
701 usec_t now
= get_cached_time();
705 va_start(ap
, reason
);
706 vsnprintf(buf
, sizeof(buf
), reason
, ap
);
710 if (cf_log_disconnections
)
711 slog_info(server
, "closing because: %s (age=%llu)", reason
,
712 (now
- server
->connect_time
) / USEC
);
714 switch (server
->state
) {
716 client
= server
->link
;
720 disconnect_client(client
, true, "%s", reason
);
729 * usually disconnect means problems in startup phase,
730 * except when sending cancel packet
733 pool
->last_connect_failed
= 1;
738 fatal("disconnect_server: bad server state (%d)", server
->state
);
741 Assert(server
->link
== NULL
);
743 /* notify server and close connection */
744 if (send_term
&& notify
) {
745 if (!sbuf_answer(&server
->sbuf
, pkt_term
, sizeof(pkt_term
)))
750 change_server_state(server
, SV_JUSTFREE
);
751 if (!sbuf_close(&server
->sbuf
))
752 log_noise("sbuf_close failed, retry later");
755 /* drop client connection */
756 void disconnect_client(PgSocket
*client
, bool notify
, const char *reason
, ...)
760 usec_t now
= get_cached_time();
762 va_start(ap
, reason
);
763 vsnprintf(buf
, sizeof(buf
), reason
, ap
);
767 if (cf_log_disconnections
)
768 slog_info(client
, "closing because: %s (age=%llu)", reason
,
769 (now
- client
->connect_time
) / USEC
);
771 switch (client
->state
) {
774 PgSocket
*server
= client
->link
;
775 /* ->ready may be set before all is sent */
776 if (server
->ready
&& sbuf_is_empty(&server
->sbuf
)) {
777 /* retval does not matter here */
778 release_server(server
);
782 disconnect_server(server
, true, "unclean server");
790 fatal("bad client state in disconnect_client: %d", client
->state
);
793 /* send reason to client */
794 if (notify
&& reason
&& client
->state
!= CL_CANCEL
) {
796 * don't send Ready pkt here, or client won't notice
799 send_pooler_error(client
, false, reason
);
802 change_client_state(client
, CL_JUSTFREE
);
803 if (!sbuf_close(&client
->sbuf
))
804 log_noise("sbuf_close failed, retry later");
807 /* the pool needs new connection, if possible */
808 void launch_new_connection(PgPool
*pool
)
812 const char *unix_dir
= cf_unix_socket_dir
;
815 /* allow only small number of connection attempts at a time */
816 if (!statlist_empty(&pool
->new_server_list
)) {
817 log_debug("launch_new_connection: already progress");
821 /* if server bounces, don't retry too fast */
822 if (pool
->last_connect_failed
) {
823 usec_t now
= get_cached_time();
824 if (now
- pool
->last_connect_time
< cf_server_login_retry
) {
825 log_debug("launch_new_connection: last failed, wait");
830 /* is it allowed to add servers? */
831 total
= pool_server_count(pool
);
832 if (total
>= pool
->db
->pool_size
&& pool
->welcome_msg_ready
) {
833 /* should we use reserve pool? */
834 if (cf_res_pool_timeout
&& pool
->db
->res_pool_size
) {
835 usec_t now
= get_cached_time();
836 PgSocket
*c
= first_socket(&pool
->waiting_client_list
);
837 if (c
&& (now
- c
->request_time
) >= cf_res_pool_timeout
) {
838 if (total
< pool
->db
->pool_size
+ pool
->db
->res_pool_size
) {
839 log_debug("reserve_pool activated");
844 log_debug("launch_new_connection: pool full (%d >= %d)",
845 total
, pool
->db
->pool_size
);
850 /* get free conn object */
851 server
= obj_alloc(server_cache
);
853 log_debug("launch_new_connection: no memory");
859 server
->auth_user
= server
->pool
->user
;
860 server
->remote_addr
= server
->pool
->db
->addr
;
861 server
->connect_time
= get_cached_time();
862 pool
->last_connect_time
= get_cached_time();
863 change_server_state(server
, SV_LOGIN
);
865 if (cf_log_connections
)
866 slog_info(server
, "new connection to server");
868 /* override socket location if requested */
869 if (server
->pool
->db
->unix_socket_dir
[0])
870 unix_dir
= server
->pool
->db
->unix_socket_dir
;
872 /* start connecting */
873 res
= sbuf_connect(&server
->sbuf
, &server
->remote_addr
, unix_dir
,
874 cf_server_connect_timeout
/ USEC
);
876 log_noise("failed to launch new connection");
879 /* new client connection attempt */
880 PgSocket
* accept_client(int sock
,
881 const struct sockaddr_in
*addr
,
887 /* get free PgSocket */
888 client
= obj_alloc(client_cache
);
890 log_warning("cannot allocate client struct");
895 client
->connect_time
= client
->request_time
= get_cached_time();
896 client
->query_start
= 0;
898 fill_remote_addr(client
, sock
, is_unix
);
899 fill_local_addr(client
, sock
, is_unix
);
901 change_client_state(client
, CL_LOGIN
);
903 res
= sbuf_accept(&client
->sbuf
, sock
, is_unix
);
905 if (cf_log_connections
)
906 slog_debug(client
, "failed connection attempt");
913 /* send cached parameters to client to pretend being server */
914 /* client managed to authenticate, send welcome msg and accept queries */
915 bool finish_client_login(PgSocket
*client
)
917 switch (client
->state
) {
919 change_client_state(client
, CL_ACTIVE
);
923 fatal("bad client state");
926 if (!welcome_client(client
)) {
927 log_debug("finish_client_login: no welcome message, pause");
928 client
->wait_for_welcome
= 1;
929 pause_client(client
);
930 if (cf_pause_mode
== P_NONE
)
931 launch_new_connection(client
->pool
);
934 client
->wait_for_welcome
= 0;
936 slog_debug(client
, "logged in");
941 /* client->cancel_key has requested client key */
942 void accept_cancel_request(PgSocket
*req
)
946 PgSocket
*server
= NULL
, *client
, *main_client
= NULL
;
948 Assert(req
->state
== CL_LOGIN
);
950 /* find real client this is for */
951 statlist_for_each(pitem
, &pool_list
) {
952 pool
= container_of(pitem
, PgPool
, head
);
953 statlist_for_each(citem
, &pool
->active_client_list
) {
954 client
= container_of(citem
, PgSocket
, head
);
955 if (memcmp(client
->cancel_key
, req
->cancel_key
, 8) == 0) {
956 main_client
= client
;
965 disconnect_client(req
, false, "failed cancel request");
969 /* not linked client, just drop it then */
970 if (!main_client
->link
) {
973 /* let administrative cancel be handled elsewhere */
974 if (main_client
->pool
->db
->admin
) {
975 disconnect_client(req
, false, "cancel request for console client");
976 admin_handle_cancel(main_client
);
980 disconnect_client(req
, false, "cancel request for idle client");
982 /* notify readiness */
983 SEND_ReadyForQuery(res
, main_client
);
985 disconnect_client(main_client
, true, "ReadyForQuery for main_client failed");
989 /* drop the connection, if fails, retry later in justfree list */
990 if (!sbuf_close(&req
->sbuf
))
991 log_noise("sbuf_close failed, retry later");
993 /* remember server key */
994 server
= main_client
->link
;
995 memcpy(req
->cancel_key
, server
->cancel_key
, 8);
997 /* attach to target pool */
999 change_client_state(req
, CL_CANCEL
);
1001 /* need fresh connection */
1002 launch_new_connection(pool
);
1005 void forward_cancel_request(PgSocket
*server
)
1008 PgSocket
*req
= first_socket(&server
->pool
->cancel_req_list
);
1010 Assert(req
!= NULL
&& req
->state
== CL_CANCEL
);
1011 Assert(server
->state
== SV_LOGIN
);
1013 SEND_CancelRequest(res
, server
, req
->cancel_key
);
1015 change_client_state(req
, CL_JUSTFREE
);
1018 bool use_client_socket(int fd
, PgAddr
*addr
,
1019 const char *dbname
, const char *username
,
1020 uint64_t ckey
, int oldfd
, int linkfd
,
1021 const char *client_enc
, const char *std_string
,
1022 const char *datestyle
, const char *timezone
)
1027 client
= accept_client(fd
, NULL
, addr
->is_unix
);
1030 client
->suspended
= 1;
1032 if (!set_pool(client
, dbname
, username
))
1035 change_client_state(client
, CL_ACTIVE
);
1037 /* store old cancel key */
1038 pktbuf_static(&tmp
, client
->cancel_key
, 8);
1039 pktbuf_put_uint64(&tmp
, ckey
);
1042 client
->tmp_sk_oldfd
= oldfd
;
1043 client
->tmp_sk_linkfd
= linkfd
;
1045 varcache_set(&client
->vars
, "client_encoding", client_enc
);
1046 varcache_set(&client
->vars
, "standard_conforming_strings", std_string
);
1047 varcache_set(&client
->vars
, "datestyle", datestyle
);
1048 varcache_set(&client
->vars
, "timezone", timezone
);
1053 bool use_server_socket(int fd
, PgAddr
*addr
,
1054 const char *dbname
, const char *username
,
1055 uint64_t ckey
, int oldfd
, int linkfd
,
1056 const char *client_enc
, const char *std_string
,
1057 const char *datestyle
, const char *timezone
)
1059 PgDatabase
*db
= find_database(dbname
);
1066 /* if the database not found, it's an auto database -> registering... */
1068 db
= register_auto_database(dbname
);
1073 if (db
->forced_user
)
1074 user
= db
->forced_user
;
1076 user
= find_user(username
);
1078 pool
= get_pool(db
, user
);
1082 server
= obj_alloc(server_cache
);
1086 res
= sbuf_accept(&server
->sbuf
, fd
, addr
->is_unix
);
1090 server
->suspended
= 1;
1091 server
->pool
= pool
;
1092 server
->auth_user
= user
;
1093 server
->connect_time
= server
->request_time
= get_cached_time();
1094 server
->query_start
= 0;
1096 fill_remote_addr(server
, fd
, addr
->is_unix
);
1097 fill_local_addr(server
, fd
, addr
->is_unix
);
1101 change_server_state(server
, SV_ACTIVE
);
1104 change_server_state(server
, SV_IDLE
);
1107 /* store old cancel key */
1108 pktbuf_static(&tmp
, server
->cancel_key
, 8);
1109 pktbuf_put_uint64(&tmp
, ckey
);
1112 server
->tmp_sk_oldfd
= oldfd
;
1113 server
->tmp_sk_linkfd
= linkfd
;
1115 varcache_set(&server
->vars
, "client_encoding", client_enc
);
1116 varcache_set(&server
->vars
, "standard_conforming_strings", std_string
);
1117 varcache_set(&server
->vars
, "datestyle", datestyle
);
1118 varcache_set(&server
->vars
, "timezone", timezone
);
1123 void for_each_server(PgPool
*pool
, void (*func
)(PgSocket
*sk
))
1127 statlist_for_each(item
, &pool
->idle_server_list
)
1128 func(container_of(item
, PgSocket
, head
));
1130 statlist_for_each(item
, &pool
->used_server_list
)
1131 func(container_of(item
, PgSocket
, head
));
1133 statlist_for_each(item
, &pool
->tested_server_list
)
1134 func(container_of(item
, PgSocket
, head
));
1136 statlist_for_each(item
, &pool
->active_server_list
)
1137 func(container_of(item
, PgSocket
, head
));
1139 statlist_for_each(item
, &pool
->new_server_list
)
1140 func(container_of(item
, PgSocket
, head
));
1143 static void tag_dirty(PgSocket
*sk
)
1145 sk
->close_needed
= 1;
1148 void tag_database_dirty(PgDatabase
*db
)
1153 statlist_for_each(item
, &pool_list
) {
1154 pool
= container_of(item
, PgPool
, head
);
1156 for_each_server(pool
, tag_dirty
);
1160 /* move objects from justfree_* to free_* lists */
1161 void reuse_just_freed_objects(void)
1165 bool close_works
= true;
1168 * event_del() may fail because of ENOMEM for event handlers
1169 * that need only changes sent to kernel on each loop.
1171 * Keep open sbufs in justfree lists until successful.
1174 statlist_for_each_safe(item
, &justfree_client_list
, tmp
) {
1175 sk
= container_of(item
, PgSocket
, head
);
1176 if (sbuf_is_closed(&sk
->sbuf
))
1177 change_client_state(sk
, CL_FREE
);
1178 else if (close_works
)
1179 close_works
= sbuf_close(&sk
->sbuf
);
1181 statlist_for_each_safe(item
, &justfree_server_list
, tmp
) {
1182 sk
= container_of(item
, PgSocket
, head
);
1183 if (sbuf_is_closed(&sk
->sbuf
))
1184 change_server_state(sk
, SV_FREE
);
1185 else if (close_works
)
1186 close_works
= sbuf_close(&sk
->sbuf
);