Forgot to also change the second listen() call to cf_listen_backlog.
[pgbouncer.git] / src / pooler.c
blob8b7fd4c20bbc5044ea3997f601a7587b68481f26
1 /*
2 * PgBouncer - Lightweight connection pooler for PostgreSQL.
3 *
4 * Copyright (c) 2007-2009 Marko Kreen, Skype Technologies OÜ
5 *
6 * Permission to use, copy, modify, and/or distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
20 * Handling of pooler listening sockets
23 #include "bouncer.h"
25 static int fd_net = 0;
26 static int fd_unix = 0;
28 static struct event ev_net;
29 static struct event ev_unix;
31 /* if sockets are registered in libevent */
32 static bool reg_net = false;
33 static bool reg_unix = false;
35 /* should listening sockets be active or suspended? */
36 static bool pooler_active = false;
38 /* on accept() failure sleep 5 seconds */
39 static struct event ev_err;
40 static struct timeval err_timeout = {5, 0};
42 /* atexit() cleanup func */
43 static void cleanup_unix_socket(void)
45 char fn[256];
47 /* avoid cleanup if exit() while suspended */
48 if (!reg_unix)
49 return;
51 snprintf(fn, sizeof(fn), "%s/.s.PGSQL.%d",
52 cf_unix_socket_dir, cf_listen_port);
53 unlink(fn);
56 void get_pooler_fds(int *p_net, int *p_unix)
58 *p_net = fd_net;
59 *p_unix = fd_unix;
62 static int create_unix_socket(const char *socket_dir, int listen_port)
64 struct sockaddr_un un;
65 int res, sock;
66 char lockfile[256];
67 struct stat st;
69 /* fill sockaddr struct */
70 memset(&un, 0, sizeof(un));
71 un.sun_family = AF_UNIX;
72 snprintf(un.sun_path, sizeof(un.sun_path),
73 "%s/.s.PGSQL.%d", socket_dir, listen_port);
75 /* check for lockfile */
76 snprintf(lockfile, sizeof(lockfile), "%s.lock", un.sun_path);
77 res = lstat(lockfile, &st);
78 if (res == 0)
79 fatal("unix port %d is in use", listen_port);
81 /* expect old bouncer gone */
82 unlink(un.sun_path);
84 /* create socket */
85 sock = socket(PF_UNIX, SOCK_STREAM, 0);
86 if (sock < 0)
87 fatal_perror("socket");
89 /* bind it */
90 res = bind(sock, (const struct sockaddr *)&un, sizeof(un));
91 if (res < 0)
92 fatal_perror("bind");
94 /* remove socket on shutdown */
95 atexit(cleanup_unix_socket);
97 /* set common options */
98 tune_socket(sock, true);
100 /* finally, accept connections */
101 res = listen(sock, cf_listen_backlog);
102 if (res < 0)
103 fatal_perror("listen");
105 res = chmod(un.sun_path, 0777);
106 if (res < 0)
107 fatal_perror("chmod");
109 log_info("listening on unix:%s", un.sun_path);
111 return sock;
/*
 * Notify the pooler only when data has also arrived.
 *
 * optval specifies how long after a connection attempt to wait for data.
 *
 * Related to the tcp_synack_retries sysctl, default 5 (corresponds to 180 secs).
 *
 * SO_ACCEPTFILTER needs to be set after listen(), maybe TCP_DEFER_ACCEPT too.
 *
 * sock - listening TCP socket
 * on   - true to install the option, false to uninstall it
 *
 * A failing setsockopt() is only logged as a warning.
 */
static void tune_accept(int sock, bool on)
{
	const char *act = on ? "install" : "uninstall";
	int res = 0;
#ifdef TCP_DEFER_ACCEPT
	int optval = 45; /* fixme: proper value */
	socklen_t optlen = sizeof(optval);

	/* the old value is only logged; the call's status is not examined */
	getsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &optval, &optlen);
	log_noise("old TCP_DEFER_ACCEPT on %d = %d", sock, optval);

	if (on)
		optval = 1;
	else
		optval = 0;
	log_noise("%s TCP_DEFER_ACCEPT on %d", act, sock);
	res = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &optval, sizeof(optval));
#else
#if 0
#ifdef SO_ACCEPTFILTER
	struct accept_filter_arg af, *afp = on ? &af : NULL;
	socklen_t af_len = on ? sizeof(af) : 0;
	memset(&af, 0, sizeof(af));
	strcpy(af.af_name, "dataready");
	log_noise("%s SO_ACCEPTFILTER on %d", act, sock);
	res = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, afp, af_len);
