2 #include "call_frame.hpp"
3 #include "environment.hpp"
4 #include "object_utils.hpp"
5 #include "on_stack.hpp"
6 #include "primitives.hpp"
8 #include "thread_phase.hpp"
10 #include "class/channel.hpp"
11 #include "class/class.hpp"
12 #include "class/exception.hpp"
13 #include "class/fiber.hpp"
14 #include "class/fixnum.hpp"
15 #include "class/float.hpp"
16 #include "class/jit.hpp"
17 #include "class/location.hpp"
18 #include "class/lookup_table.hpp"
19 #include "class/native_method.hpp"
20 #include "class/string.hpp"
21 #include "class/symbol.hpp"
22 #include "class/thread.hpp"
23 #include "class/tuple.hpp"
24 #include "class/unwind_state.hpp"
26 #include "memory/collector.hpp"
28 #include "diagnostics/machine.hpp"
30 #include "dtrace/dtrace.h"
34 #include "missing/gettid.h"
45 void Thread::bootstrap(STATE
) {
46 GO(thread
).set(state
->memory()->new_class
<Class
, Thread
>(state
, "Thread"));
49 Thread
* Thread::create(STATE
, ThreadState
* thread_state
) {
50 return Thread::create(state
, G(thread
), thread_state
);
53 Thread
* Thread::create(STATE
, Class
* klass
, ThreadState
* thread_state
) {
54 Thread
* thread
= state
->memory()->new_object_pinned
<Thread
>(state
, klass
);
57 Exception::raise_thread_error(state
, "attempt to create Thread with NULL ThreadState*");
60 thread
->thread_state(thread_state
);
61 thread
->thread_id(state
, Fixnum::from(thread_state
->thread_id()));
63 thread_state
->set_thread(thread
);
65 thread
->fiber(state
, Fiber::create(state
, thread_state
));
66 thread
->current_fiber(state
, thread
->fiber());
71 Thread
* Thread::create(STATE
, ThreadState
* thread_state
, ThreadFunction function
) {
72 return Thread::create(state
, G(thread
), thread_state
, function
);
75 Thread
* Thread::create(STATE
, Object
* self
, ThreadFunction function
) {
76 return Thread::create(state
, self
, state
->threads()->create_thread_state(), function
);
79 Thread
* Thread::create(STATE
, Object
* self
, ThreadState
* thread_state
, ThreadFunction function
) {
80 Thread
* thr
= Thread::create(state
, as
<Class
>(self
), thread_state
);
82 thr
->function(function
);
84 state
->collector()->native_finalizer(state
, thr
,
85 (memory::FinalizerFunction
)&Thread::finalize
);
87 state
->metrics()->threads_created
++;
92 void Thread::finalize(STATE
, Thread
* thread
) {
93 if(state
->thread() == thread
) {
94 // Do not finalize ourselves.
98 if(state
->configuration()->log_thread_finalizer
.value
) {
99 logger::write("thread: finalizer: %s", thread
->thread_state()->name().c_str());
102 if(thread
->thread_state() && thread
->thread_state()->zombie_p()) {
103 thread
->thread_state()->discard();
104 thread
->thread_state(nullptr);
108 Object
* run_instance(STATE
) {
109 /* These are all referenced, so OnStack is not necessary. Additionally,
110 * thread is pinned, so we do not need to worry about it moving.
112 Thread
* thread
= state
->thread();
113 Array
* args
= thread
->args();
114 Object
* block
= thread
->block();
116 if(thread
->initialized()->false_p() || args
->nil_p() || block
->nil_p()) {
120 Object
* value
= block
->send(state
, G(sym_call
), args
, block
);
122 /* We explicitly set the current CallFrame reference to NULL because we
123 * are at the top of the stack in terms of managed code.
125 state
->set_call_frame(NULL
);
127 thread
->exception(state
, state
->unwind_state()->current_exception());
129 if(state
->unwind_state()->raise_reason() == cThreadKill
) {
130 thread
->value(state
, cNil
);
132 thread
->value(state
, value
);
135 Object
* mirror
= G(mirror
)->send(state
, state
->symbol("reflect"),
136 Array::from_tuple(state
, Tuple::from(state
, 1, thread
)));
137 mirror
->send(state
, state
->symbol("finish"));
142 Thread
* Thread::s_new(STATE
, Object
* self
, Array
* args
, Object
* stack_size
, Object
* block
) {
143 Thread
* thread
= Thread::create(state
, self
, run_instance
);
144 OnStack
<1> os(state
, thread
);
146 if(Fixnum
* size
= try_as
<Fixnum
>(stack_size
)) {
147 state
->validate_stack_size(state
, size
->to_native());
148 thread
->stack_size(state
, size
);
151 if(state
->configuration()->log_thread_lifetime
.value
) {
152 const std::regex
& filter
= state
->configuration()->log_thread_filter();
154 if(CallFrame
* call_frame
= state
->get_filtered_frame(state
, filter
)) {
155 std::ostringstream source
;
157 source
<< call_frame
->file(state
)->cpp_str(state
).c_str()
158 << ":" << call_frame
->line(state
);
160 logger::write("thread: new: %s, %s",
161 thread
->thread_state()->name().c_str(), source
.str().c_str());
163 thread
->source(state
, String::create(state
, source
.str().c_str()));
165 logger::write("thread: new: %s", thread
->thread_state()->name().c_str());
169 if(!thread
->send(state
, state
->symbol("initialize"), args
, block
, true)) {
170 state
->threads()->remove_thread_state(thread
->thread_state());
171 thread
->thread_state()->set_thread_exception();
180 Thread
* Thread::s_start(STATE
, Object
* self
, Array
* args
, Object
* stack_size
, Object
* block
) {
181 Thread
* thread
= Thread::create(state
, self
, run_instance
);
182 OnStack
<1> os(state
, thread
);
184 if(Fixnum
* size
= try_as
<Fixnum
>(stack_size
)) {
185 state
->validate_stack_size(state
, size
->to_native());
186 thread
->stack_size(state
, size
);
189 if(state
->configuration()->log_thread_lifetime
.value
) {
190 const std::regex
& filter
= state
->configuration()->log_thread_filter();
192 if(CallFrame
* call_frame
= state
->get_filtered_frame(state
, filter
)) {
193 std::ostringstream source
;
195 source
<< call_frame
->file(state
)->cpp_str(state
).c_str()
196 << ":" << call_frame
->line(state
);
198 logger::write("thread: start: %s, %s",
199 thread
->thread_state()->name().c_str(), source
.str().c_str());
201 thread
->source(state
, String::create(state
, source
.str().c_str()));
203 logger::write("thread: start: %s", thread
->thread_state()->name().c_str());
207 if(!thread
->send(state
, state
->symbol("__thread_initialize__"), args
, block
, true)) {
208 state
->threads()->remove_thread_state(thread
->thread_state());
209 thread
->thread_state()->set_thread_exception();
218 Thread
* Thread::current(STATE
) {
219 return state
->thread();
222 Object
* Thread::variable_get(STATE
, Symbol
* key
) {
223 return locals()->aref(state
, key
);
226 Object
* Thread::variable_set(STATE
, Symbol
* key
, Object
* value
) {
227 return locals()->store(state
, key
, value
);
230 Object
* Thread::variable_key_p(STATE
, Symbol
* key
) {
231 return locals()->has_key(state
, key
);
234 Array
* Thread::variables(STATE
) {
235 return locals()->all_keys(state
);
238 Array
* Thread::fiber_list(STATE
) {
239 Array
* fibers
= Array::create(state
, 0);
241 state
->threads()->each(state
, [this, fibers
](STATE
, ThreadState
* thread_state
) {
242 if(thread_state
->kind() == ThreadState::eFiber
243 && !thread_state
->fiber()->nil_p()
244 && thread_state
->fiber()->status() != Fiber::eDead
245 && thread_state
->fiber()->thread() == this) {
246 fibers
->append(state
, thread_state
->fiber());
253 Object
* Thread::fiber_variable_get(STATE
, Symbol
* key
) {
254 return current_fiber()->locals()->aref(state
, key
);
257 Object
* Thread::fiber_variable_set(STATE
, Symbol
* key
, Object
* value
) {
259 return current_fiber()->locals()->store(state
, key
, value
);
262 Object
* Thread::fiber_variable_key_p(STATE
, Symbol
* key
) {
263 return current_fiber()->locals()->has_key(state
, key
);
266 Array
* Thread::fiber_variables(STATE
) {
267 return current_fiber()->locals()->all_keys(state
);
270 Object
* Thread::status(STATE
) {
271 return Fixnum::from(thread_state()->thread_status());
274 int Thread::start_thread(STATE
, void* (*function
)(void*)) {
276 OnStack
<1> os(state
, self
);
278 pthread_attr_t attrs
;
279 pthread_attr_init(&attrs
);
280 pthread_attr_setstacksize(&attrs
, self
->stack_size()->to_native());
281 pthread_attr_setdetachstate(&attrs
, PTHREAD_CREATE_DETACHED
);
283 int status
= pthread_create(&self
->thread_state()->os_thread(), &attrs
,
284 function
, (void*)self
->thread_state());
286 pthread_attr_destroy(&attrs
);
291 void Thread::stop(STATE
, ThreadState
* thread_state
) {
295 void* Thread::run(void* ptr
) {
296 ThreadState
* state
= reinterpret_cast<ThreadState
*>(ptr
);
297 Object
* value
= nullptr;
299 state
->set_stack_bounds(state
->thread()->stack_size()->to_native());
300 state
->set_current_thread();
301 state
->set_start_time();
303 SET_THREAD_UNWIND(state
);
305 if(!state
->thread_unwinding_p()) {
306 RUBINIUS_THREAD_START(
307 const_cast<RBX_DTRACE_CHAR_P
>(state
->name().c_str()), state
->thread_id(), 0);
309 state
->thread()->pid(state
, Fixnum::from(gettid()));
311 if(state
->configuration()->log_thread_lifetime
.value
) {
312 logger::write("thread: run: %s, %d",
313 state
->name().c_str(), state
->thread()->pid()->to_native());
316 state
->metrics()->start_reporting(state
);
318 NativeMethod::init_thread(state
);
320 state
->managed_phase();
322 value
= state
->thread()->function()(state
);
323 state
->set_call_frame(nullptr);
326 std::unique_lock
<std::mutex
> lk(state
->join_lock());
328 if(state
->thread()->thread_state()->unwind_state()->raise_reason() == cException
) {
329 state
->set_thread_exception();
331 state
->set_thread_dead();
334 state
->join_cond().notify_all();
337 if(state
->lock_owned_p()) {
338 logger::write("thread: exiting while owning Threads lock: %s",
339 state
->name().c_str());
344 if(state
->configuration()->log_thread_lifetime
.value
) {
345 logger::write("thread: exit: %s %fs", state
->name().c_str(), state
->run_time());
348 if(state
->main_thread_p()
349 || (!value
&& state
->unwind_state()->raise_reason() == cSystemExit
)) {
350 state
->machine()->halt(state
, state
->unwind_state()->raise_value());
353 state
->unmanaged_phase();
355 NativeMethod::cleanup_thread(state
);
357 state
->threads()->remove_thread_state(state
);
359 RUBINIUS_THREAD_STOP(
360 const_cast<RBX_DTRACE_CHAR_P
>(state
->name().c_str()), state
->thread_id(), 0);
365 Object
* Thread::name(STATE
) {
366 return String::create(state
, thread_state()->name().c_str());
369 Object
* Thread::set_name(STATE
, String
* name
) {
370 thread_state()->set_name(state
, name
->c_str(state
));
375 void Thread::fork(STATE
) {
376 if(int error
= start_thread(state
, Thread::run
)) {
377 char buf
[RBX_STRERROR_BUFSIZE
];
378 char* err
= RBX_STRERROR(error
, buf
, RBX_STRERROR_BUFSIZE
);
379 Exception::raise_thread_error(state
, err
);
383 Object
* Thread::pass(STATE
) {
384 (void)std::this_thread::yield
;
388 Array
* Thread::list(STATE
) {
389 Array
* threads
= Array::create(state
, 0);
391 state
->threads()->each(state
, [threads
](STATE
, ThreadState
* thread_state
) {
392 Thread
*thread
= thread_state
->thread();
394 if(thread_state
->kind() == ThreadState::eThread
395 &&!thread
->nil_p() && !thread
->thread_state()->zombie_p()) {
396 threads
->append(state
, thread
);
403 Fixnum
* Thread::count(STATE
) {
406 state
->threads()->each(state
, [&](STATE
, ThreadState
* thread_state
) {
407 Thread
*thread
= thread_state
->thread();
409 if(thread_state
->kind() == ThreadState::eThread
410 &&!thread
->nil_p() && !thread
->thread_state()->zombie_p()) {
415 return Fixnum::from(count
);
418 Object
* Thread::set_priority(STATE
, Fixnum
* new_priority
) {
419 priority(state
, new_priority
);
423 Object
* Thread::get_priority(STATE
) {
427 Object
* Thread::raise(STATE
, Exception
* exc
) {
428 std::lock_guard
<std::mutex
> guard(thread_state()->thread_lock());
430 if(thread_state()->zombie_p()) return cNil
;
432 current_fiber()->thread_state()->register_raise(state
, exc
);
433 thread_state()->wakeup();
438 Object
* Thread::kill(STATE
) {
439 std::lock_guard
<std::mutex
> guard(thread_state()->thread_lock());
441 if(thread_state()->zombie_p()) return cNil
;
443 if(state
->thread() == this) {
444 thread_state()->unwind_state()->raise_thread_kill();
447 current_fiber()->thread_state()->register_kill(state
);
448 thread_state()->wakeup();
453 Object
* Thread::suspend(STATE
, Object
* duration
) {
454 if(Fixnum
* fix
= try_as
<Fixnum
>(duration
)) {
455 if(!fix
->positive_p()) {
456 Exception::raise_argument_error(state
, "time interval must be positive");
458 } else if(Float
* flt
= try_as
<Float
>(duration
)) {
459 if(flt
->value() < 0.0) {
460 Exception::raise_argument_error(state
, "time interval must be positive");
462 } else if(duration
== G(undefined
)) {
464 } else if(!duration
->nil_p()) {
465 return Primitives::failure();
468 auto start
= std::chrono::high_resolution_clock::now();
470 state
->thread()->thread_state()->sleep(duration
);
472 if(state
->thread()->thread_state()->thread_interrupted_p()) {
476 auto end
= std::chrono::high_resolution_clock::now();
477 auto elapsed
= std::chrono::duration_cast
<std::chrono::seconds
>(end
- start
);
479 return Fixnum::from(elapsed
.count());
482 Thread
* Thread::wakeup(STATE
) {
483 std::lock_guard
<std::mutex
> guard(thread_state()->thread_lock());
485 if(thread_state()->zombie_p()) {
486 Exception::raise_thread_error(state
, "attempting to wake a dead Thread");
490 thread_state()->wakeup();
495 Tuple
* Thread::context(STATE
) {
496 std::lock_guard
<std::mutex
> guard(thread_state()->thread_lock());
498 if(thread_state()->zombie_p()) return nil
<Tuple
>();
500 CallFrame
* call_frame
= thread_state()->get_ruby_frame();
501 VariableScope
* scope
= call_frame
->promote_scope(state
);
503 return Tuple::from(state
, 3, Fixnum::from(call_frame
->ip()),
504 call_frame
->compiled_code
, scope
);
507 Array
* Thread::mri_backtrace(STATE
) {
508 std::lock_guard
<std::mutex
> guard(thread_state()->thread_lock());
510 if(thread_state()->zombie_p()) return nil
<Array
>();
512 return Location::mri_backtrace(state
);
515 Thread
* Thread::join(STATE
, Object
* timeout
) {
516 if(thread_state()->zombie_p()) return this;
519 OnStack
<1> os(state
, self
);
521 // Pull this variable out before `this` may be invalid
522 ThreadState
* this_state
= thread_state();
525 UnmanagedPhase
unmanaged(state
);
526 std::unique_lock
<std::mutex
> lock(this_state
->join_lock());
528 if(timeout
->nil_p()) {
529 this_state
->join_cond().wait(lock
,
530 [&]{ return this_state
->zombie_p(); });
532 std::chrono::duration
<double> pause(as
<Float
>(timeout
)->value());
534 if(this_state
->join_cond().wait_for(lock
, pause
) == std::cv_status::timeout
) {
535 return nil
<Thread
>();