// included by "thread_pthread.c"

#if USE_MN_THREADS

static void timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags);
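// This file implements the MN scheduler pieces that are specific to pthread
// platforms: the pooled machine/VM stacks used by shared native threads
// (SNTs), the coroutine entry point for Ruby threads running on SNTs, and the
// timer thread's kqueue/epoll based event registration that lets blocked MN
// threads give up their native thread while waiting.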
static bool
timer_thread_cancel_waiting(rb_thread_t *th)
{
    bool canceled = false;

    if (th->sched.waiting_reason.flags) {
        rb_native_mutex_lock(&timer_th.waiting_lock);
        {
            if (th->sched.waiting_reason.flags) {
                // remove from the waiting list and unregister the fd if needed
                ccan_list_del_init(&th->sched.waiting_reason.node);
                if (th->sched.waiting_reason.flags & (thread_sched_waiting_io_read | thread_sched_waiting_io_write)) {
                    timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);
                }
                th->sched.waiting_reason.flags = thread_sched_waiting_none;
                canceled = true;
            }
        }
        rb_native_mutex_unlock(&timer_th.waiting_lock);
    }

    return canceled;
}
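// ubf_event_waiting() is installed as the unblocking function (UBF) while an
// MN thread is parked in thread_sched_wait_events(). When an interrupt
// arrives it cancels the pending timer-thread registration and, if the
// thread already gave up its native thread, pushes it back to the ready
// queue so it can be resumed.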
static void
ubf_event_waiting(void *ptr)
{
    rb_thread_t *th = (rb_thread_t *)ptr;
    struct rb_thread_sched *sched = TH_SCHED(th);

    RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));

    VM_ASSERT(th->nt == NULL || !th_has_dedicated_nt(th));

    // only once. it is safe because th->interrupt_lock is already acquired.
    th->unblock.func = NULL;
    th->unblock.arg = NULL;

    bool canceled = timer_thread_cancel_waiting(th);

    thread_sched_lock(sched, th);
    {
        if (sched->running == th) {
            RUBY_DEBUG_LOG("not waiting yet");
        }
        else if (canceled) {
            thread_sched_to_ready_common(sched, th, true, false);
        }
        else {
            RUBY_DEBUG_LOG("already not waiting");
        }
    }
    thread_sched_unlock(sched, th);
}
static bool timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel);
// return true if timed out
static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
    VM_ASSERT(!th_has_dedicated_nt(th)); // on SNT

    volatile bool timedout = false, need_cancel = false;

    if (timer_thread_register_waiting(th, fd, events, rel)) {
        RUBY_DEBUG_LOG("wait fd:%d", fd);

        RB_VM_SAVE_MACHINE_CONTEXT(th);
        setup_ubf(th, ubf_event_waiting, (void *)th);

        RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th);

        thread_sched_lock(sched, th);
        {
            if (th->sched.waiting_reason.flags == thread_sched_waiting_none) {
                // already awakened
            }
            else if (RUBY_VM_INTERRUPTED(th->ec)) {
                need_cancel = true;
            }
            else {
                RUBY_DEBUG_LOG("sleep");

                th->status = THREAD_STOPPED_FOREVER;
                thread_sched_wakeup_next_thread(sched, th, true);
                thread_sched_wait_running_turn(sched, th, true);

                RUBY_DEBUG_LOG("wakeup");
            }

            timedout = th->sched.waiting_reason.data.result == 0;
        }
        thread_sched_unlock(sched, th);

        if (need_cancel) {
            timer_thread_cancel_waiting(th);
        }

        setup_ubf(th, NULL, NULL); // TODO: maybe it is already NULL?

        th->status = THREAD_RUNNABLE;
    }
    else {
        RUBY_DEBUG_LOG("can not wait fd:%d", fd);
        return false;
    }

    VM_ASSERT(sched->running == th);

    return timedout;
}
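// Protocol used by thread_sched_wait_events():
//   1. register the fd and/or timeout with the timer thread,
//   2. install ubf_event_waiting so interrupts can cancel the wait,
//   3. hand the native thread to the next runnable thread and sleep,
//   4. when the timer thread (or an interrupt) wakes the thread up,
//      waiting_reason.data.result == 0 means the wait timed out.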
static int
get_sysconf_page_size(void)
{
    static long page_size = 0;

    if (UNLIKELY(page_size == 0)) {
        page_size = sysconf(_SC_PAGESIZE);
        VM_ASSERT(page_size < INT_MAX);
    }
    return (int)page_size;
}
#define MSTACK_CHUNK_SIZE (512 * 1024 * 1024) // 512MB
#define MSTACK_PAGE_SIZE get_sysconf_page_size()
#define MSTACK_CHUNK_PAGE_NUM (MSTACK_CHUNK_SIZE / MSTACK_PAGE_SIZE - 1) // 1 is start redzone

// 131,072 pages (> 65,536)
// 0th page is Redzone. Start from 1st page.

/*
 * <--> machine stack + vm stack
 * ----------------------------------
 * |HD...|RZ| ... |RZ| ... ... |RZ|
 * <------------- 512MB ------------->
 */
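// Chunk arithmetic, assuming the common 4 KiB page size (MSTACK_PAGE_SIZE is
// actually queried via sysconf at runtime): a 512 MiB chunk holds
// 512*1024*1024/4096 = 131,072 pages. The header (HD) lives in the leading
// page(s), and the rest of the chunk is carved into fixed-size slots of
// nt_thread_stack_size() bytes, each holding a VM stack and a machine stack
// separated by a guard page (RZ).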
static struct nt_stack_chunk_header {
    struct nt_stack_chunk_header *prev_chunk;
    struct nt_stack_chunk_header *prev_free_chunk;

    uint16_t start_page;
    uint16_t stack_count;
    uint16_t uninitialized_stack_count;

    uint16_t free_stack_pos;
    uint16_t free_stack[];
} *nt_stack_chunks = NULL,
  *nt_free_stack_chunks = NULL;

struct nt_machine_stack_footer {
    struct nt_stack_chunk_header *ch;
    size_t index;
};

static rb_nativethread_lock_t nt_machine_stack_lock = RB_NATIVETHREAD_LOCK_INIT;
#include <sys/mman.h>

// vm_stack_size + machine_stack_size + 1 * (guard page size)
static size_t
nt_thread_stack_size(void)
{
    static size_t msz;
    if (LIKELY(msz > 0)) return msz;

    rb_vm_t *vm = GET_VM();
    int sz = (int)(vm->default_params.thread_vm_stack_size + vm->default_params.thread_machine_stack_size + MSTACK_PAGE_SIZE);
    int page_num = roomof(sz, MSTACK_PAGE_SIZE);
    msz = (size_t)page_num * MSTACK_PAGE_SIZE;
    return msz;
}
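// Worked example (an assumption, not a build-time constant): with 1 MiB VM
// and machine stack defaults and a 4 KiB page size, sz = 2 MiB + 4 KiB =
// 2,101,248 bytes, which is already a multiple of the page size, so
// nt_thread_stack_size() returns 513 pages per slot.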
static struct nt_stack_chunk_header *
nt_alloc_thread_stack_chunk(void)
{
    int mmap_flags = MAP_ANONYMOUS | MAP_PRIVATE;
#if defined(MAP_STACK) && !defined(__FreeBSD__) && !defined(__FreeBSD_kernel__)
    mmap_flags |= MAP_STACK;
#endif

    const char *m = (void *)mmap(NULL, MSTACK_CHUNK_SIZE, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
    if (m == MAP_FAILED) {
        return NULL;
    }

    size_t msz = nt_thread_stack_size();
    int header_page_cnt = 1;
    int stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
    int ch_size = sizeof(struct nt_stack_chunk_header) + sizeof(uint16_t) * stack_count;

    if (ch_size > MSTACK_PAGE_SIZE * header_page_cnt) {
        // the header (including the free_stack array) needs more than one page
        header_page_cnt = (ch_size + MSTACK_PAGE_SIZE - 1) / MSTACK_PAGE_SIZE;
        stack_count = ((MSTACK_CHUNK_PAGE_NUM - header_page_cnt) * MSTACK_PAGE_SIZE) / msz;
    }

    VM_ASSERT(stack_count <= UINT16_MAX);

    struct nt_stack_chunk_header *ch = (struct nt_stack_chunk_header *)m;

    ch->start_page = header_page_cnt;
    ch->prev_chunk = nt_stack_chunks;
    ch->prev_free_chunk = nt_free_stack_chunks;
    ch->uninitialized_stack_count = ch->stack_count = (uint16_t)stack_count;
    ch->free_stack_pos = 0;

    RUBY_DEBUG_LOG("ch:%p start_page:%d stack_cnt:%d stack_size:%d", ch, (int)ch->start_page, (int)ch->stack_count, (int)msz);

    return ch;
}
static void *
nt_stack_chunk_get_stack_start(struct nt_stack_chunk_header *ch, size_t idx)
{
    const char *m = (char *)ch;
    return (void *)(m + ch->start_page * MSTACK_PAGE_SIZE + idx * nt_thread_stack_size());
}
static struct nt_machine_stack_footer *
nt_stack_chunk_get_msf(const rb_vm_t *vm, const char *mstack)
{
    // TODO: stack direction
    const size_t msz = vm->default_params.thread_machine_stack_size;
    return (struct nt_machine_stack_footer *)&mstack[msz - sizeof(struct nt_machine_stack_footer)];
}
static void *
nt_stack_chunk_get_stack(const rb_vm_t *vm, struct nt_stack_chunk_header *ch, size_t idx, void **vm_stack, void **machine_stack)
{
    // TODO: only support stack going down
    // [VM ... <GUARD> machine stack ...]

    const char *vstack, *mstack;
    const char *guard_page;
    vstack = nt_stack_chunk_get_stack_start(ch, idx);
    guard_page = vstack + vm->default_params.thread_vm_stack_size;
    mstack = guard_page + MSTACK_PAGE_SIZE;

    struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(vm, mstack);
    msf->ch = ch;
    msf->index = idx;

    RUBY_DEBUG_LOG("msf:%p vstack:%p-%p guard_page:%p-%p mstack:%p-%p", msf,
                   vstack, (void *)(guard_page-1),
                   guard_page, (void *)(mstack-1),
                   mstack, (void *)(msf));

    *vm_stack = (void *)vstack;
    *machine_stack = (void *)mstack;

    return (void *)guard_page;
}
RBIMPL_ATTR_MAYBE_UNUSED()
static void
nt_stack_chunk_dump(void)
{
    struct nt_stack_chunk_header *ch;
    int i;

    fprintf(stderr, "** nt_stack_chunks\n");
    ch = nt_stack_chunks;
    for (i=0; ch; i++, ch = ch->prev_chunk) {
        fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
    }

    fprintf(stderr, "** nt_free_stack_chunks\n");
    ch = nt_free_stack_chunks;
    for (i=0; ch; i++, ch = ch->prev_free_chunk) {
        fprintf(stderr, "%d %p free_pos:%d\n", i, (void *)ch, (int)ch->free_stack_pos);
    }
}
static int
nt_guard_page(const char *p, size_t len)
{
    if (mprotect((void *)p, len, PROT_NONE) != -1) {
        return 0;
    }
    else {
        return errno;
    }
}
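// nt_alloc_stack() hands out one stack slot, preferring (in order):
//   1. a slot previously returned to the chunk's free_stack array,
//   2. a not-yet-initialized slot, which gets its guard page mprotect()ed,
//   3. otherwise the exhausted chunk is dropped from the free list (or a new
//      512 MiB chunk is mmap()ed) and the lookup is retried.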
static int
nt_alloc_stack(rb_vm_t *vm, void **vm_stack, void **machine_stack)
{
    int err = 0;

    rb_native_mutex_lock(&nt_machine_stack_lock);
    {
      retry:
        if (nt_free_stack_chunks) {
            struct nt_stack_chunk_header *ch = nt_free_stack_chunks;
            if (ch->free_stack_pos > 0) {
                RUBY_DEBUG_LOG("free_stack_pos:%d", ch->free_stack_pos);
                nt_stack_chunk_get_stack(vm, ch, ch->free_stack[--ch->free_stack_pos], vm_stack, machine_stack);
            }
            else if (ch->uninitialized_stack_count > 0) {
                RUBY_DEBUG_LOG("uninitialized_stack_count:%d", ch->uninitialized_stack_count);

                size_t idx = ch->stack_count - ch->uninitialized_stack_count--;
                void *guard_page = nt_stack_chunk_get_stack(vm, ch, idx, vm_stack, machine_stack);
                err = nt_guard_page(guard_page, MSTACK_PAGE_SIZE);
            }
            else {
                // this chunk is exhausted; drop it from the free list and retry
                nt_free_stack_chunks = ch->prev_free_chunk;
                ch->prev_free_chunk = NULL;
                goto retry;
            }
        }
        else {
            struct nt_stack_chunk_header *p = nt_alloc_thread_stack_chunk();
            if (p == NULL) {
                err = errno;
            }
            else {
                nt_free_stack_chunks = nt_stack_chunks = p;
                goto retry;
            }
        }
    }
    rb_native_mutex_unlock(&nt_machine_stack_lock);

    return err;
}
static void
nt_free_stack(void *mstack)
{
    rb_native_mutex_lock(&nt_machine_stack_lock);
    {
        struct nt_machine_stack_footer *msf = nt_stack_chunk_get_msf(GET_VM(), mstack);
        struct nt_stack_chunk_header *ch = msf->ch;
        int idx = (int)msf->index;
        void *stack = nt_stack_chunk_get_stack_start(ch, idx);

        RUBY_DEBUG_LOG("stack:%p mstack:%p ch:%p index:%d", stack, mstack, ch, idx);

        if (ch->prev_free_chunk == NULL) {
            ch->prev_free_chunk = nt_free_stack_chunks;
            nt_free_stack_chunks = ch;
        }
        ch->free_stack[ch->free_stack_pos++] = idx;

        // clear the stack pages
#if defined(MADV_FREE)
        int r = madvise(stack, nt_thread_stack_size(), MADV_FREE);
#elif defined(MADV_DONTNEED)
        int r = madvise(stack, nt_thread_stack_size(), MADV_DONTNEED);
#else
        int r = 0;
#endif

        if (r != 0) rb_bug("madvise errno:%d", errno);
    }
    rb_native_mutex_unlock(&nt_machine_stack_lock);
}
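// Returning a slot only pushes its index onto the owning chunk's free_stack;
// the backing pages are released lazily via madvise(MADV_FREE) where
// available (falling back to MADV_DONTNEED), so the kernel can reclaim the
// memory without the pool giving up its 512 MiB reservation.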
static int
native_thread_check_and_create_shared(rb_vm_t *vm)
{
    bool need_to_make = false;

    rb_native_mutex_lock(&vm->ractor.sched.lock);
    {
        unsigned int snt_cnt = vm->ractor.sched.snt_cnt;
        if (!vm->ractor.main_ractor->threads.sched.enable_mn_threads) snt_cnt++; // do not need snt for main ractor

        if (((int)snt_cnt < MINIMUM_SNT) ||
            (snt_cnt < vm->ractor.cnt &&
             snt_cnt < vm->ractor.sched.max_cpu)) {

            RUBY_DEBUG_LOG("added snt:%u dnt:%u ractor_cnt:%u grq_cnt:%u",
                           vm->ractor.sched.snt_cnt,
                           vm->ractor.sched.dnt_cnt,
                           vm->ractor.cnt,
                           vm->ractor.sched.grq_cnt);

            vm->ractor.sched.snt_cnt++;
            need_to_make = true;
        }
        else {
            RUBY_DEBUG_LOG("snt:%d ractor_cnt:%d", (int)vm->ractor.sched.snt_cnt, (int)vm->ractor.cnt);
        }
    }
    rb_native_mutex_unlock(&vm->ractor.sched.lock);

    if (need_to_make) {
        struct rb_native_thread *nt = native_thread_alloc();
        nt->vm = vm;
        return native_thread_create0(nt);
    }
    else {
        return 0;
    }
}
*from
, struct coroutine_context
*self
)
416 #ifdef RUBY_ASAN_ENABLED
417 __sanitizer_finish_switch_fiber(self
->fake_stack
,
418 (const void**)&from
->stack_base
, &from
->stack_size
);
421 rb_thread_t
*th
= (rb_thread_t
*)self
->argument
;
422 struct rb_thread_sched
*sched
= TH_SCHED(th
);
423 VM_ASSERT(th
->nt
!= NULL
);
424 VM_ASSERT(th
== sched
->running
);
425 VM_ASSERT(sched
->lock_owner
== NULL
);
427 // RUBY_DEBUG_LOG("th:%u", rb_th_serial(th));
429 thread_sched_set_lock_owner(sched
, th
);
430 thread_sched_add_running_thread(TH_SCHED(th
), th
);
431 thread_sched_unlock(sched
, th
);
433 RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_RESUMED
, th
);
434 call_thread_start_func_2(th
);
436 thread_sched_lock(sched
, NULL
);
438 RUBY_DEBUG_LOG("terminated th:%d", (int)th
->serial
);
440 // Thread is terminated
442 struct rb_native_thread
*nt
= th
->nt
;
443 bool is_dnt
= th_has_dedicated_nt(th
);
444 native_thread_assign(NULL
, th
);
445 rb_ractor_set_current_ec(th
->ractor
, NULL
);
448 // SNT became DNT while running. Just return to the nt_context
450 th
->sched
.finished
= true;
451 coroutine_transfer0(self
, nt
->nt_context
, true);
454 rb_vm_t
*vm
= th
->vm
;
455 bool has_ready_ractor
= vm
->ractor
.sched
.grq_cnt
> 0; // at least this ractor is not queued
456 rb_thread_t
*next_th
= sched
->running
;
458 if (!has_ready_ractor
&& next_th
&& !next_th
->nt
) {
459 // switch to the next thread
460 thread_sched_set_lock_owner(sched
, NULL
);
461 thread_sched_switch0(th
->sched
.context
, next_th
, nt
, true);
462 th
->sched
.finished
= true;
465 // switch to the next Ractor
466 th
->sched
.finished
= true;
467 coroutine_transfer0(self
, nt
->nt_context
, true);
471 rb_bug("unreachable");
static int
native_thread_create_shared(rb_thread_t *th)
{
    rb_vm_t *vm = th->vm;
    void *vm_stack = NULL, *machine_stack = NULL;
    int err = nt_alloc_stack(vm, &vm_stack, &machine_stack);
    if (err) return err;

    VM_ASSERT(vm_stack < machine_stack);

    // setup vm stack
    size_t vm_stack_words = th->vm->default_params.thread_vm_stack_size/sizeof(VALUE);
    rb_ec_initialize_vm_stack(th->ec, vm_stack, vm_stack_words);

    // setup machine stack
    size_t machine_stack_size = vm->default_params.thread_machine_stack_size - sizeof(struct nt_machine_stack_footer);
    th->ec->machine.stack_start = (void *)((uintptr_t)machine_stack + machine_stack_size);
    th->ec->machine.stack_maxsize = machine_stack_size; // TODO
    th->sched.context_stack = machine_stack;

    th->sched.context = ruby_xmalloc(sizeof(struct coroutine_context));
    coroutine_initialize(th->sched.context, co_start, machine_stack, machine_stack_size);
    th->sched.context->argument = th;

    RUBY_DEBUG_LOG("th:%u vm_stack:%p machine_stack:%p", rb_th_serial(th), vm_stack, machine_stack);
    thread_sched_to_ready(TH_SCHED(th), th);

    return native_thread_check_and_create_shared(th->vm);
}
#else // USE_MN_THREADS

static int
native_thread_create_shared(rb_thread_t *th)
{
    rb_bug("unreachable");
}

static bool
thread_sched_wait_events(struct rb_thread_sched *sched, rb_thread_t *th, int fd, enum thread_sched_waiting_flag events, rb_hrtime_t *rel)
{
    rb_bug("unreachable");
}

#endif // USE_MN_THREADS
/// EPOLL/KQUEUE specific code
#if (HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H) && USE_MN_THREADS

static bool
fd_readable_nonblock(int fd)
{
    struct pollfd pfd = {
        .fd = fd,
        .events = POLLIN,
    };
    return poll(&pfd, 1, 0) != 0;
}

static bool
fd_writable_nonblock(int fd)
{
    struct pollfd pfd = {
        .fd = fd,
        .events = POLLOUT,
    };
    return poll(&pfd, 1, 0) != 0;
}
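// Both helpers are zero-timeout poll(2) probes: timer_thread_register_waiting()
// uses them to skip the kqueue/epoll round-trip when the fd is already
// readable or writable (unless thread_sched_waiting_io_force is set).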
static void
verify_waiting_list(void)
{
#if VM_CHECK_MODE > 0
    rb_thread_t *wth, *prev_wth = NULL;

    ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
        // fprintf(stderr, "verify_waiting_list th:%u abs:%lu\n", rb_th_serial(wth), (unsigned long)wth->sched.waiting_reason.data.timeout);
        if (prev_wth) {
            rb_hrtime_t timeout = wth->sched.waiting_reason.data.timeout;
            rb_hrtime_t prev_timeout = prev_wth->sched.waiting_reason.data.timeout;
            VM_ASSERT(timeout == 0 || prev_timeout <= timeout);
        }
        prev_wth = wth;
    }
#endif
}
#if HAVE_SYS_EVENT_H // kqueue helpers

static enum thread_sched_waiting_flag
kqueue_translate_filter_to_flags(int16_t filter)
{
    switch (filter) {
      case EVFILT_READ:
        return thread_sched_waiting_io_read;
      case EVFILT_WRITE:
        return thread_sched_waiting_io_write;
      case EVFILT_TIMER:
        return thread_sched_waiting_timeout;
      default:
        rb_bug("kevent filter:%d not supported", filter);
    }
}

static int
kqueue_wait(rb_vm_t *vm)
{
    struct timespec calculated_timeout;
    struct timespec *timeout = NULL;
    int timeout_ms = timer_thread_set_timeout(vm);

    if (timeout_ms >= 0) {
        calculated_timeout.tv_sec = timeout_ms / 1000;
        calculated_timeout.tv_nsec = (timeout_ms % 1000) * 1000000;
        timeout = &calculated_timeout;
    }

    return kevent(timer_th.event_fd, NULL, 0, timer_th.finished_events, KQUEUE_EVENTS_MAX, timeout);
}

// create the kqueue instance used by the timer thread
static void
kqueue_create(void)
{
    if ((timer_th.event_fd = kqueue()) == -1) rb_bug("kqueue creation failed (errno:%d)", errno);
    int flags = fcntl(timer_th.event_fd, F_GETFD);
    if (flags == -1) {
        rb_bug("kqueue GETFD failed (errno:%d)", errno);
    }

    flags |= FD_CLOEXEC;
    if (fcntl(timer_th.event_fd, F_SETFD, flags) == -1) {
        rb_bug("kqueue SETFD failed (errno:%d)", errno);
    }
}

static void
kqueue_unregister_waiting(int fd, enum thread_sched_waiting_flag flags)
{
    if (flags) {
        struct kevent ke[2];
        int num_events = 0;

        if (flags & thread_sched_waiting_io_read) {
            EV_SET(&ke[num_events], fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
            num_events++;
        }
        if (flags & thread_sched_waiting_io_write) {
            EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
            num_events++;
        }
        if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
            rb_bug("unregister/kevent fails. errno:%d", errno);
        }
    }
}

static bool
kqueue_already_registered(int fd)
{
    rb_thread_t *wth, *found_wth = NULL;
    ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
        // Similar to EEXIST in epoll_ctl, but more strict because it checks fd rather than flags
        if (wth->sched.waiting_reason.flags && wth->sched.waiting_reason.data.fd == fd) {
            found_wth = wth;
            break;
        }
    }
    return found_wth != NULL;
}

#endif // HAVE_SYS_EVENT_H
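// timer_thread_register_waiting() below is the single entry point for parking
// a thread on the timer thread's kqueue/epoll instance. All state changes
// happen under timer_th.waiting_lock, and threads with a timeout are inserted
// into timer_th.waiting sorted by absolute deadline so that the earliest
// deadline is always at the head of the list.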
// return false if the fd is not waitable or there is no need to wait.
static bool
timer_thread_register_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags, rb_hrtime_t *rel)
{
    RUBY_DEBUG_LOG("th:%u fd:%d flag:%d rel:%lu", rb_th_serial(th), fd, flags, rel ? (unsigned long)*rel : 0);

    VM_ASSERT(th == NULL || TH_SCHED(th)->running == th);
    VM_ASSERT(flags != 0);

    rb_hrtime_t abs = 0; // 0 means no timeout

    if (rel && *rel > 0) {
        flags |= thread_sched_waiting_timeout;
    }

#if HAVE_SYS_EVENT_H
    struct kevent ke[2];
    int num_events = 0;
#else
    uint32_t epoll_events = 0;
#endif

    if (flags & thread_sched_waiting_timeout) {
        VM_ASSERT(rel != NULL);
        abs = rb_hrtime_add(rb_hrtime_now(), *rel);
    }

    if (flags & thread_sched_waiting_io_read) {
        if (!(flags & thread_sched_waiting_io_force) && fd_readable_nonblock(fd)) {
            RUBY_DEBUG_LOG("fd_readable_nonblock");
            return false;
        }
        else {
#if HAVE_SYS_EVENT_H
            EV_SET(&ke[num_events], fd, EVFILT_READ, EV_ADD, 0, 0, (void *)th);
            num_events++;
#else
            epoll_events |= EPOLLIN;
#endif
        }
    }

    if (flags & thread_sched_waiting_io_write) {
        if (!(flags & thread_sched_waiting_io_force) && fd_writable_nonblock(fd)) {
            RUBY_DEBUG_LOG("fd_writable_nonblock");
            return false;
        }
        else {
#if HAVE_SYS_EVENT_H
            EV_SET(&ke[num_events], fd, EVFILT_WRITE, EV_ADD, 0, 0, (void *)th);
            num_events++;
#else
            epoll_events |= EPOLLOUT;
#endif
        }
    }

    rb_native_mutex_lock(&timer_th.waiting_lock);
    {
#if HAVE_SYS_EVENT_H
        if (num_events > 0) {
            if (kqueue_already_registered(fd)) {
                rb_native_mutex_unlock(&timer_th.waiting_lock);
                return false;
            }

            if (kevent(timer_th.event_fd, ke, num_events, NULL, 0, NULL) == -1) {
                RUBY_DEBUG_LOG("failed (%d)", errno);

                switch (errno) {
                  case EBADF:
                    // the fd is closed?
                  case EINTR:
                    // signal received? is there a sensible way to handle this?
                  default:
                    rb_bug("register/kevent failed(fd:%d, errno:%d)", fd, errno);
                }
            }
            RUBY_DEBUG_LOG("kevent(add, fd:%d) success", fd);
        }
#else
        if (epoll_events) {
            struct epoll_event event = {
                .events = epoll_events,
                .data = {
                    .ptr = (void *)th,
                },
            };
            if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_ADD, fd, &event) == -1) {
                RUBY_DEBUG_LOG("failed (%d)", errno);

                switch (errno) {
                  case EBADF:
                    // the fd is closed?
                  case EPERM:
                    // the fd doesn't support epoll
                  case EEXIST:
                    // the fd is already registered by another thread
                    rb_native_mutex_unlock(&timer_th.waiting_lock);
                    return false;
                  default:
                    rb_bug("register/epoll_ctl failed(fd:%d, errno:%d)", fd, errno);
                }
            }
            RUBY_DEBUG_LOG("epoll_ctl(add, fd:%d, events:%d) success", fd, epoll_events);
        }
#endif

        if (th) {
            VM_ASSERT(th->sched.waiting_reason.flags == thread_sched_waiting_none);

            // setup waiting information
            {
                th->sched.waiting_reason.flags = flags;
                th->sched.waiting_reason.data.timeout = abs;
                th->sched.waiting_reason.data.fd = fd;
                th->sched.waiting_reason.data.result = 0;
            }

            if (abs == 0) { // no timeout
                VM_ASSERT(!(flags & thread_sched_waiting_timeout));
                ccan_list_add_tail(&timer_th.waiting, &th->sched.waiting_reason.node);
            }
            else {
                RUBY_DEBUG_LOG("abs:%lu", (unsigned long)abs);
                VM_ASSERT(flags & thread_sched_waiting_timeout);

                // insert th to sorted list (TODO: O(n))
                rb_thread_t *wth, *prev_wth = NULL;

                ccan_list_for_each(&timer_th.waiting, wth, sched.waiting_reason.node) {
                    if ((wth->sched.waiting_reason.flags & thread_sched_waiting_timeout) &&
                        wth->sched.waiting_reason.data.timeout < abs) {
                        prev_wth = wth;
                    }
                    else {
                        break;
                    }
                }

                if (prev_wth) {
                    ccan_list_add_after(&timer_th.waiting, &prev_wth->sched.waiting_reason.node, &th->sched.waiting_reason.node);
                }
                else {
                    ccan_list_add(&timer_th.waiting, &th->sched.waiting_reason.node);
                }

                verify_waiting_list();

                // update timeout seconds
                timer_thread_wakeup();
            }
        }
    }
    rb_native_mutex_unlock(&timer_th.waiting_lock);

    return true;
}
static void
timer_thread_unregister_waiting(rb_thread_t *th, int fd, enum thread_sched_waiting_flag flags)
{
    RUBY_DEBUG_LOG("th:%u fd:%d", rb_th_serial(th), fd);
#if HAVE_SYS_EVENT_H
    kqueue_unregister_waiting(fd, flags);
#else
    // Linux 2.6.9 or later is needed to pass NULL as data.
    if (epoll_ctl(timer_th.event_fd, EPOLL_CTL_DEL, fd, NULL) == -1) {
        switch (errno) {
          case EBADF:
            // just ignore. maybe fd is closed.
            break;
          default:
            rb_bug("unregister/epoll_ctl fails. errno:%d", errno);
        }
    }
#endif
}
static void
timer_thread_setup_mn(void)
{
#if HAVE_SYS_EVENT_H
    kqueue_create();
    RUBY_DEBUG_LOG("kqueue_fd:%d", timer_th.event_fd);
#else
    if ((timer_th.event_fd = epoll_create1(EPOLL_CLOEXEC)) == -1) rb_bug("epoll_create (errno:%d)", errno);
    RUBY_DEBUG_LOG("epoll_fd:%d", timer_th.event_fd);
#endif
    RUBY_DEBUG_LOG("comm_fds:%d/%d", timer_th.comm_fds[0], timer_th.comm_fds[1]);

    timer_thread_register_waiting(NULL, timer_th.comm_fds[0], thread_sched_waiting_io_read | thread_sched_waiting_io_force, NULL);
}
static int
event_wait(rb_vm_t *vm)
{
#if HAVE_SYS_EVENT_H
    int r = kqueue_wait(vm);
#else
    int r = epoll_wait(timer_th.event_fd, timer_th.finished_events, EPOLL_EVENTS_MAX, timer_thread_set_timeout(vm));
#endif
    return r;
}
/*
 * The purpose of the timer thread:
 *
 * (1) Periodic checking
 *   (1-1) Provide time slice for active NTs
 *   (1-2) Check NT shortage
 *   (1-3) Periodic UBF (global)
 *   (1-4) Lazy GRQ deq start
 * (2) Receive notification
 *   (2-1) async I/O termination
 *   (2-2) timeout
 *     (2-2-1) sleep(n)
 *     (2-2-2) timeout(n), I/O, ...
 */
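// In timer_thread_polling() below, the (1-*) duties are handled in the
// `case 0:` (timeout) branch, while the (2-*) notifications arrive as
// kqueue/epoll events and are dispatched in the default branch.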
static void
timer_thread_polling(rb_vm_t *vm)
{
    int r = event_wait(vm);

    RUBY_DEBUG_LOG("r:%d errno:%d", r, errno);

    switch (r) {
      case 0: // timeout
        RUBY_DEBUG_LOG("timeout%s", "");

        ractor_sched_lock(vm, NULL);
        {
            // (1-1) timeslice
            timer_thread_check_timeslice(vm);

            // (1-4) lazy grq deq
            if (vm->ractor.sched.grq_cnt > 0) {
                RUBY_DEBUG_LOG("GRQ cnt: %u", vm->ractor.sched.grq_cnt);
                rb_native_cond_signal(&vm->ractor.sched.cond);
            }
        }
        ractor_sched_unlock(vm, NULL);

        // (1-2) check NT shortage
        native_thread_check_and_create_shared(vm);
        break;

      case -1:
        switch (errno) {
          case EINTR:
            // simply retry
            break;
          default:
            perror("event_wait");
            rb_bug("event_wait errno:%d", errno);
        }
        break;

      default:
        RUBY_DEBUG_LOG("%d event(s)", r);

#if HAVE_SYS_EVENT_H
        for (int i=0; i<r; i++) {
            rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].udata;
            int fd = (int)timer_th.finished_events[i].ident;
            int16_t filter = timer_th.finished_events[i].filter;

            if (th == NULL) {
                // wakeup timerthread
                RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
                consume_communication_pipe(timer_th.comm_fds[0]);
            }
            else {
                // wakeup specific thread by IO
                RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s",
                               rb_th_serial(th),
                               (filter == EVFILT_READ) ? "read/" : "",
                               (filter == EVFILT_WRITE) ? "write/" : "");

                rb_native_mutex_lock(&timer_th.waiting_lock);
                {
                    if (th->sched.waiting_reason.flags) {
                        // delete from chain
                        ccan_list_del_init(&th->sched.waiting_reason.node);
                        timer_thread_unregister_waiting(th, fd, kqueue_translate_filter_to_flags(filter));

                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
                        th->sched.waiting_reason.data.fd = -1;
                        th->sched.waiting_reason.data.result = filter;

                        timer_thread_wakeup_thread(th);
                    }
                    else {
                        // already canceled
                    }
                }
                rb_native_mutex_unlock(&timer_th.waiting_lock);
            }
        }
#else
        for (int i=0; i<r; i++) {
            rb_thread_t *th = (rb_thread_t *)timer_th.finished_events[i].data.ptr;

            if (th == NULL) {
                // wakeup timerthread
                RUBY_DEBUG_LOG("comm from fd:%d", timer_th.comm_fds[1]);
                consume_communication_pipe(timer_th.comm_fds[0]);
            }
            else {
                // wakeup specific thread by IO
                uint32_t events = timer_th.finished_events[i].events;

                RUBY_DEBUG_LOG("io event. wakeup_th:%u event:%s%s%s%s%s%s",
                               rb_th_serial(th),
                               (events & EPOLLIN)    ? "in/" : "",
                               (events & EPOLLOUT)   ? "out/" : "",
                               (events & EPOLLRDHUP) ? "RDHUP/" : "",
                               (events & EPOLLPRI)   ? "pri/" : "",
                               (events & EPOLLERR)   ? "err/" : "",
                               (events & EPOLLHUP)   ? "hup/" : "");

                rb_native_mutex_lock(&timer_th.waiting_lock);
                {
                    if (th->sched.waiting_reason.flags) {
                        // delete from chain
                        ccan_list_del_init(&th->sched.waiting_reason.node);
                        timer_thread_unregister_waiting(th, th->sched.waiting_reason.data.fd, th->sched.waiting_reason.flags);

                        th->sched.waiting_reason.flags = thread_sched_waiting_none;
                        th->sched.waiting_reason.data.fd = -1;
                        th->sched.waiting_reason.data.result = (int)events;

                        timer_thread_wakeup_thread(th);
                    }
                    else {
                        // already canceled
                    }
                }
                rb_native_mutex_unlock(&timer_th.waiting_lock);
            }
        }
#endif
        break;
    }
}
1011 #else // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H
1014 timer_thread_setup_mn(void)
1020 timer_thread_polling(rb_vm_t
*vm
)
1022 int timeout
= timer_thread_set_timeout(vm
);
1024 struct pollfd pfd
= {
1025 .fd
= timer_th
.comm_fds
[0],
1029 int r
= poll(&pfd
, 1, timeout
);
1033 rb_native_mutex_lock(&vm
->ractor
.sched
.lock
);
1036 timer_thread_check_timeslice(vm
);
1038 rb_native_mutex_unlock(&vm
->ractor
.sched
.lock
);
1048 rb_bug("poll errno:%d", errno
);
1053 consume_communication_pipe(timer_th
.comm_fds
[0]);
1057 rb_bug("unreachbale");
1061 #endif // HAVE_SYS_EPOLL_H || HAVE_SYS_EVENT_H