#endif
#endif
#endif
	if (res >= 0)
		return;

	log_warning("tune_accept: %s TCP_DEFER_ACCEPT/SO_ACCEPTFILTER: %s",
		    act, strerror(errno));
}
152 void pooler_tune_accept(bool on)
154 if (fd_net > 0)
155 tune_accept(fd_net, on);
158 static int create_net_socket(const char *listen_addr, int listen_port)
160 int sock;
161 struct sockaddr_in sa;
162 int res;
163 int val;
165 /* create socket */
166 sock = socket(AF_INET, SOCK_STREAM, 0);
167 if (sock < 0)
168 fatal_perror("socket");
170 /* parse address */
171 memset(&sa, 0, sizeof(sa));
172 sa.sin_family = AF_INET;
173 sa.sin_port = htons(cf_listen_port);
174 if (strcmp(listen_addr, "*") == 0) {
175 sa.sin_addr.s_addr = htonl(INADDR_ANY);
176 } else {
177 sa.sin_addr.s_addr = inet_addr(listen_addr);
178 if (sa.sin_addr.s_addr == INADDR_NONE)
179 fatal("cannot parse addr: '%s'", listen_addr);
182 /* relaxed binding */
183 val = 1;
184 res = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
185 if (res < 0)
186 fatal_perror("setsockopt");
188 /* bind to address */
189 res = bind(sock, (struct sockaddr *)&sa, sizeof(sa));
190 if (res < 0)
191 fatal_perror("bind");
193 /* set common options */
194 tune_socket(sock, false);
196 /* make it accept connections */
197 res = listen(sock, cf_listen_backlog);
198 if (res < 0)
199 fatal_perror("listen");
201 tune_accept(sock, cf_tcp_defer_accept);
203 log_info("listening on %s:%d", cf_listen_addr, cf_listen_port);
205 return sock;
208 static void err_wait_func(int sock, short flags, void *arg)
210 if (cf_pause_mode != P_SUSPEND)
211 resume_pooler();
214 static const char *addrpair(const PgAddr *src, const PgAddr *dst)
216 static char ip1buf[64], ip2buf[64], buf[256];
217 const char *ip1, *ip2;
218 if (src->is_unix)
219 return "unix->unix";
221 ip1 = inet_ntop(AF_INET, &src->ip_addr, ip1buf, sizeof(ip1buf));
222 if (!ip1)
223 ip1 = strerror(errno);
224 ip2 = inet_ntop(AF_INET, &dst->ip_addr, ip2buf, sizeof(ip2buf));
225 if (!ip2)
226 ip2 = strerror(errno);
227 snprintf(buf, sizeof(buf), "%s:%d -> %s:%d",
228 ip1, src->port, ip2, dst->port);
229 return buf;
232 static const char *conninfo(const PgSocket *sk)
234 if (is_server_socket(sk))
235 return addrpair(&sk->local_addr, &sk->remote_addr);
236 else
237 return addrpair(&sk->remote_addr, &sk->local_addr);
240 /* got new connection, associate it with client struct */
241 static void pool_accept(int sock, short flags, void *is_unix)
243 int fd;
244 PgSocket *client;
245 union {
246 struct sockaddr_in in;
247 struct sockaddr_un un;
248 struct sockaddr sa;
249 } addr;
250 socklen_t len = sizeof(addr);
252 if(!(flags & EV_READ)) {
253 log_warning("No EV_READ in pool_accept");
254 return;
256 loop:
257 /* get fd */
258 fd = safe_accept(sock, &addr.sa, &len);
259 if (fd < 0) {
260 if (errno == EAGAIN)
261 return;
262 else if (errno == ECONNABORTED)
263 return;
266 * probably fd limit, pointless to try often
267 * wait a bit, hope that admin resolves somehow
269 log_error("accept() failed: %s", strerror(errno));
270 evtimer_set(&ev_err, err_wait_func, NULL);
271 safe_evtimer_add(&ev_err, &err_timeout);
272 suspend_pooler();
273 return;
276 log_noise("new fd from accept=%d", fd);
277 if (is_unix) {
279 uid_t uid;
280 gid_t gid;
281 log_noise("getuid(): %d", (int)getuid());
282 if (getpeereid(fd, &uid, &gid) >= 0)
283 log_noise("unix peer uid: %d", (int)uid);
284 else
285 log_warning("unix peer uid failed: %s", strerror(errno));
287 client = accept_client(fd, NULL, true);
288 } else {
289 client = accept_client(fd, &addr.in, false);
292 if (client)
293 slog_debug(client, "P: got connection: %s", conninfo(client));
296 * there may be several clients waiting,
297 * avoid context switch by looping
299 goto loop;
302 bool use_pooler_socket(int sock, bool is_unix)
304 tune_socket(sock, is_unix);
306 if (is_unix)
307 fd_unix = sock;
308 else
309 fd_net = sock;
310 return true;
313 void suspend_pooler(void)
315 pooler_active = false;
317 if (fd_net && reg_net) {
318 if (event_del(&ev_net) < 0) {
319 log_warning("suspend_pooler, event_del: %s", strerror(errno));
320 return;
322 reg_net = false;
324 if (fd_unix && reg_unix) {
325 if (event_del(&ev_unix) < 0) {
326 log_warning("suspend_pooler, event_del: %s", strerror(errno));
327 return;
329 reg_unix = false;
333 void resume_pooler(void)
335 pooler_active = true;
337 if (fd_unix && !reg_unix) {
338 event_set(&ev_unix, fd_unix, EV_READ | EV_PERSIST, pool_accept, "1");
339 if (event_add(&ev_unix, NULL) < 0) {
340 log_warning("event_add failed: %s", strerror(errno));
341 return;
343 reg_unix = true;
346 if (fd_net && !reg_net) {
347 event_set(&ev_net, fd_net, EV_READ | EV_PERSIST, pool_accept, NULL);
348 if (event_add(&ev_net, NULL) < 0) {
349 log_warning("event_add failed: %s", strerror(errno));
351 reg_net = true;
355 /* listen on socket - should happen after all other initializations */
356 void pooler_setup(void)
358 if (cf_listen_addr && !fd_net)
359 fd_net = create_net_socket(cf_listen_addr, cf_listen_port);
361 if (*cf_unix_socket_dir && !fd_unix)
362 fd_unix = create_unix_socket(cf_unix_socket_dir, cf_listen_port);
364 if (!fd_net && !fd_unix)
365 fatal("nowhere to listen on");
367 resume_pooler();
370 /* retry previously failed suspend_pooler() / resume_pooler() */
371 void per_loop_pooler_maint(void)
373 if (pooler_active) {
374 if ((fd_unix && !reg_unix) || (fd_net && !reg_net))
375 resume_pooler();
376 } else {
377 if ((fd_unix && reg_unix) || (fd_net && reg_net))
378 suspend_pooler();