// included by "thread_pthread.c"

#if USE_MN_THREADS

static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);

static bool
timer_thread_cancel_waiting(rb_thread_t *th)
{
    bool canceled = false;

    if (th->sched.waiting_reason.flags) {
        rb_native_mutex_lock(&timer_th.waiting_lock);
        {
            if (th->sched.waiting_reason.flags) {
                canceled = true;
                ccan_list_del_init(&th->sched.waiting_reason.node);
                if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
                    timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
                }
                th->sched.waiting_reason.flags = thread_sched_waiting_none;
            }
        }
        rb_native_mutex_unlock(&timer_th.waiting_lock);
    }

    return canceled;
}

static void
ubf_event_waiting(void *ptr)
{
    rb_thread_t *th = (rb_thread_t *)ptr;
    struct rb_thread_sched *sched = TH_SCHED(th);

    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));

    // only once. it is safe because th->interrupt_lock is already acquired.
    th->unblock.func = NULL;
    th->unblock.arg = NULL;

    bool canceled = timer_thread_cancel_waiting(th);

    thread_sched_lock(sched, th);
    {
        if (sched->running == th) {
            RUBY_DEBUG_LOG("not waiting yet");
        }
        else if (canceled) {
            thread_sched_to_ready_common(sched, th, true, false);
        }
        else {
            RUBY_DEBUG_LOG("already not waiting");
        }
    }
    thread_sched_unlock(sched, th);
}

static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);

// return true if timed out
static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
    VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT

    volatile bool timedout = false, need_cancel = false;

    if (timer_thread_register_waiting(th, fd, events, rel)) {
        RUBY_DEBUG_LOG("wait fd:%d", fd);

        RB_VM_SAVE_MACHINE_CONTEXT(th);
        setup_ubf(th, ubf_event_waiting, (void *)th);

        RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);

        thread_sched_lock(sched, th);
        {
            if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
                // already awakened
            }
            else if (RUBY_VM_INTERRUPTED(th->ec)) {
                need_cancel = true;
            }
            else {
                RUBY_DEBUG_LOG("sleep");

                th->status = THREAD_STOPPED_FOREVER;
                thread_sched_wakeup_next_thread(sched, th, true);
                thread_sched_wait_running_turn(sched, th, true);

                RUBY_DEBUG_LOG("wakeup");
            }

            timedout = th->sched.waiting_reason.data.result == 0;
        }
        thread_sched_unlock(sched, th);

        if (need_cancel) {
            timer_thread_cancel_waiting(th);
        }

        setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL?

        th->status = THREAD_RUNNABLE;
    }
    else {
        RUBY_DEBUG_LOG("can not wait fd:%d", fd);
        return false;
    }

    VM_ASSERT(sched->running == th);

    return timedout;
}

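/*
 * Illustrative usage sketch (not part of the upstream file): how a caller
 * could park an M:N thread on fd readability with a relative timeout via
 * thread_sched_wait_events(). The helper name and its calling context are
 * hypothetical; the real callers live in thread.c and differ in detail.
 */
#if 0
static bool
example_wait_readable(rb_thread_t *th, int fd, rb_hrtime_t rel)
{
    // true => the wait timed out; false => woken by the I/O event (or the fd could not be waited on)
    return thread_sched_wait_events(TH_SCHED(th), th, fd, thread_sched_waiting_io_read, &rel);
}
#endif
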
/// stack management

static int
get_sysconf_page_size(void)
{
    static long page_size = 0;

    if (UNLIKELY(page_size == 0)) {
        page_size = sysconf(_SC_PAGESIZE);
        VM_ASSERT(page_size < INT_MAX);
    }
    return (int)page_size;
}

#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB
#define MSTACK_PAGE_SIZE get_sysconf_page_size()
#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone

// 512MB chunk
// 131,072 pages (> 65,536)
// 0th page is Redzone. Start from 1st page.

/*
 * <--> machine stack + vm stack
 * ----------------------------------
 * |HD...|RZ| ... |RZ| ... ... |RZ|
 * <------------- 512MB ------------->
 */

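/*
 * Worked example of the figures above, assuming a 4 KiB page size (the real
 * value comes from sysconf(_SC_PAGESIZE)): 512 MiB / 4 KiB = 131,072 pages
 * per chunk, and MSTACK_CHUNK_PAGE_NUM = 131,071 once the leading redzone
 * page is subtracted. If one stack slot were, say, 513 pages (VM stack +
 * guard page + machine stack; see nt_thread_stack_size()), a chunk with a
 * single header page would hold 131,070 / 513 = 255 slots.
 */
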
static struct nt_stack_chunk_header {
    struct nt_stack_chunk_header *prev_chunk;
    struct nt_stack_chunk_header *prev_free_chunk;

    uint16_t start_page;
    uint16_t stack_count;
    uint16_t uninitialized_stack_count;

    uint16_t free_stack_pos;
    uint16_t free_stack[];
} *nt_stack_chunks = NULL,
  *nt_free_stack_chunks = NULL;

struct nt_machine_stack_footer {
    struct nt_stack_chunk_header *ch;
    size_t index;
};

static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;

#include <sys/mman.h>

// vm_stack_size + machine_stack_size + 1 * (guard page size)
static inline size_t
nt_thread_stack_size(void)
{
    static size_t msz;
    if (LIKELY(msz > 0)) return msz;

    rb_vm_t *vm = GET_VM();
    int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE);
    int page_num = roomof(sz, MSTACK_PAGE_SIZE);
    msz = (size_t)page_num * MSTACK_PAGE_SIZE;
    return msz;
}

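/*
 * Rounding sketch for nt_thread_stack_size(), with illustrative (not
 * guaranteed) defaults of a 1 MiB VM stack and a 1 MiB machine stack on a
 * 4 KiB-page system: sz = 1 MiB + 1 MiB + 4 KiB = 2,101,248 bytes, which is
 * already page-aligned, so roomof(sz, MSTACK_PAGE_SIZE) = 513 and
 * msz = 513 * 4096 = 2,101,248. A non-aligned sum would be rounded up to the
 * next page boundary.
 */
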
static struct nt_stack_chunk_header *
nt_alloc_thread_stack_chunk(void)
{
    int mmap_flags = MAP_ANONYMOUS | MAP_PRIVATE;
#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__)
    mmap_flags |= MAP_STACK;
#endif

    const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
    if (m == MAP_FAILED) {
        return NULL;
    }

    size_t msz = nt_thread_stack_size();
    int header_page_cnt = 1;
    int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
    int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;

    if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) {
        header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
        stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
    }

    VM_ASSERT(stack_count <= UINT16_MAX);

    struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m;

    ch->start_page = header_page_cnt;
    ch->prev_chunk = nt_stack_chunks;
    ch->prev_free_chunk = nt_free_stack_chunks;
    ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count;
    ch->free_stack_pos = 0;

    RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz);

    return ch;
}

static void *
nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx)
{
    const char *m = (char *)ch;
    return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size());
}

static struct nt_machine_stack_footer *
nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack)
{
    // TODO: stack direction
    const size_t msz = vm->default_params.thread_machine_stack_size;
    return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)];
}

static void *
nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack)
{
    // TODO: only support stack going down
    // [VM ... <GUARD> machine stack ...]

    const char *vstack, *mstack;
    const char *guard_page;
    vstack = nt_stack_chunk_get_stack_start(ch, idx);
    guard_page = vstack + vm->default_params.thread_vm_stack_size;
    mstack = guard_page + MSTACK_PAGE_SIZE;

    struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
    msf->ch = ch;
    msf->index = idx;

#if 0
    RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
                   vstack, (void *)(guard_page-1),
                   guard_page, (void *)(mstack-1),
                   mstack, (void *)(msf));
#endif

    *vm_stack = (void *)vstack;
    *machine_stack = (void *)mstack;

    return (void *)guard_page;
}

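/*
 * Address-arithmetic sketch for one slot, using the same illustrative sizes
 * as above (1 MiB VM stack, 4 KiB guard page, 1 MiB machine stack): with
 * base = nt_stack_chunk_get_stack_start(ch, idx),
 *
 *   vm_stack      = base
 *   guard_page    = base + 0x100000   (mprotect'ed PROT_NONE by the caller)
 *   machine_stack = base + 0x101000
 *   footer        = machine_stack + 1 MiB - sizeof(struct nt_machine_stack_footer)
 *
 * The footer at the top of the machine stack area is what lets nt_free_stack()
 * recover the owning chunk and slot index from the machine stack pointer alone.
 */
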
RBIMPL_ATTR_MAYBE_UNUSED()
static void
nt_stack_chunk_dump(void)
{
    struct nt_stack_chunk_header *ch;
    int i;

    fprintf(stderr, "** nt_stack_chunks\n");
    ch = nt_stack_chunks;
    for (i=0; ch; i++, ch = ch->prev_chunk) {
        fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
    }

    fprintf(stderr, "** nt_free_stack_chunks\n");
    ch = nt_free_stack_chunks;
    for (i=0; ch; i++, ch = ch->prev_free_chunk) {
        fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
    }
}

static int
nt_guard_page(const char *p, size_t len)
{
    if (mprotect((void *)p, len, PROT_NONE) != -1) {
        return 0;
    }
    else {
        return errno;
    }
}

static int
nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
{
    int err = 0;

    rb_native_mutex_lock(&nt_machine_stack_lock);
    {
      retry:
        if (nt_free_stack_chunks) {
            struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
            if (ch->free_stack_pos > 0) {
                RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos);
                nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
            }
            else if (ch->uninitialized_stack_count > 0) {
                RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count);

                size_t idx = ch->stack_count - ch->uninitialized_stack_count--;
                void *guard_page = nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
                err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE);
            }
            else {
                nt_free_stack_chunks = ch->prev_free_chunk;
                ch->prev_free_chunk = NULL;
                goto retry;
            }
        }
        else {
            struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
            if (p == NULL) {
                err = errno;
            }
            else {
                nt_free_stack_chunks = nt_stack_chunks = p;
                goto retry;
            }
        }
    }
    rb_native_mutex_unlock(&nt_machine_stack_lock);

    return err;
}

static void
nt_free_stack(void *mstack)
{
    if (!mstack) return;

    rb_native_mutex_lock(&nt_machine_stack_lock);
    {
        struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
        struct nt_stack_chunk_header *ch = msf->ch;
        int idx = (int)msf->index;
        void *stack = nt_stack_chunk_get_stack_start(ch, idx);

        RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);

        if (ch->prev_free_chunk == NULL) {
            ch->prev_free_chunk = nt_free_stack_chunks;
            nt_free_stack_chunks = ch;
        }
        ch->free_stack[ch->free_stack_pos++] = idx;

        // clear the stack pages
#if defined(MADV_FREE)
        int r = madvise(stack, nt_thread_stack_size(), MADV_FREE);
#elif defined(MADV_DONTNEED)
        int r = madvise(stack, nt_thread_stack_size(), MADV_DONTNEED);
#else
        int r = 0;
#endif

        if (r != 0) rb_bug("madvise errno:%d", errno);
    }
    rb_native_mutex_unlock(&nt_machine_stack_lock);
}

static int
native_thread_check_and_create_shared(rb_vm_t *vm)
{
    bool need_to_make = false;

    rb_native_mutex_lock(&vm->ractor.sched.lock);
    {
        unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
        if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads) snt_cnt++; // do not need snt for main ractor

        if (((int)snt_cnt < MINIMUM_SNT) ||
            (snt_cnt < vm->ractor.cnt &&
             snt_cnt < vm->ractor.sched.max_cpu)) {

            RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
                           vm->ractor.sched.snt_cnt,
                           vm->ractor.sched.dnt_cnt,
                           vm->ractor.cnt,
                           vm->ractor.sched.grq_cnt);

            vm->ractor.sched.snt_cnt++;
            need_to_make = true;
        }
        else {
            RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt);
        }
    }
    rb_native_mutex_unlock(&vm->ractor.sched.lock);

    if (need_to_make) {
        struct rb_native_thread *nt = native_thread_alloc();
        nt->vm = vm;
        return native_thread_create0(nt);
    }
    else {
        return 0;
    }
}

static COROUTINE
co_start(struct coroutine_context *from, struct coroutine_context *self)
{
#ifdef RUBY_ASAN_ENABLED
    __sanitizer_finish_switch_fiber(self->fake_stack,
                                    (const void**)&from->stack_base, &from->stack_size);
#endif

    rb_thread_t *th = (rb_thread_t *)self->argument;
    struct rb_thread_sched *sched = TH_SCHED(th);
    VM_ASSERT(th->nt != NULL);
    VM_ASSERT(th == sched->running);
    VM_ASSERT(sched->lock_owner == NULL);

    // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    thread_sched_set_lock_owner(sched, th);
    thread_sched_add_running_thread(TH_SCHED(th), th);
    thread_sched_unlock(sched, th);
    {
        RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED, th);
        call_thread_start_func_2(th);
    }
    thread_sched_lock(sched, NULL);

    RUBY_DEBUG_LOG("terminated th:%d", (int)th->serial);

    // Thread is terminated

    struct rb_native_thread *nt = th->nt;
    bool is_dnt = th_has_dedicated_nt(th);
    native_thread_assign(NULL, th);
    rb_ractor_set_current_ec(th->ractor, NULL);

    if (is_dnt) {
        // SNT became DNT while running. Just return to the nt_context

        th->sched.finished = true;
        coroutine_transfer0(self, nt->nt_context, true);
    }
    else {
        rb_vm_t *vm = th->vm;
        bool has_ready_ractor = vm->ractor.sched.grq_cnt > 0; // at least this ractor is not queued
        rb_thread_t *next_th = sched->running;

        if (!has_ready_ractor && next_th && !next_th->nt) {
            // switch to the next thread
            thread_sched_set_lock_owner(sched, NULL);
            thread_sched_switch0(th->sched.context, next_th, nt, true);
            th->sched.finished = true;
        }
        else {
            // switch to the next Ractor
            th->sched.finished = true;
            coroutine_transfer0(self, nt->nt_context, true);
        }
    }

    rb_bug("unreachable");
}

static int
native_thread_create_shared(rb_thread_t *th)
{
    // setup coroutine
    rb_vm_t *vm = th->vm;
    void *vm_stack = NULL, *machine_stack = NULL;
    int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
    if (err) return err;

    VM_ASSERT(vm_stack < machine_stack);

    // setup vm stack
    size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE);
    rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);

    // setup machine stack
    size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer);
    th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size);
    th->ec->machine.stack_maxsize = machine_stack_size; // TODO
    th->sched.context_stack = machine_stack;

    th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context));
    coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size);
    th->sched.context->argument = th;

    RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
    thread_sched_to_ready(TH_SCHED(th), th);

    // setup nt
    return native_thread_check_and_create_shared(th->vm);
}

#else // USE_MN_THREADS

static int
native_thread_create_shared(rb_thread_t *th)
{
    rb_bug("unreachable");
}

static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
    rb_bug("unreachable");
}

#endif // USE_MN_THREADS

/// EPOLL/KQUEUE specific code
#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS

static bool
fd_readable_nonblock(int fd)
{
    struct pollfd pfd = {
        .fd = fd,
        .events = POLLIN,
    };
    return poll(&pfd, 1, 0) != 0;
}

static bool
fd_writable_nonblock(int fd)
{
    struct pollfd pfd = {
        .fd = fd,
        .events = POLLOUT,
    };
    return poll(&pfd, 1, 0) != 0;
}

static void
verify_waiting_list(void)
{
#if VM_CHECK_MODE > 0
    rb_thread_t *wth, *prev_wth = NULL;
    ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
        // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
        if (prev_wth) {
            rb_hrtime_t timeout = wth->sched.waiting_reason.data.timeout;
            rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout;
            VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
        }
        prev_wth = wth;
    }
#endif
}

#if HAVE_SYS_EVENT_H // kqueue helpers

static enum thread_sched_waiting_flag
kqueue_translate_filter_to_flags(int16_t filter)
{
    switch (filter) {
      case EVFILT_READ:
        return thread_sched_waiting_io_read;
      case EVFILT_WRITE:
        return thread_sched_waiting_io_write;
      case EVFILT_TIMER:
        return thread_sched_waiting_timeout;
      default:
        rb_bug("kevent filter:%d not supported", filter);
    }
}

static int
kqueue_wait(rb_vm_t *vm)
{
    struct timespec calculated_timeout;
    struct timespec *timeout = NULL;
    int timeout_ms = timer_thread_set_timeout(vm);

    if (timeout_ms >= 0) {
        calculated_timeout.tv_sec = timeout_ms / 1000;
        calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
        timeout = &calculated_timeout;
    }

    return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
}

static void
kqueue_create(void)
{
    if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
    int flags = fcntl(timer_th.event_fd, F_GETFD);
    if (flags == -1) {
        rb_bug("kqueue GETFD failed (errno:%d)", errno);
    }

    flags |= FD_CLOEXEC;
    if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
        rb_bug("kqueue SETFD failed (errno:%d)", errno);
    }
}

static void
kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
{
    if (flags) {
        struct kevent ke[2];
        int num_events = 0;

        if (flags & thread_sched_waiting_io_read) {
            EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
            num_events++;
        }
        if (flags & thread_sched_waiting_io_write) {
            EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
            num_events++;
        }
        if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
            perror("kevent");
            rb_bug("unregister/kevent fails. errno:%d", errno);
        }
    }
}

static bool
kqueue_already_registered(int fd)
{
    rb_thread_t *wth, *found_wth = NULL;
    ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
        // Similar to EEXIST in epoll_ctl, but more strict because it checks fd rather than flags
        // for simplicity
        if (wth->sched.waiting_reason.flags && wth->sched.waiting_reason.data.fd == fd) {
            found_wth = wth;
            break;
        }
    }
    return found_wth != NULL;
}

#endif // HAVE_SYS_EVENT_H

// return false if the fd is not waitable or does not need to wait.
static bool
timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
{
    RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);

    VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
    VM_ASSERT(flags != 0);

    rb_hrtime_t abs = 0; // 0 means no timeout

    if (rel) {
        if (*rel > 0) {
            flags |= thread_sched_waiting_timeout;
        }
        else {
            return false;
        }
    }

    if (rel && *rel > 0) {
        flags |= thread_sched_waiting_timeout;
    }

#if HAVE_SYS_EVENT_H
    struct kevent ke[2];
    int num_events = 0;
#else
    uint32_t epoll_events = 0;
#endif
    if (flags & thread_sched_waiting_timeout) {
        VM_ASSERT(rel != NULL);
        abs = rb_hrtime_add(rb_hrtime_now(), *rel);
    }

    if (flags & thread_sched_waiting_io_read) {
        if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) {
            RUBY_DEBUG_LOG("fd_readable_nonblock");
            return false;
        }
        else {
            VM_ASSERT(fd >= 0);
#if HAVE_SYS_EVENT_H
            EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)th);
            num_events++;
#else
            epoll_events |= EPOLLIN;
#endif
        }
    }

    if (flags & thread_sched_waiting_io_write) {
        if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) {
            RUBY_DEBUG_LOG("fd_writable_nonblock");
            return false;
        }
        else {
            VM_ASSERT(fd >= 0);
#if HAVE_SYS_EVENT_H
            EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)th);
            num_events++;
#else
            epoll_events |= EPOLLOUT;
#endif
        }
    }

    rb_native_mutex_lock(&timer_th.waiting_lock);
    {
#if HAVE_SYS_EVENT_H
        if (num_events > 0) {
            if (kqueue_already_registered(fd)) {
                rb_native_mutex_unlock(&timer_th.waiting_lock);
                return false;
            }

            if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
                RUBY_DEBUG_LOG("failed (%d)", errno);

                switch (errno) {
                  case EBADF:
                    // the fd is closed?
                  case EINTR:
                    // signal received? is there a sensible way to handle this?
                  default:
                    perror("kevent");
                    rb_bug("register/kevent failed(fd:%d, errno:%d)", fd, errno);
                }
            }
            RUBY_DEBUG_LOG("kevent(add, fd:%d) success", fd);
        }
#else
        if (epoll_events) {
            struct epoll_event event = {
                .events = epoll_events,
                .data = {
                    .ptr = (void *)th,
                },
            };
            if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
                RUBY_DEBUG_LOG("failed (%d)", errno);

                switch (errno) {
                  case EBADF:
                    // the fd is closed?
                  case EPERM:
                    // the fd doesn't support epoll
                  case EEXIST:
                    // the fd is already registered by another thread
                    rb_native_mutex_unlock(&timer_th.waiting_lock);
                    return false;
                  default:
                    perror("epoll_ctl");
                    rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno);
                }
            }
            RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
        }
#endif

        if (th) {
            VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);

            // setup waiting information
            {
                th->sched.waiting_reason.flags = flags;
                th->sched.waiting_reason.data.timeout = abs;
                th->sched.waiting_reason.data.fd = fd;
                th->sched.waiting_reason.data.result = 0;
            }

            if (abs == 0) { // no timeout
                VM_ASSERT(!(flags & thread_sched_waiting_timeout));
                ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
            }
            else {
                RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs);
                VM_ASSERT(flags & thread_sched_waiting_timeout);

                // insert th to sorted list (TODO: O(n))
                rb_thread_t *wth, *prev_wth = NULL;

                ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
                    if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) &&
                        wth->sched.waiting_reason.data.timeout < abs) {
                        prev_wth = wth;
                    }
                    else {
                        break;
                    }
                }

                if (prev_wth) {
                    ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node);
                }
                else {
                    ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
                }

                verify_waiting_list();

                // update timeout seconds
                timer_thread_wakeup();
            }
        }
        else {
            VM_ASSERT(abs == 0);
        }
    }
    rb_native_mutex_unlock(&timer_th.waiting_lock);

    return true;
}

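/*
 * Ordering example for the waiting list maintained above (hypothetical
 * values): if a thread C registers with no timeout, then A with abs == 30
 * and B with abs == 10, the list ends up as [B(10), A(30), C(no timeout)].
 * Timed entries are kept in non-decreasing timeout order at the front and
 * untimed entries are appended at the tail, which is exactly the invariant
 * verify_waiting_list() asserts.
 */
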
static void
timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
{
    RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
#if HAVE_SYS_EVENT_H
    kqueue_unregister_waiting(fd, flags);
#else
    // Linux 2.6.9 or later is needed to pass NULL as data.
    if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
        switch (errno) {
          case EBADF:
            // just ignore. maybe fd is closed.
            break;
          default:
            perror("epoll_ctl");
            rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
        }
    }
#endif
}

static void
timer_thread_setup_mn(void)
{
#if HAVE_SYS_EVENT_H
    kqueue_create();
    RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd);
#else
    if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
    RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.event_fd);
#endif
    RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);

    timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
}

static int
event_wait(rb_vm_t *vm)
{
#if HAVE_SYS_EVENT_H
    int r = kqueue_wait(vm);
#else
    int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
#endif
    return r;
}

/*
 * The purpose of the timer thread:
 *
 * (1) Periodic checking
 *   (1-1) Provide time slice for active NTs
 *   (1-2) Check NT shortage
 *   (1-3) Periodic UBF (global)
 *   (1-4) Lazy GRQ deq start
 * (2) Receive notification
 *   (2-1) async I/O termination
 *   (2-2) timeout
 *     (2-2-1) sleep(n)
 *     (2-2-2) timeout(n), I/O, ...
 */

static void
timer_thread_polling(rb_vm_t *vm)
{
    int r = event_wait(vm);

    RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);

    switch (r) {
      case 0: // timeout
        RUBY_DEBUG_LOG("timeout%s", "");

        ractor_sched_lock(vm, NULL);
        {
            // (1-1) timeslice
            timer_thread_check_timeslice(vm);

            // (1-4) lazy grq deq
            if (vm->ractor.sched.grq_cnt > 0) {
                RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt);
                rb_native_cond_signal(&vm->ractor.sched.cond);
            }
        }
        ractor_sched_unlock(vm, NULL);

        // (1-2)
        native_thread_check_and_create_shared(vm);

        break;

      case -1:
        switch (errno) {
          case EINTR:
            // simply retry
            break;
          default:
            perror("event_wait");
            rb_bug("event_wait errno:%d", errno);
        }
        break;

      default:
        RUBY_DEBUG_LOG("%d event(s)", r);

#if HAVE_SYS_EVENT_H
        for (int i=0; i<r; i++) {
            rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].udata;
            int fd = (int)timer_th.finished_events[i].ident;
            int16_t filter = timer_th.finished_events[i].filter;

            if (th == NULL) {
                // wakeup timerthread
                RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
                consume_communication_pipe(timer_th.comm_fds[0]);
            }
            else {
                // wakeup specific thread by IO
                RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s",
                               rb_th_serial(th),
                               (filter == EVFILT_READ) ? "read/" : "",
                               (filter == EVFILT_WRITE) ? "write/" : "");

                rb_native_mutex_lock(&timer_th.waiting_lock);
                {
                    if (th->sched.waiting_reason.flags) {
                        // delete from chain
                        ccan_list_del_init(&th->sched.waiting_reason.node);
                        timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter));

                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
                        th->sched.waiting_reason.data.fd = -1;
                        th->sched.waiting_reason.data.result = filter;

                        timer_thread_wakeup_thread(th);
                    }
                    else {
                        // already released
                    }
                }
                rb_native_mutex_unlock(&timer_th.waiting_lock);
            }
        }
#else
        for (int i=0; i<r; i++) {
            rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr;

            if (th == NULL) {
                // wakeup timerthread
                RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
                consume_communication_pipe(timer_th.comm_fds[0]);
            }
            else {
                // wakeup specific thread by IO
                uint32_t events = timer_th.finished_events[i].events;

                RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s",
                               rb_th_serial(th),
                               (events & EPOLLIN)    ? "in/" : "",
                               (events & EPOLLOUT)   ? "out/" : "",
                               (events & EPOLLRDHUP) ? "RDHUP/" : "",
                               (events & EPOLLPRI)   ? "pri/" : "",
                               (events & EPOLLERR)   ? "err/" : "",
                               (events & EPOLLHUP)   ? "hup/" : "");

                rb_native_mutex_lock(&timer_th.waiting_lock);
                {
                    if (th->sched.waiting_reason.flags) {
                        // delete from chain
                        ccan_list_del_init(&th->sched.waiting_reason.node);
                        timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);

                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
                        th->sched.waiting_reason.data.fd = -1;
                        th->sched.waiting_reason.data.result = (int)events;

                        timer_thread_wakeup_thread(th);
                    }
                    else {
                        // already released
                    }
                }
                rb_native_mutex_unlock(&timer_th.waiting_lock);
            }
        }
#endif
    }
}

#else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H

static void
timer_thread_setup_mn(void)
{
    // do nothing
}

static void
timer_thread_polling(rb_vm_t *vm)
{
    int timeout = timer_thread_set_timeout(vm);

    struct pollfd pfd = {
        .fd = timer_th.comm_fds[0],
        .events = POLLIN,
    };

    int r = poll(&pfd, 1, timeout);

    switch (r) {
      case 0: // timeout
        rb_native_mutex_lock(&vm->ractor.sched.lock);
        {
            // (1-1) timeslice
            timer_thread_check_timeslice(vm);
        }
        rb_native_mutex_unlock(&vm->ractor.sched.lock);
        break;

      case -1: // error
        switch (errno) {
          case EINTR:
            // simply retry
            break;
          default:
            perror("poll");
            rb_bug("poll errno:%d", errno);
            break;
        }
        break;

      case 1:
        consume_communication_pipe(timer_th.comm_fds[0]);
        break;

      default:
        rb_bug("unreachable");
    }
}

#endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H