Clean up some ThreadNexus remnants.
[rubinius.git] / machine / class / thread.cpp
blob5567fe67a09172c53a9a0731edd12cf127d128ca
1 #include "memory.hpp"
2 #include "call_frame.hpp"
3 #include "environment.hpp"
4 #include "object_utils.hpp"
5 #include "on_stack.hpp"
6 #include "primitives.hpp"
7 #include "signals.hpp"
8 #include "thread_phase.hpp"
10 #include "class/channel.hpp"
11 #include "class/class.hpp"
12 #include "class/exception.hpp"
13 #include "class/fiber.hpp"
14 #include "class/fixnum.hpp"
15 #include "class/float.hpp"
16 #include "class/jit.hpp"
17 #include "class/location.hpp"
18 #include "class/lookup_table.hpp"
19 #include "class/native_method.hpp"
20 #include "class/string.hpp"
21 #include "class/symbol.hpp"
22 #include "class/thread.hpp"
23 #include "class/tuple.hpp"
24 #include "class/unwind_state.hpp"
26 #include "memory/collector.hpp"
28 #include "diagnostics/machine.hpp"
30 #include "dtrace/dtrace.h"
32 #include "logger.hpp"
34 #include "missing/gettid.h"
36 #include <atomic>
37 #include <chrono>
38 #include <mutex>
39 #include <sstream>
40 #include <regex>
41 #include <string>
42 #include <thread>
44 namespace rubinius {
45 void Thread::bootstrap(STATE) {
46 GO(thread).set(state->memory()->new_class<Class, Thread>(state, "Thread"));
49 Thread* Thread::create(STATE, ThreadState* thread_state) {
50 return Thread::create(state, G(thread), thread_state);
53 Thread* Thread::create(STATE, Class* klass, ThreadState* thread_state) {
54 Thread* thread = state->memory()->new_object_pinned<Thread>(state, klass);
56 if(!thread_state) {
57 Exception::raise_thread_error(state, "attempt to create Thread with NULL ThreadState*");
60 thread->thread_state(thread_state);
61 thread->thread_id(state, Fixnum::from(thread_state->thread_id()));
63 thread_state->set_thread(thread);
65 thread->fiber(state, Fiber::create(state, thread_state));
66 thread->current_fiber(state, thread->fiber());
68 return thread;
71 Thread* Thread::create(STATE, ThreadState* thread_state, ThreadFunction function) {
72 return Thread::create(state, G(thread), thread_state, function);
75 Thread* Thread::create(STATE, Object* self, ThreadFunction function) {
76 return Thread::create(state, self, state->threads()->create_thread_state(), function);
79 Thread* Thread::create(STATE, Object* self, ThreadState* thread_state, ThreadFunction function) {
80 Thread* thr = Thread::create(state, as<Class>(self), thread_state);
82 thr->function(function);
84 state->collector()->native_finalizer(state, thr,
85 (memory::FinalizerFunction)&Thread::finalize);
87 state->metrics()->threads_created++;
89 return thr;
92 void Thread::finalize(STATE, Thread* thread) {
93 if(state->thread() == thread) {
94 // Do not finalize ourselves.
95 return;
98 if(state->configuration()->log_thread_finalizer.value) {
99 logger::write("thread: finalizer: %s", thread->thread_state()->name().c_str());
102 if(thread->thread_state() && thread->thread_state()->zombie_p()) {
103 thread->thread_state()->discard();
104 thread->thread_state(nullptr);
108 Object* run_instance(STATE) {
109 /* These are all referenced, so OnStack is not necessary. Additionally,
110 * thread is pinned, so we do not need to worry about it moving.
112 Thread* thread = state->thread();
113 Array* args = thread->args();
114 Object* block = thread->block();
116 if(thread->initialized()->false_p() || args->nil_p() || block->nil_p()) {
117 return cNil;
120 Object* value = block->send(state, G(sym_call), args, block);
122 /* We explicitly set the current CallFrame reference to NULL because we
123 * are at the top of the stack in terms of managed code.
125 state->set_call_frame(NULL);
127 thread->exception(state, state->unwind_state()->current_exception());
129 if(state->unwind_state()->raise_reason() == cThreadKill) {
130 thread->value(state, cNil);
131 } else if(value) {
132 thread->value(state, value);
135 Object* mirror = G(mirror)->send(state, state->symbol("reflect"),
136 Array::from_tuple(state, Tuple::from(state, 1, thread)));
137 mirror->send(state, state->symbol("finish"));
139 return value;
142 Thread* Thread::s_new(STATE, Object* self, Array* args, Object* stack_size, Object* block) {
143 Thread* thread = Thread::create(state, self, run_instance);
144 OnStack<1> os(state, thread);
146 if(Fixnum* size = try_as<Fixnum>(stack_size)) {
147 state->validate_stack_size(state, size->to_native());
148 thread->stack_size(state, size);
151 if(state->configuration()->log_thread_lifetime.value) {
152 const std::regex& filter = state->configuration()->log_thread_filter();
154 if(CallFrame* call_frame = state->get_filtered_frame(state, filter)) {
155 std::ostringstream source;
157 source << call_frame->file(state)->cpp_str(state).c_str()
158 << ":" << call_frame->line(state);
160 logger::write("thread: new: %s, %s",
161 thread->thread_state()->name().c_str(), source.str().c_str());
163 thread->source(state, String::create(state, source.str().c_str()));
164 } else {
165 logger::write("thread: new: %s", thread->thread_state()->name().c_str());
169 if(!thread->send(state, state->symbol("initialize"), args, block, true)) {
170 state->threads()->remove_thread_state(thread->thread_state());
171 thread->thread_state()->set_thread_exception();
172 return nullptr;
175 thread->fork(state);
177 return thread;
180 Thread* Thread::s_start(STATE, Object* self, Array* args, Object* stack_size, Object* block) {
181 Thread* thread = Thread::create(state, self, run_instance);
182 OnStack<1> os(state, thread);
184 if(Fixnum* size = try_as<Fixnum>(stack_size)) {
185 state->validate_stack_size(state, size->to_native());
186 thread->stack_size(state, size);
189 if(state->configuration()->log_thread_lifetime.value) {
190 const std::regex& filter = state->configuration()->log_thread_filter();
192 if(CallFrame* call_frame = state->get_filtered_frame(state, filter)) {
193 std::ostringstream source;
195 source << call_frame->file(state)->cpp_str(state).c_str()
196 << ":" << call_frame->line(state);
198 logger::write("thread: start: %s, %s",
199 thread->thread_state()->name().c_str(), source.str().c_str());
201 thread->source(state, String::create(state, source.str().c_str()));
202 } else {
203 logger::write("thread: start: %s", thread->thread_state()->name().c_str());
207 if(!thread->send(state, state->symbol("__thread_initialize__"), args, block, true)) {
208 state->threads()->remove_thread_state(thread->thread_state());
209 thread->thread_state()->set_thread_exception();
210 return nullptr;
213 thread->fork(state);
215 return thread;
218 Thread* Thread::current(STATE) {
219 return state->thread();
222 Object* Thread::variable_get(STATE, Symbol* key) {
223 return locals()->aref(state, key);
226 Object* Thread::variable_set(STATE, Symbol* key, Object* value) {
227 return locals()->store(state, key, value);
230 Object* Thread::variable_key_p(STATE, Symbol* key) {
231 return locals()->has_key(state, key);
234 Array* Thread::variables(STATE) {
235 return locals()->all_keys(state);
238 Array* Thread::fiber_list(STATE) {
239 Array* fibers = Array::create(state, 0);
241 state->threads()->each(state, [this, fibers](STATE, ThreadState* thread_state) {
242 if(thread_state->kind() == ThreadState::eFiber
243 && !thread_state->fiber()->nil_p()
244 && thread_state->fiber()->status() != Fiber::eDead
245 && thread_state->fiber()->thread() == this) {
246 fibers->append(state, thread_state->fiber());
250 return fibers;
253 Object* Thread::fiber_variable_get(STATE, Symbol* key) {
254 return current_fiber()->locals()->aref(state, key);
257 Object* Thread::fiber_variable_set(STATE, Symbol* key, Object* value) {
258 check_frozen(state);
259 return current_fiber()->locals()->store(state, key, value);
262 Object* Thread::fiber_variable_key_p(STATE, Symbol* key) {
263 return current_fiber()->locals()->has_key(state, key);
266 Array* Thread::fiber_variables(STATE) {
267 return current_fiber()->locals()->all_keys(state);
270 Object* Thread::status(STATE) {
271 return Fixnum::from(thread_state()->thread_status());
274 int Thread::start_thread(STATE, void* (*function)(void*)) {
275 Thread* self = this;
276 OnStack<1> os(state, self);
278 pthread_attr_t attrs;
279 pthread_attr_init(&attrs);
280 pthread_attr_setstacksize(&attrs, self->stack_size()->to_native());
281 pthread_attr_setdetachstate(&attrs, PTHREAD_CREATE_DETACHED);
283 int status = pthread_create(&self->thread_state()->os_thread(), &attrs,
284 function, (void*)self->thread_state());
286 pthread_attr_destroy(&attrs);
288 return status;
291 void Thread::stop(STATE, ThreadState* thread_state) {
292 // TODO: Thread
295 void* Thread::run(void* ptr) {
296 ThreadState* state = reinterpret_cast<ThreadState*>(ptr);
297 Object* value = nullptr;
299 state->set_stack_bounds(state->thread()->stack_size()->to_native());
300 state->set_current_thread();
301 state->set_start_time();
303 SET_THREAD_UNWIND(state);
305 if(!state->thread_unwinding_p()) {
306 RUBINIUS_THREAD_START(
307 const_cast<RBX_DTRACE_CHAR_P>(state->name().c_str()), state->thread_id(), 0);
309 state->thread()->pid(state, Fixnum::from(gettid()));
311 if(state->configuration()->log_thread_lifetime.value) {
312 logger::write("thread: run: %s, %d",
313 state->name().c_str(), state->thread()->pid()->to_native());
316 state->metrics()->start_reporting(state);
318 NativeMethod::init_thread(state);
320 state->managed_phase();
322 value = state->thread()->function()(state);
323 state->set_call_frame(nullptr);
326 std::unique_lock<std::mutex> lk(state->join_lock());
328 if(state->thread()->thread_state()->unwind_state()->raise_reason() == cException) {
329 state->set_thread_exception();
330 } else {
331 state->set_thread_dead();
334 state->join_cond().notify_all();
337 if(state->lock_owned_p()) {
338 logger::write("thread: exiting while owning Threads lock: %s",
339 state->name().c_str());
340 state->unlock();
344 if(state->configuration()->log_thread_lifetime.value) {
345 logger::write("thread: exit: %s %fs", state->name().c_str(), state->run_time());
348 if(state->main_thread_p()
349 || (!value && state->unwind_state()->raise_reason() == cSystemExit)) {
350 state->machine()->halt(state, state->unwind_state()->raise_value());
353 state->unmanaged_phase();
355 NativeMethod::cleanup_thread(state);
357 state->threads()->remove_thread_state(state);
359 RUBINIUS_THREAD_STOP(
360 const_cast<RBX_DTRACE_CHAR_P>(state->name().c_str()), state->thread_id(), 0);
362 return 0;
365 Object* Thread::name(STATE) {
366 return String::create(state, thread_state()->name().c_str());
369 Object* Thread::set_name(STATE, String* name) {
370 thread_state()->set_name(state, name->c_str(state));
372 return name;
375 void Thread::fork(STATE) {
376 if(int error = start_thread(state, Thread::run)) {
377 char buf[RBX_STRERROR_BUFSIZE];
378 char* err = RBX_STRERROR(error, buf, RBX_STRERROR_BUFSIZE);
379 Exception::raise_thread_error(state, err);
383 Object* Thread::pass(STATE) {
384 (void)std::this_thread::yield;
385 return cNil;
388 Array* Thread::list(STATE) {
389 Array* threads = Array::create(state, 0);
391 state->threads()->each(state, [threads](STATE, ThreadState* thread_state) {
392 Thread *thread = thread_state->thread();
394 if(thread_state->kind() == ThreadState::eThread
395 &&!thread->nil_p() && !thread->thread_state()->zombie_p()) {
396 threads->append(state, thread);
400 return threads;
403 Fixnum* Thread::count(STATE) {
404 intptr_t count = 0;
406 state->threads()->each(state, [&](STATE, ThreadState* thread_state) {
407 Thread *thread = thread_state->thread();
409 if(thread_state->kind() == ThreadState::eThread
410 &&!thread->nil_p() && !thread->thread_state()->zombie_p()) {
411 count++;
415 return Fixnum::from(count);
418 Object* Thread::set_priority(STATE, Fixnum* new_priority) {
419 priority(state, new_priority);
420 return new_priority;
423 Object* Thread::get_priority(STATE) {
424 return priority();
427 Object* Thread::raise(STATE, Exception* exc) {
428 std::lock_guard<std::mutex> guard(thread_state()->thread_lock());
430 if(thread_state()->zombie_p()) return cNil;
432 current_fiber()->thread_state()->register_raise(state, exc);
433 thread_state()->wakeup();
435 return exc;
438 Object* Thread::kill(STATE) {
439 std::lock_guard<std::mutex> guard(thread_state()->thread_lock());
441 if(thread_state()->zombie_p()) return cNil;
443 if(state->thread() == this) {
444 thread_state()->unwind_state()->raise_thread_kill();
445 return nullptr;
446 } else {
447 current_fiber()->thread_state()->register_kill(state);
448 thread_state()->wakeup();
449 return this;
453 Object* Thread::suspend(STATE, Object* duration) {
454 if(Fixnum* fix = try_as<Fixnum>(duration)) {
455 if(!fix->positive_p()) {
456 Exception::raise_argument_error(state, "time interval must be positive");
458 } else if(Float* flt = try_as<Float>(duration)) {
459 if(flt->value() < 0.0) {
460 Exception::raise_argument_error(state, "time interval must be positive");
462 } else if(duration == G(undefined)) {
463 duration = cNil;
464 } else if(!duration->nil_p()) {
465 return Primitives::failure();
468 auto start = std::chrono::high_resolution_clock::now();
470 state->thread()->thread_state()->sleep(duration);
472 if(state->thread()->thread_state()->thread_interrupted_p()) {
473 return nullptr;
476 auto end = std::chrono::high_resolution_clock::now();
477 auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(end - start);
479 return Fixnum::from(elapsed.count());
482 Thread* Thread::wakeup(STATE) {
483 std::lock_guard<std::mutex> guard(thread_state()->thread_lock());
485 if(thread_state()->zombie_p()) {
486 Exception::raise_thread_error(state, "attempting to wake a dead Thread");
487 return nullptr;
490 thread_state()->wakeup();
492 return this;
495 Tuple* Thread::context(STATE) {
496 std::lock_guard<std::mutex> guard(thread_state()->thread_lock());
498 if(thread_state()->zombie_p()) return nil<Tuple>();
500 CallFrame* call_frame = thread_state()->get_ruby_frame();
501 VariableScope* scope = call_frame->promote_scope(state);
503 return Tuple::from(state, 3, Fixnum::from(call_frame->ip()),
504 call_frame->compiled_code, scope);
507 Array* Thread::mri_backtrace(STATE) {
508 std::lock_guard<std::mutex> guard(thread_state()->thread_lock());
510 if(thread_state()->zombie_p()) return nil<Array>();
512 return Location::mri_backtrace(state);
515 Thread* Thread::join(STATE, Object* timeout) {
516 if(thread_state()->zombie_p()) return this;
518 Thread* self = this;
519 OnStack<1> os(state, self);
521 // Pull this variable out before `this` may be invalid
522 ThreadState* this_state = thread_state();
525 UnmanagedPhase unmanaged(state);
526 std::unique_lock<std::mutex> lock(this_state->join_lock());
528 if(timeout->nil_p()) {
529 this_state->join_cond().wait(lock,
530 [&]{ return this_state->zombie_p(); });
531 } else {
532 std::chrono::duration<double> pause(as<Float>(timeout)->value());
534 if(this_state->join_cond().wait_for(lock, pause) == std::cv_status::timeout) {
535 return nil<Thread>();
540 return self;