/* Version 1.8.0.0 -- [socat.git] / xio-posixmq.c
   blob 186e932a62068aef4e03672306c9f4f091e3702f */
/* Source: xio-posixmq.c */
/* Copyright Gerhard Rieger and contributors (see file CHANGES) */
/* Published under the GNU General Public License V.2, see file COPYING */

/* This file contains the source for opening addresses of POSIX MQ type */
7 #include "xiosysincludes.h"
8 #include "xioopen.h"
10 #include "xio-socket.h"
11 #include "xio-listen.h"
12 #include "xio-posixmq.h"
13 #include "xio-named.h"
16 #if WITH_POSIXMQ
18 static int _posixmq_unlink(
19 const char *name,
20 int level); /* message level on error */
22 static int xioopen_posixmq(int argc, const char *argv[], struct opt *opts, int xioflags, xiofile_t *xfd, const struct addrdesc *addrdesc);
24 const struct addrdesc xioaddr_posixmq_bidir = { "POSIXMQ-BIDIRECTIONAL", 1+XIO_RDWR, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY, XIO_RDWR, 0, 0 HELP(":<mqname>") };
25 const struct addrdesc xioaddr_posixmq_read = { "POSIXMQ-READ", 1+XIO_RDONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY, XIO_RDONLY, 0, 0 HELP(":<mqname>") };
26 const struct addrdesc xioaddr_posixmq_receive = { "POSIXMQ-RECEIVE", 1+XIO_RDONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_RDONLY, XIOREAD_RECV_ONESHOT, 0 HELP(":<mqname>") };
27 const struct addrdesc xioaddr_posixmq_send = { "POSIXMQ-SEND", 1+XIO_WRONLY, xioopen_posixmq, GROUP_FD|GROUP_NAMED|GROUP_POSIXMQ|GROUP_RETRY|GROUP_CHILD, XIO_WRONLY, 0, 0 HELP(":<mqname>") };
29 const struct optdesc opt_posixmq_priority = { "posixmq-priority", "mq-pri", OPT_POSIXMQ_PRIORITY, GROUP_POSIXMQ, PH_INIT, TYPE_BOOL, OFUNC_OFFSET, XIO_OFFSETOF(para.posixmq.prio), XIO_SIZEOF(para.posixmq.prio), 0 };
31 /* _read(): open immediately, stay in transfer loop
32 _recv(): wait until data (how we know there is??), oneshot, opt.fork
34 static int xioopen_posixmq(
35 int argc,
36 const char *argv[],
37 struct opt *opts,
38 int xioflags,
39 xiofile_t *xfd,
40 const struct addrdesc *addrdesc)
42 /* We expect the form: /mqname */
43 xiosingle_t *sfd = &xfd->stream;
44 const char *name;
45 int dirs = addrdesc->arg1;
46 int oneshot = addrdesc->arg2;
47 bool opt_unlink_early = false;
48 int oflag;
49 bool opt_o_excl = false;
50 mode_t opt_mode = 0666;
51 mqd_t mqd;
52 int _errno;
53 bool dofork = false;
54 int maxchildren = 0;
55 bool with_intv = false;
56 int result = 0;
58 if (!xioparms.experimental) {
59 Error1("%s: use option --experimental to acknowledge unmature state", argv[0]);
60 return STAT_NORETRY;
62 if (argc != 2) {
63 xio_syntax(argv[0], 1, argc-1, addrdesc->syntax);
64 return STAT_NORETRY;
67 name = argv[1];
69 retropt_bool(opts, OPT_FORK, &dofork);
70 if (dofork) {
71 if (!(xioflags & XIO_MAYFORK)) {
72 Error1("%s: option fork not allowed in this context", argv[0]);
73 return STAT_NORETRY;
75 sfd->flags |= XIO_DOESFORK;
76 if (dirs == XIO_WRONLY) {
77 with_intv = true;
81 retropt_int(opts, OPT_MAX_CHILDREN, &maxchildren);
82 if (! dofork && maxchildren) {
83 Error("option max-children not allowed without option fork");
84 return STAT_NORETRY;
86 if (maxchildren) {
87 xiosetchilddied(); /* set SIGCHLD handler */
89 applyopts_offset(sfd, opts);
90 if (applyopts_single(sfd, opts, PH_INIT) < 0) return STAT_NORETRY;
91 applyopts(sfd, -1, opts, PH_INIT);
93 if ((sfd->para.posixmq.name = strdup(name)) == NULL) {
94 Error1("strdup(\"%s\"): out of memory", name);
97 retropt_bool(opts, OPT_O_EXCL, &opt_o_excl);
98 retropt_mode(opts, OPT_PERM, &opt_mode);
100 retropt_bool(opts, OPT_UNLINK_EARLY, &opt_unlink_early);
101 if (opt_unlink_early) {
102 _posixmq_unlink(sfd->para.posixmq.name, E_INFO);
104 retropt_bool(opts, OPT_UNLINK_CLOSE, &sfd->opt_unlink_close);
105 if (sfd->howtoend == END_UNSPEC)
106 sfd->howtoend = END_CLOSE;
107 sfd->dtype = XIODATA_POSIXMQ | oneshot;
109 oflag = O_CREAT;
110 if (opt_o_excl) oflag |= O_EXCL;
111 switch (dirs) {
112 case XIO_RDWR: oflag |= O_RDWR; break;
113 case XIO_RDONLY: oflag |= O_RDONLY; break;
114 case XIO_WRONLY: oflag |= O_WRONLY; break;
117 /* Now open the message queue */
118 Debug3("mq_open(\"%s\", %d, "F_mode", NULL)", name, oflag, opt_mode);
119 mqd = mq_open(name, oflag, opt_mode, NULL);
120 _errno = errno;
121 Debug1("mq_open() -> %d", mqd);
122 if (mqd < 0) {
123 Error3("%s: mq_open(\"%s\"): %s", argv[0], name, strerror(errno));
124 errno = _errno;
125 return STAT_RETRYLATER;
127 sfd->fd = mqd;
129 if (!dofork && !oneshot) {
130 return STAT_OK;
132 /* Continue with modes that open only when data available */
134 if (!oneshot) {
135 if (xioparms.logopt == 'm') {
136 Info("starting POSIX-MQ fork loop, switching to syslog");
137 diag_set('y', xioparms.syslogfac); xioparms.logopt = 'y';
138 } else {
139 Info("starting POSIX-MQ fork loop");
143 /* Wait until a message is available (or until interval has expired),
144 then fork a sub process that handles this single message. Here we
145 continue waiting for more.
146 The trigger mechanism is described with function
147 _xioopen_dgram_recvfrom()
149 while (true) {
150 int trigger[2];
151 pid_t pid; /* mostly int; only used with fork */
152 sigset_t mask_sigchld;
154 Info1("%s: waiting for data or interval", argv[0]);
155 do {
156 struct pollfd pollfd;
158 pollfd.fd = sfd->fd;
159 pollfd.events = (dirs==XIO_RDONLY?POLLIN:POLLOUT);
160 if (xiopoll(&pollfd, 1, NULL) > 0) {
161 break;
163 if (errno == EINTR) {
164 continue;
166 Warn2("poll({%d,,},,-1): %s", sfd->fd, strerror(errno));
167 Sleep(1);
168 } while (true);
169 if (!dofork) return STAT_OK;
171 Info("generating pipe that triggers parent when packet has been consumed");
172 if (dirs == XIO_RDONLY) {
173 if (Pipe(trigger) < 0) {
174 Error1("pipe(): %s", strerror(errno));
178 /* Block SIGCHLD until parent is ready to react */
179 sigemptyset(&mask_sigchld);
180 sigaddset(&mask_sigchld, SIGCHLD);
181 Sigprocmask(SIG_BLOCK, &mask_sigchld, NULL);
183 if ((pid = xio_fork(false, E_ERROR, xfd->stream.shutup)) < 0) {
184 Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL);
185 if (dirs==XIO_RDONLY) {
186 Close(trigger[0]);
187 Close(trigger[1]);
189 xioclose_posixmq(sfd);
190 return STAT_RETRYLATER;
192 if (pid == 0) { /* child */
193 pid_t cpid = Getpid();
194 Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL);
195 xiosetenvulong("PID", cpid, 1);
197 if (dirs == XIO_RDONLY) {
198 Close(trigger[0]);
199 Fcntl_l(trigger[1], F_SETFD, FD_CLOEXEC);
200 sfd->triggerfd = trigger[1];
202 break;
205 /* Parent */
206 if (dirs == XIO_RDONLY) {
207 char buf[1];
208 Close(trigger[1]);
209 while (Read(trigger[0], buf, 1) < 0 && errno == EINTR)
213 #if WITH_RETRY
214 if (with_intv) {
215 Nanosleep(&sfd->intervall, NULL);
217 #endif
219 /* now we are ready to handle signals */
220 Sigprocmask(SIG_UNBLOCK, &mask_sigchld, NULL);
221 while (maxchildren) {
222 if (num_child < maxchildren) break;
223 Notice1("max of %d children is active, waiting", num_child);
224 while (!Sleep(UINT_MAX)) ; /* any signal lets us continue */
226 Info("continue listening");
229 _xio_openlate(sfd, opts);
230 return result;
234 ssize_t xiowrite_posixmq(
235 struct single *sfd,
236 const void *buff,
237 size_t bufsiz)
239 int res;
240 int _errno;
242 Debug4("mq_send(mqd=%d, %p, "F_Zu", %u)", sfd->fd, buff, bufsiz, sfd->para.posixmq.prio);
243 res = mq_send(sfd->fd, buff, bufsiz, sfd->para.posixmq.prio);
244 _errno = errno;
245 Debug1("mq_send() -> %d", res);
246 errno = _errno;
247 if (res < 0) {
248 Error2("mq_send(mqd=%d): %s", sfd->fd, strerror(errno));
249 return -1;
251 return bufsiz; /* success */
254 ssize_t xioread_posixmq(
255 struct single *sfd,
256 void *buff,
257 size_t bufsiz)
259 ssize_t res;
260 int _errno;
262 Debug3("mq_receive(mqd=%d, %p, "F_Zu", {} )", sfd->fd, buff, bufsiz);
263 res = mq_receive(sfd->fd, buff, bufsiz, &sfd->para.posixmq.prio);
264 _errno = errno;
265 Debug1("mq_receive() -> "F_Zd, res);
266 errno = _errno;
267 if (res < 0) {
268 Error2("mq_receive(mqd=%d): %s", sfd->fd, strerror(errno));
269 return -1;
271 if (sfd->triggerfd > 0) {
272 Close(sfd->triggerfd);
273 sfd->triggerfd = -1;
275 Info1("mq_receive() -> {prio=%u}", sfd->para.posixmq.prio);
276 xiosetenvulong("POSIXMQ_PRIO", (unsigned long)sfd->para.posixmq.prio, 1);
277 return res;
/* Declaration only; no definition is visible in this part of the file. */
ssize_t xiopending_posixmq(
	struct single *sfd);
282 ssize_t xioclose_posixmq(
283 struct single *sfd)
285 int res;
286 Debug1("xioclose_posixmq(): mq_close(%d)", sfd->fd);
287 res = mq_close(sfd->fd);
288 if (res < 0) {
289 Warn2("xioclose_posixmq(): mq_close(%d) -> -1: %s", sfd->fd, strerror(errno));
290 } else {
291 Debug("xioclose_posixmq(): mq_close() -> 0");
293 if (sfd->opt_unlink_close) {
294 _posixmq_unlink(sfd->para.posixmq.name, E_WARN);
296 free((void *)sfd->para.posixmq.name);
297 return 0;
/* Removes the message queue called name with mq_unlink().
   On failure a message of the given level is issued; errno is that of
   mq_unlink(). Returns the result of mq_unlink(). */
static int _posixmq_unlink(
	const char *name,
	int level)	/* message level on error */
{
	int rc;
	int saved_errno;

	Debug1("mq_unlink(\"%s\")", name);
	rc = mq_unlink(name);
	saved_errno = errno;	/* keep errno across the debug output */
	Debug1("mq_unlink() -> %d", rc);
	errno = saved_errno;

	if (rc >= 0) {
		return rc;
	}
	Msg2(level, "mq_unlink(\"%s\"): %s",name, strerror(errno));
	return rc;
}
318 #endif /* WITH_POSIXMQ */