1/********************************************************************** 2 3 thread.c - 4 5 $Author: nagachika $ 6 7 Copyright (C) 2004-2007 Koichi Sasada 8 9**********************************************************************/ 10 11/* 12 YARV Thread Design 13 14 model 1: Userlevel Thread 15 Same as traditional ruby thread. 16 17 model 2: Native Thread with Global VM lock 18 Using pthread (or Windows thread) and Ruby threads run concurrent. 19 20 model 3: Native Thread with fine grain lock 21 Using pthread and Ruby threads run concurrent or parallel. 22 23------------------------------------------------------------------------ 24 25 model 2: 26 A thread has mutex (GVL: Global VM Lock or Giant VM Lock) can run. 27 When thread scheduling, running thread release GVL. If running thread 28 try blocking operation, this thread must release GVL and another 29 thread can continue this flow. After blocking operation, thread 30 must check interrupt (RUBY_VM_CHECK_INTS). 31 32 Every VM can run parallel. 33 34 Ruby threads are scheduled by OS thread scheduler. 35 36------------------------------------------------------------------------ 37 38 model 3: 39 Every threads run concurrent or parallel and to access shared object 40 exclusive access control is needed. For example, to access String 41 object or Array object, fine grain lock must be locked every time. 42 */ 43 44 45/* 46 * FD_SET, FD_CLR and FD_ISSET have a small sanity check when using glibc 47 * 2.15 or later and set _FORTIFY_SOURCE > 0. 48 * However, the implementation is wrong. Even though Linux's select(2) 49 * support large fd size (>FD_SETSIZE), it wrongly assume fd is always 50 * less than FD_SETSIZE (i.e. 1024). And then when enabling HAVE_RB_FD_INIT, 51 * it doesn't work correctly and makes program abort. Therefore we need to 52 * disable FORTY_SOURCE until glibc fixes it. 53 */ 54#undef _FORTIFY_SOURCE 55#undef __USE_FORTIFY_LEVEL 56#define __USE_FORTIFY_LEVEL 0 57 58/* for model 2 */ 59 60#include "eval_intern.h" 61#include "gc.h" 62#include "internal.h" 63#include "ruby/io.h" 64#include "ruby/thread.h" 65 66#ifndef USE_NATIVE_THREAD_PRIORITY 67#define USE_NATIVE_THREAD_PRIORITY 0 68#define RUBY_THREAD_PRIORITY_MAX 3 69#define RUBY_THREAD_PRIORITY_MIN -3 70#endif 71 72#ifndef THREAD_DEBUG 73#define THREAD_DEBUG 0 74#endif 75 76#define TIMET_MAX (~(time_t)0 <= 0 ? (time_t)((~(unsigned_time_t)0) >> 1) : (time_t)(~(unsigned_time_t)0)) 77#define TIMET_MIN (~(time_t)0 <= 0 ? (time_t)(((unsigned_time_t)1) << (sizeof(time_t) * CHAR_BIT - 1)) : (time_t)0) 78 79VALUE rb_cMutex; 80VALUE rb_cThreadShield; 81 82static VALUE sym_immediate; 83static VALUE sym_on_blocking; 84static VALUE sym_never; 85 86static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check); 87static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check); 88static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check); 89static double timeofday(void); 90static int rb_threadptr_dead(rb_thread_t *th); 91static void rb_check_deadlock(rb_vm_t *vm); 92static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th); 93 94#define eKillSignal INT2FIX(0) 95#define eTerminateSignal INT2FIX(1) 96static volatile int system_working = 1; 97 98#define closed_stream_error GET_VM()->special_exceptions[ruby_error_closed_stream] 99 100inline static void 101st_delete_wrap(st_table *table, st_data_t key) 102{ 103 st_delete(table, &key, 0); 104} 105 106/********************************************************************************/ 107 108#define THREAD_SYSTEM_DEPENDENT_IMPLEMENTATION 109 110struct rb_blocking_region_buffer { 111 enum rb_thread_status prev_status; 112 struct rb_unblock_callback oldubf; 113}; 114 115static int set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 116 struct rb_unblock_callback *old, int fail_if_interrupted); 117static void reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old); 118 119static inline int blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, 120 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted); 121static inline void blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region); 122 123#ifdef __ia64 124#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) \ 125 do{(th)->machine_register_stack_end = rb_ia64_bsp();}while(0) 126#else 127#define RB_GC_SAVE_MACHINE_REGISTER_STACK(th) 128#endif 129#define RB_GC_SAVE_MACHINE_CONTEXT(th) \ 130 do { \ 131 FLUSH_REGISTER_WINDOWS; \ 132 RB_GC_SAVE_MACHINE_REGISTER_STACK(th); \ 133 setjmp((th)->machine_regs); \ 134 SET_MACHINE_STACK_END(&(th)->machine_stack_end); \ 135 } while (0) 136 137#define GVL_UNLOCK_BEGIN() do { \ 138 rb_thread_t *_th_stored = GET_THREAD(); \ 139 RB_GC_SAVE_MACHINE_CONTEXT(_th_stored); \ 140 gvl_release(_th_stored->vm); 141 142#define GVL_UNLOCK_END() \ 143 gvl_acquire(_th_stored->vm, _th_stored); \ 144 rb_thread_set_current(_th_stored); \ 145} while(0) 146 147#ifdef __GNUC__ 148#define only_if_constant(expr, notconst) (__builtin_constant_p(expr) ? (expr) : (notconst)) 149#else 150#define only_if_constant(expr, notconst) notconst 151#endif 152#define BLOCKING_REGION(exec, ubf, ubfarg, fail_if_interrupted) do { \ 153 rb_thread_t *__th = GET_THREAD(); \ 154 struct rb_blocking_region_buffer __region; \ 155 if (blocking_region_begin(__th, &__region, (ubf), (ubfarg), fail_if_interrupted) || \ 156 /* always return true unless fail_if_interrupted */ \ 157 !only_if_constant(fail_if_interrupted, TRUE)) { \ 158 exec; \ 159 blocking_region_end(__th, &__region); \ 160 }; \ 161} while(0) 162 163#if THREAD_DEBUG 164#ifdef HAVE_VA_ARGS_MACRO 165void rb_thread_debug(const char *file, int line, const char *fmt, ...); 166#define thread_debug(fmt, ...) rb_thread_debug(__FILE__, __LINE__, fmt, ##__VA_ARGS__) 167#define POSITION_FORMAT "%s:%d:" 168#define POSITION_ARGS ,file, line 169#else 170void rb_thread_debug(const char *fmt, ...); 171#define thread_debug rb_thread_debug 172#define POSITION_FORMAT 173#define POSITION_ARGS 174#endif 175 176# if THREAD_DEBUG < 0 177static int rb_thread_debug_enabled; 178 179/* 180 * call-seq: 181 * Thread.DEBUG -> num 182 * 183 * Returns the thread debug level. Available only if compiled with 184 * THREAD_DEBUG=-1. 185 */ 186 187static VALUE 188rb_thread_s_debug(void) 189{ 190 return INT2NUM(rb_thread_debug_enabled); 191} 192 193/* 194 * call-seq: 195 * Thread.DEBUG = num 196 * 197 * Sets the thread debug level. Available only if compiled with 198 * THREAD_DEBUG=-1. 199 */ 200 201static VALUE 202rb_thread_s_debug_set(VALUE self, VALUE val) 203{ 204 rb_thread_debug_enabled = RTEST(val) ? NUM2INT(val) : 0; 205 return val; 206} 207# else 208# define rb_thread_debug_enabled THREAD_DEBUG 209# endif 210#else 211#define thread_debug if(0)printf 212#endif 213 214#ifndef __ia64 215#define thread_start_func_2(th, st, rst) thread_start_func_2(th, st) 216#endif 217NOINLINE(static int thread_start_func_2(rb_thread_t *th, VALUE *stack_start, 218 VALUE *register_stack_start)); 219static void timer_thread_function(void *); 220 221#if defined(_WIN32) 222#include "thread_win32.c" 223 224#define DEBUG_OUT() \ 225 WaitForSingleObject(&debug_mutex, INFINITE); \ 226 printf(POSITION_FORMAT"%p - %s" POSITION_ARGS, GetCurrentThreadId(), buf); \ 227 fflush(stdout); \ 228 ReleaseMutex(&debug_mutex); 229 230#elif defined(HAVE_PTHREAD_H) 231#include "thread_pthread.c" 232 233#define DEBUG_OUT() \ 234 pthread_mutex_lock(&debug_mutex); \ 235 printf(POSITION_FORMAT"%#"PRIxVALUE" - %s" POSITION_ARGS, (VALUE)pthread_self(), buf); \ 236 fflush(stdout); \ 237 pthread_mutex_unlock(&debug_mutex); 238 239#else 240#error "unsupported thread type" 241#endif 242 243#if THREAD_DEBUG 244static int debug_mutex_initialized = 1; 245static rb_thread_lock_t debug_mutex; 246 247void 248rb_thread_debug( 249#ifdef HAVE_VA_ARGS_MACRO 250 const char *file, int line, 251#endif 252 const char *fmt, ...) 253{ 254 va_list args; 255 char buf[BUFSIZ]; 256 257 if (!rb_thread_debug_enabled) return; 258 259 if (debug_mutex_initialized == 1) { 260 debug_mutex_initialized = 0; 261 native_mutex_initialize(&debug_mutex); 262 } 263 264 va_start(args, fmt); 265 vsnprintf(buf, BUFSIZ, fmt, args); 266 va_end(args); 267 268 DEBUG_OUT(); 269} 270#endif 271 272void 273rb_vm_gvl_destroy(rb_vm_t *vm) 274{ 275 gvl_release(vm); 276 gvl_destroy(vm); 277 native_mutex_destroy(&vm->thread_destruct_lock); 278} 279 280void 281rb_thread_lock_unlock(rb_thread_lock_t *lock) 282{ 283 native_mutex_unlock(lock); 284} 285 286void 287rb_thread_lock_destroy(rb_thread_lock_t *lock) 288{ 289 native_mutex_destroy(lock); 290} 291 292static int 293set_unblock_function(rb_thread_t *th, rb_unblock_function_t *func, void *arg, 294 struct rb_unblock_callback *old, int fail_if_interrupted) 295{ 296 check_ints: 297 if (fail_if_interrupted) { 298 if (RUBY_VM_INTERRUPTED_ANY(th)) { 299 return FALSE; 300 } 301 } 302 else { 303 RUBY_VM_CHECK_INTS(th); 304 } 305 306 native_mutex_lock(&th->interrupt_lock); 307 if (RUBY_VM_INTERRUPTED_ANY(th)) { 308 native_mutex_unlock(&th->interrupt_lock); 309 goto check_ints; 310 } 311 else { 312 if (old) *old = th->unblock; 313 th->unblock.func = func; 314 th->unblock.arg = arg; 315 } 316 native_mutex_unlock(&th->interrupt_lock); 317 318 return TRUE; 319} 320 321static void 322reset_unblock_function(rb_thread_t *th, const struct rb_unblock_callback *old) 323{ 324 native_mutex_lock(&th->interrupt_lock); 325 th->unblock = *old; 326 native_mutex_unlock(&th->interrupt_lock); 327} 328 329static void 330rb_threadptr_interrupt_common(rb_thread_t *th, int trap) 331{ 332 native_mutex_lock(&th->interrupt_lock); 333 if (trap) 334 RUBY_VM_SET_TRAP_INTERRUPT(th); 335 else 336 RUBY_VM_SET_INTERRUPT(th); 337 if (th->unblock.func) { 338 (th->unblock.func)(th->unblock.arg); 339 } 340 else { 341 /* none */ 342 } 343 native_mutex_unlock(&th->interrupt_lock); 344} 345 346void 347rb_threadptr_interrupt(rb_thread_t *th) 348{ 349 rb_threadptr_interrupt_common(th, 0); 350} 351 352void 353rb_threadptr_trap_interrupt(rb_thread_t *th) 354{ 355 rb_threadptr_interrupt_common(th, 1); 356} 357 358static int 359terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread) 360{ 361 VALUE thval = key; 362 rb_thread_t *th; 363 GetThreadPtr(thval, th); 364 365 if (th != main_thread) { 366 thread_debug("terminate_i: %p\n", (void *)th); 367 rb_threadptr_pending_interrupt_enque(th, eTerminateSignal); 368 rb_threadptr_interrupt(th); 369 } 370 else { 371 thread_debug("terminate_i: main thread (%p)\n", (void *)th); 372 } 373 return ST_CONTINUE; 374} 375 376typedef struct rb_mutex_struct 377{ 378 rb_thread_lock_t lock; 379 rb_thread_cond_t cond; 380 struct rb_thread_struct volatile *th; 381 int cond_waiting; 382 struct rb_mutex_struct *next_mutex; 383 int allow_trap; 384} rb_mutex_t; 385 386static void rb_mutex_abandon_all(rb_mutex_t *mutexes); 387static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th); 388static void rb_mutex_abandon_locking_mutex(rb_thread_t *th); 389static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th); 390 391void 392rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th) 393{ 394 const char *err; 395 rb_mutex_t *mutex; 396 rb_mutex_t *mutexes = th->keeping_mutexes; 397 398 while (mutexes) { 399 mutex = mutexes; 400 /* rb_warn("mutex #<%p> remains to be locked by terminated thread", 401 mutexes); */ 402 mutexes = mutex->next_mutex; 403 err = rb_mutex_unlock_th(mutex, th); 404 if (err) rb_bug("invalid keeping_mutexes: %s", err); 405 } 406} 407 408void 409rb_thread_terminate_all(void) 410{ 411 rb_thread_t *th = GET_THREAD(); /* main thread */ 412 rb_vm_t *vm = th->vm; 413 414 if (vm->main_thread != th) { 415 rb_bug("rb_thread_terminate_all: called by child thread (%p, %p)", 416 (void *)vm->main_thread, (void *)th); 417 } 418 419 /* unlock all locking mutexes */ 420 rb_threadptr_unlock_all_locking_mutexes(th); 421 422 retry: 423 thread_debug("rb_thread_terminate_all (main thread: %p)\n", (void *)th); 424 st_foreach(vm->living_threads, terminate_i, (st_data_t)th); 425 426 while (!rb_thread_alone()) { 427 int state; 428 429 TH_PUSH_TAG(th); 430 if ((state = TH_EXEC_TAG()) == 0) { 431 native_sleep(th, 0); 432 RUBY_VM_CHECK_INTS_BLOCKING(th); 433 } 434 TH_POP_TAG(); 435 436 if (state) { 437 goto retry; 438 } 439 } 440} 441 442static void 443thread_cleanup_func_before_exec(void *th_ptr) 444{ 445 rb_thread_t *th = th_ptr; 446 th->status = THREAD_KILLED; 447 th->machine_stack_start = th->machine_stack_end = 0; 448#ifdef __ia64 449 th->machine_register_stack_start = th->machine_register_stack_end = 0; 450#endif 451} 452 453static void 454thread_cleanup_func(void *th_ptr, int atfork) 455{ 456 rb_thread_t *th = th_ptr; 457 458 th->locking_mutex = Qfalse; 459 thread_cleanup_func_before_exec(th_ptr); 460 461 /* 462 * Unfortunately, we can't release native threading resource at fork 463 * because libc may have unstable locking state therefore touching 464 * a threading resource may cause a deadlock. 465 */ 466 if (atfork) 467 return; 468 469 native_mutex_destroy(&th->interrupt_lock); 470 native_thread_destroy(th); 471} 472 473static VALUE rb_threadptr_raise(rb_thread_t *, int, VALUE *); 474 475void 476ruby_thread_init_stack(rb_thread_t *th) 477{ 478 native_thread_init_stack(th); 479} 480 481static int 482thread_start_func_2(rb_thread_t *th, VALUE *stack_start, VALUE *register_stack_start) 483{ 484 int state; 485 VALUE args = th->first_args; 486 rb_proc_t *proc; 487 rb_thread_list_t *join_list; 488 rb_thread_t *main_th; 489 VALUE errinfo = Qnil; 490# ifdef USE_SIGALTSTACK 491 void rb_register_sigaltstack(rb_thread_t *th); 492 493 rb_register_sigaltstack(th); 494# endif 495 496 if (th == th->vm->main_thread) 497 rb_bug("thread_start_func_2 must not used for main thread"); 498 499 ruby_thread_set_native(th); 500 501 th->machine_stack_start = stack_start; 502#ifdef __ia64 503 th->machine_register_stack_start = register_stack_start; 504#endif 505 thread_debug("thread start: %p\n", (void *)th); 506 507 gvl_acquire(th->vm, th); 508 { 509 thread_debug("thread start (get lock): %p\n", (void *)th); 510 rb_thread_set_current(th); 511 512 TH_PUSH_TAG(th); 513 if ((state = EXEC_TAG()) == 0) { 514 SAVE_ROOT_JMPBUF(th, { 515 if (!th->first_func) { 516 GetProcPtr(th->first_proc, proc); 517 th->errinfo = Qnil; 518 th->root_lep = rb_vm_ep_local_ep(proc->block.ep); 519 th->root_svar = Qnil; 520 EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_BEGIN, th->self, 0, 0, Qundef); 521 th->value = rb_vm_invoke_proc(th, proc, (int)RARRAY_LEN(args), RARRAY_PTR(args), 0); 522 EXEC_EVENT_HOOK(th, RUBY_EVENT_THREAD_END, th->self, 0, 0, Qundef); 523 } 524 else { 525 th->value = (*th->first_func)((void *)args); 526 } 527 }); 528 } 529 else { 530 errinfo = th->errinfo; 531 if (state == TAG_FATAL) { 532 /* fatal error within this thread, need to stop whole script */ 533 } 534 else if (th->safe_level >= 4) { 535 /* Ignore it. Main thread shouldn't be harmed from untrusted thread. */ 536 errinfo = Qnil; 537 } 538 else if (rb_obj_is_kind_of(errinfo, rb_eSystemExit)) { 539 /* exit on main_thread. */ 540 } 541 else if (th->vm->thread_abort_on_exception || 542 th->abort_on_exception || RTEST(ruby_debug)) { 543 /* exit on main_thread */ 544 } 545 else { 546 errinfo = Qnil; 547 } 548 th->value = Qnil; 549 } 550 551 th->status = THREAD_KILLED; 552 thread_debug("thread end: %p\n", (void *)th); 553 554 main_th = th->vm->main_thread; 555 if (RB_TYPE_P(errinfo, T_OBJECT)) { 556 /* treat with normal error object */ 557 rb_threadptr_raise(main_th, 1, &errinfo); 558 } 559 TH_POP_TAG(); 560 561 /* locking_mutex must be Qfalse */ 562 if (th->locking_mutex != Qfalse) { 563 rb_bug("thread_start_func_2: locking_mutex must not be set (%p:%"PRIxVALUE")", 564 (void *)th, th->locking_mutex); 565 } 566 567 /* delete self other than main thread from living_threads */ 568 st_delete_wrap(th->vm->living_threads, th->self); 569 if (rb_thread_alone()) { 570 /* I'm last thread. wake up main thread from rb_thread_terminate_all */ 571 rb_threadptr_interrupt(main_th); 572 } 573 574 /* wake up joining threads */ 575 join_list = th->join_list; 576 while (join_list) { 577 rb_threadptr_interrupt(join_list->th); 578 switch (join_list->th->status) { 579 case THREAD_STOPPED: case THREAD_STOPPED_FOREVER: 580 join_list->th->status = THREAD_RUNNABLE; 581 default: break; 582 } 583 join_list = join_list->next; 584 } 585 586 rb_threadptr_unlock_all_locking_mutexes(th); 587 rb_check_deadlock(th->vm); 588 589 if (!th->root_fiber) { 590 rb_thread_recycle_stack_release(th->stack); 591 th->stack = 0; 592 } 593 } 594 native_mutex_lock(&th->vm->thread_destruct_lock); 595 /* make sure vm->running_thread never point me after this point.*/ 596 th->vm->running_thread = NULL; 597 native_mutex_unlock(&th->vm->thread_destruct_lock); 598 thread_cleanup_func(th, FALSE); 599 gvl_release(th->vm); 600 601 return 0; 602} 603 604static VALUE 605thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS)) 606{ 607 rb_thread_t *th, *current_th = GET_THREAD(); 608 int err; 609 610 if (OBJ_FROZEN(GET_THREAD()->thgroup)) { 611 rb_raise(rb_eThreadError, 612 "can't start a new thread (frozen ThreadGroup)"); 613 } 614 GetThreadPtr(thval, th); 615 616 /* setup thread environment */ 617 th->first_func = fn; 618 th->first_proc = fn ? Qfalse : rb_block_proc(); 619 th->first_args = args; /* GC: shouldn't put before above line */ 620 621 th->priority = current_th->priority; 622 th->thgroup = current_th->thgroup; 623 624 th->pending_interrupt_queue = rb_ary_tmp_new(0); 625 th->pending_interrupt_queue_checked = 0; 626 th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack); 627 RBASIC(th->pending_interrupt_mask_stack)->klass = 0; 628 629 th->interrupt_mask = 0; 630 631 native_mutex_initialize(&th->interrupt_lock); 632 633 /* kick thread */ 634 err = native_thread_create(th); 635 if (err) { 636 th->status = THREAD_KILLED; 637 rb_raise(rb_eThreadError, "can't create Thread (%d)", err); 638 } 639 st_insert(th->vm->living_threads, thval, (st_data_t) th->thread_id); 640 return thval; 641} 642 643/* 644 * call-seq: 645 * Thread.new { ... } -> thread 646 * Thread.new(*args, &proc) -> thread 647 * Thread.new(*args) { |args| ... } -> thread 648 * 649 * Creates a new thread executing the given block. 650 * 651 * Any +args+ given to ::new will be passed to the block: 652 * 653 * arr = [] 654 * a, b, c = 1, 2, 3 655 * Thread.new(a,b,c) { |d,e,f| arr << d << e << f }.join 656 * arr #=> [1, 2, 3] 657 * 658 * A ThreadError exception is raised if ::new is called without a block. 659 * 660 * If you're going to subclass Thread, be sure to call super in your 661 * +initialize+ method, otherwise a ThreadError will be raised. 662 */ 663static VALUE 664thread_s_new(int argc, VALUE *argv, VALUE klass) 665{ 666 rb_thread_t *th; 667 VALUE thread = rb_thread_alloc(klass); 668 669 if (GET_VM()->main_thread->status == THREAD_KILLED) 670 rb_raise(rb_eThreadError, "can't alloc thread"); 671 672 rb_obj_call_init(thread, argc, argv); 673 GetThreadPtr(thread, th); 674 if (!th->first_args) { 675 rb_raise(rb_eThreadError, "uninitialized thread - check `%s#initialize'", 676 rb_class2name(klass)); 677 } 678 return thread; 679} 680 681/* 682 * call-seq: 683 * Thread.start([args]*) {|args| block } -> thread 684 * Thread.fork([args]*) {|args| block } -> thread 685 * 686 * Basically the same as ::new. However, if class Thread is subclassed, then 687 * calling +start+ in that subclass will not invoke the subclass's 688 * +initialize+ method. 689 */ 690 691static VALUE 692thread_start(VALUE klass, VALUE args) 693{ 694 return thread_create_core(rb_thread_alloc(klass), args, 0); 695} 696 697/* :nodoc: */ 698static VALUE 699thread_initialize(VALUE thread, VALUE args) 700{ 701 rb_thread_t *th; 702 if (!rb_block_given_p()) { 703 rb_raise(rb_eThreadError, "must be called with a block"); 704 } 705 GetThreadPtr(thread, th); 706 if (th->first_args) { 707 VALUE proc = th->first_proc, line, loc; 708 const char *file; 709 if (!proc || !RTEST(loc = rb_proc_location(proc))) { 710 rb_raise(rb_eThreadError, "already initialized thread"); 711 } 712 file = RSTRING_PTR(RARRAY_PTR(loc)[0]); 713 if (NIL_P(line = RARRAY_PTR(loc)[1])) { 714 rb_raise(rb_eThreadError, "already initialized thread - %s", 715 file); 716 } 717 rb_raise(rb_eThreadError, "already initialized thread - %s:%d", 718 file, NUM2INT(line)); 719 } 720 return thread_create_core(thread, args, 0); 721} 722 723VALUE 724rb_thread_create(VALUE (*fn)(ANYARGS), void *arg) 725{ 726 return thread_create_core(rb_thread_alloc(rb_cThread), (VALUE)arg, fn); 727} 728 729 730/* +infty, for this purpose */ 731#define DELAY_INFTY 1E30 732 733struct join_arg { 734 rb_thread_t *target, *waiting; 735 double limit; 736 int forever; 737}; 738 739static VALUE 740remove_from_join_list(VALUE arg) 741{ 742 struct join_arg *p = (struct join_arg *)arg; 743 rb_thread_t *target_th = p->target, *th = p->waiting; 744 745 if (target_th->status != THREAD_KILLED) { 746 rb_thread_list_t **p = &target_th->join_list; 747 748 while (*p) { 749 if ((*p)->th == th) { 750 *p = (*p)->next; 751 break; 752 } 753 p = &(*p)->next; 754 } 755 } 756 757 return Qnil; 758} 759 760static VALUE 761thread_join_sleep(VALUE arg) 762{ 763 struct join_arg *p = (struct join_arg *)arg; 764 rb_thread_t *target_th = p->target, *th = p->waiting; 765 double now, limit = p->limit; 766 767 while (target_th->status != THREAD_KILLED) { 768 if (p->forever) { 769 sleep_forever(th, 1, 0); 770 } 771 else { 772 now = timeofday(); 773 if (now > limit) { 774 thread_debug("thread_join: timeout (thid: %p)\n", 775 (void *)target_th->thread_id); 776 return Qfalse; 777 } 778 sleep_wait_for_interrupt(th, limit - now, 0); 779 } 780 thread_debug("thread_join: interrupted (thid: %p)\n", 781 (void *)target_th->thread_id); 782 } 783 return Qtrue; 784} 785 786static VALUE 787thread_join(rb_thread_t *target_th, double delay) 788{ 789 rb_thread_t *th = GET_THREAD(); 790 struct join_arg arg; 791 792 if (th == target_th) { 793 rb_raise(rb_eThreadError, "Target thread must not be current thread"); 794 } 795 if (GET_VM()->main_thread == target_th) { 796 rb_raise(rb_eThreadError, "Target thread must not be main thread"); 797 } 798 799 arg.target = target_th; 800 arg.waiting = th; 801 arg.limit = timeofday() + delay; 802 arg.forever = delay == DELAY_INFTY; 803 804 thread_debug("thread_join (thid: %p)\n", (void *)target_th->thread_id); 805 806 if (target_th->status != THREAD_KILLED) { 807 rb_thread_list_t list; 808 list.next = target_th->join_list; 809 list.th = th; 810 target_th->join_list = &list; 811 if (!rb_ensure(thread_join_sleep, (VALUE)&arg, 812 remove_from_join_list, (VALUE)&arg)) { 813 return Qnil; 814 } 815 } 816 817 thread_debug("thread_join: success (thid: %p)\n", 818 (void *)target_th->thread_id); 819 820 if (target_th->errinfo != Qnil) { 821 VALUE err = target_th->errinfo; 822 823 if (FIXNUM_P(err)) { 824 /* */ 825 } 826 else if (RB_TYPE_P(target_th->errinfo, T_NODE)) { 827 rb_exc_raise(rb_vm_make_jump_tag_but_local_jump( 828 GET_THROWOBJ_STATE(err), GET_THROWOBJ_VAL(err))); 829 } 830 else { 831 /* normal exception */ 832 rb_exc_raise(err); 833 } 834 } 835 return target_th->self; 836} 837 838/* 839 * call-seq: 840 * thr.join -> thr 841 * thr.join(limit) -> thr 842 * 843 * The calling thread will suspend execution and run <i>thr</i>. Does not 844 * return until <i>thr</i> exits or until <i>limit</i> seconds have passed. If 845 * the time limit expires, <code>nil</code> will be returned, otherwise 846 * <i>thr</i> is returned. 847 * 848 * Any threads not joined will be killed when the main program exits. If 849 * <i>thr</i> had previously raised an exception and the 850 * <code>abort_on_exception</code> and <code>$DEBUG</code> flags are not set 851 * (so the exception has not yet been processed) it will be processed at this 852 * time. 853 * 854 * a = Thread.new { print "a"; sleep(10); print "b"; print "c" } 855 * x = Thread.new { print "x"; Thread.pass; print "y"; print "z" } 856 * x.join # Let x thread finish, a will be killed on exit. 857 * 858 * <em>produces:</em> 859 * 860 * axyz 861 * 862 * The following example illustrates the <i>limit</i> parameter. 863 * 864 * y = Thread.new { 4.times { sleep 0.1; puts 'tick... ' }} 865 * puts "Waiting" until y.join(0.15) 866 * 867 * <em>produces:</em> 868 * 869 * tick... 870 * Waiting 871 * tick... 872 * Waitingtick... 873 * 874 * 875 * tick... 876 */ 877 878static VALUE 879thread_join_m(int argc, VALUE *argv, VALUE self) 880{ 881 rb_thread_t *target_th; 882 double delay = DELAY_INFTY; 883 VALUE limit; 884 885 GetThreadPtr(self, target_th); 886 887 rb_scan_args(argc, argv, "01", &limit); 888 if (!NIL_P(limit)) { 889 delay = rb_num2dbl(limit); 890 } 891 892 return thread_join(target_th, delay); 893} 894 895/* 896 * call-seq: 897 * thr.value -> obj 898 * 899 * Waits for <i>thr</i> to complete (via <code>Thread#join</code>) and returns 900 * its value. 901 * 902 * a = Thread.new { 2 + 2 } 903 * a.value #=> 4 904 */ 905 906static VALUE 907thread_value(VALUE self) 908{ 909 rb_thread_t *th; 910 GetThreadPtr(self, th); 911 thread_join(th, DELAY_INFTY); 912 return th->value; 913} 914 915/* 916 * Thread Scheduling 917 */ 918 919static struct timeval 920double2timeval(double d) 921{ 922 struct timeval time; 923 924 if (isinf(d)) { 925 time.tv_sec = TIMET_MAX; 926 time.tv_usec = 0; 927 return time; 928 } 929 930 time.tv_sec = (int)d; 931 time.tv_usec = (int)((d - (int)d) * 1e6); 932 if (time.tv_usec < 0) { 933 time.tv_usec += (int)1e6; 934 time.tv_sec -= 1; 935 } 936 return time; 937} 938 939static void 940sleep_forever(rb_thread_t *th, int deadlockable, int spurious_check) 941{ 942 enum rb_thread_status prev_status = th->status; 943 enum rb_thread_status status = deadlockable ? THREAD_STOPPED_FOREVER : THREAD_STOPPED; 944 945 th->status = status; 946 RUBY_VM_CHECK_INTS_BLOCKING(th); 947 while (th->status == status) { 948 if (deadlockable) { 949 th->vm->sleeper++; 950 rb_check_deadlock(th->vm); 951 } 952 native_sleep(th, 0); 953 if (deadlockable) { 954 th->vm->sleeper--; 955 } 956 RUBY_VM_CHECK_INTS_BLOCKING(th); 957 if (!spurious_check) 958 break; 959 } 960 th->status = prev_status; 961} 962 963static void 964getclockofday(struct timeval *tp) 965{ 966#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 967 struct timespec ts; 968 969 if (clock_gettime(CLOCK_MONOTONIC, &ts) == 0) { 970 tp->tv_sec = ts.tv_sec; 971 tp->tv_usec = ts.tv_nsec / 1000; 972 } else 973#endif 974 { 975 gettimeofday(tp, NULL); 976 } 977} 978 979static void 980sleep_timeval(rb_thread_t *th, struct timeval tv, int spurious_check) 981{ 982 struct timeval to, tvn; 983 enum rb_thread_status prev_status = th->status; 984 985 getclockofday(&to); 986 if (TIMET_MAX - tv.tv_sec < to.tv_sec) 987 to.tv_sec = TIMET_MAX; 988 else 989 to.tv_sec += tv.tv_sec; 990 if ((to.tv_usec += tv.tv_usec) >= 1000000) { 991 if (to.tv_sec == TIMET_MAX) 992 to.tv_usec = 999999; 993 else { 994 to.tv_sec++; 995 to.tv_usec -= 1000000; 996 } 997 } 998 999 th->status = THREAD_STOPPED; 1000 RUBY_VM_CHECK_INTS_BLOCKING(th); 1001 while (th->status == THREAD_STOPPED) { 1002 native_sleep(th, &tv); 1003 RUBY_VM_CHECK_INTS_BLOCKING(th); 1004 getclockofday(&tvn); 1005 if (to.tv_sec < tvn.tv_sec) break; 1006 if (to.tv_sec == tvn.tv_sec && to.tv_usec <= tvn.tv_usec) break; 1007 thread_debug("sleep_timeval: %ld.%.6ld > %ld.%.6ld\n", 1008 (long)to.tv_sec, (long)to.tv_usec, 1009 (long)tvn.tv_sec, (long)tvn.tv_usec); 1010 tv.tv_sec = to.tv_sec - tvn.tv_sec; 1011 if ((tv.tv_usec = to.tv_usec - tvn.tv_usec) < 0) { 1012 --tv.tv_sec; 1013 tv.tv_usec += 1000000; 1014 } 1015 if (!spurious_check) 1016 break; 1017 } 1018 th->status = prev_status; 1019} 1020 1021void 1022rb_thread_sleep_forever(void) 1023{ 1024 thread_debug("rb_thread_sleep_forever\n"); 1025 sleep_forever(GET_THREAD(), 0, 1); 1026} 1027 1028static void 1029rb_thread_sleep_deadly(void) 1030{ 1031 thread_debug("rb_thread_sleep_deadly\n"); 1032 sleep_forever(GET_THREAD(), 1, 1); 1033} 1034 1035static double 1036timeofday(void) 1037{ 1038#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_MONOTONIC) 1039 struct timespec tp; 1040 1041 if (clock_gettime(CLOCK_MONOTONIC, &tp) == 0) { 1042 return (double)tp.tv_sec + (double)tp.tv_nsec * 1e-9; 1043 } else 1044#endif 1045 { 1046 struct timeval tv; 1047 gettimeofday(&tv, NULL); 1048 return (double)tv.tv_sec + (double)tv.tv_usec * 1e-6; 1049 } 1050} 1051 1052static void 1053sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check) 1054{ 1055 sleep_timeval(th, double2timeval(sleepsec), spurious_check); 1056} 1057 1058static void 1059sleep_for_polling(rb_thread_t *th) 1060{ 1061 struct timeval time; 1062 time.tv_sec = 0; 1063 time.tv_usec = 100 * 1000; /* 0.1 sec */ 1064 sleep_timeval(th, time, 1); 1065} 1066 1067void 1068rb_thread_wait_for(struct timeval time) 1069{ 1070 rb_thread_t *th = GET_THREAD(); 1071 sleep_timeval(th, time, 1); 1072} 1073 1074void 1075rb_thread_polling(void) 1076{ 1077 if (!rb_thread_alone()) { 1078 rb_thread_t *th = GET_THREAD(); 1079 RUBY_VM_CHECK_INTS_BLOCKING(th); 1080 sleep_for_polling(th); 1081 } 1082} 1083 1084/* 1085 * CAUTION: This function causes thread switching. 1086 * rb_thread_check_ints() check ruby's interrupts. 1087 * some interrupt needs thread switching/invoke handlers, 1088 * and so on. 1089 */ 1090 1091void 1092rb_thread_check_ints(void) 1093{ 1094 RUBY_VM_CHECK_INTS_BLOCKING(GET_THREAD()); 1095} 1096 1097/* 1098 * Hidden API for tcl/tk wrapper. 1099 * There is no guarantee to perpetuate it. 1100 */ 1101int 1102rb_thread_check_trap_pending(void) 1103{ 1104 return rb_signal_buff_size() != 0; 1105} 1106 1107/* This function can be called in blocking region. */ 1108int 1109rb_thread_interrupted(VALUE thval) 1110{ 1111 rb_thread_t *th; 1112 GetThreadPtr(thval, th); 1113 return (int)RUBY_VM_INTERRUPTED(th); 1114} 1115 1116void 1117rb_thread_sleep(int sec) 1118{ 1119 rb_thread_wait_for(rb_time_timeval(INT2FIX(sec))); 1120} 1121 1122static void 1123rb_thread_schedule_limits(unsigned long limits_us) 1124{ 1125 thread_debug("rb_thread_schedule\n"); 1126 if (!rb_thread_alone()) { 1127 rb_thread_t *th = GET_THREAD(); 1128 1129 if (th->running_time_us >= limits_us) { 1130 thread_debug("rb_thread_schedule/switch start\n"); 1131 RB_GC_SAVE_MACHINE_CONTEXT(th); 1132 gvl_yield(th->vm, th); 1133 rb_thread_set_current(th); 1134 thread_debug("rb_thread_schedule/switch done\n"); 1135 } 1136 } 1137} 1138 1139void 1140rb_thread_schedule(void) 1141{ 1142 rb_thread_t *cur_th = GET_THREAD(); 1143 rb_thread_schedule_limits(0); 1144 1145 if (UNLIKELY(RUBY_VM_INTERRUPTED_ANY(cur_th))) { 1146 rb_threadptr_execute_interrupts(cur_th, 0); 1147 } 1148} 1149 1150/* blocking region */ 1151 1152static inline int 1153blocking_region_begin(rb_thread_t *th, struct rb_blocking_region_buffer *region, 1154 rb_unblock_function_t *ubf, void *arg, int fail_if_interrupted) 1155{ 1156 region->prev_status = th->status; 1157 if (set_unblock_function(th, ubf, arg, ®ion->oldubf, fail_if_interrupted)) { 1158 th->blocking_region_buffer = region; 1159 th->status = THREAD_STOPPED; 1160 thread_debug("enter blocking region (%p)\n", (void *)th); 1161 RB_GC_SAVE_MACHINE_CONTEXT(th); 1162 gvl_release(th->vm); 1163 return TRUE; 1164 } 1165 else { 1166 return FALSE; 1167 } 1168} 1169 1170static inline void 1171blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region) 1172{ 1173 gvl_acquire(th->vm, th); 1174 rb_thread_set_current(th); 1175 thread_debug("leave blocking region (%p)\n", (void *)th); 1176 remove_signal_thread_list(th); 1177 th->blocking_region_buffer = 0; 1178 reset_unblock_function(th, ®ion->oldubf); 1179 if (th->status == THREAD_STOPPED) { 1180 th->status = region->prev_status; 1181 } 1182} 1183 1184struct rb_blocking_region_buffer * 1185rb_thread_blocking_region_begin(void) 1186{ 1187 rb_thread_t *th = GET_THREAD(); 1188 struct rb_blocking_region_buffer *region = ALLOC(struct rb_blocking_region_buffer); 1189 blocking_region_begin(th, region, ubf_select, th, FALSE); 1190 return region; 1191} 1192 1193void 1194rb_thread_blocking_region_end(struct rb_blocking_region_buffer *region) 1195{ 1196 int saved_errno = errno; 1197 rb_thread_t *th = ruby_thread_from_native(); 1198 blocking_region_end(th, region); 1199 xfree(region); 1200 RUBY_VM_CHECK_INTS_BLOCKING(th); 1201 errno = saved_errno; 1202} 1203 1204static void * 1205call_without_gvl(void *(*func)(void *), void *data1, 1206 rb_unblock_function_t *ubf, void *data2, int fail_if_interrupted) 1207{ 1208 void *val = 0; 1209 1210 rb_thread_t *th = GET_THREAD(); 1211 int saved_errno = 0; 1212 1213 th->waiting_fd = -1; 1214 if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { 1215 ubf = ubf_select; 1216 data2 = th; 1217 } 1218 1219 BLOCKING_REGION({ 1220 val = func(data1); 1221 saved_errno = errno; 1222 }, ubf, data2, fail_if_interrupted); 1223 1224 if (!fail_if_interrupted) { 1225 RUBY_VM_CHECK_INTS_BLOCKING(th); 1226 } 1227 1228 errno = saved_errno; 1229 1230 return val; 1231} 1232 1233/* 1234 * rb_thread_call_without_gvl - permit concurrent/parallel execution. 1235 * rb_thread_call_without_gvl2 - permit concurrent/parallel execution 1236 * without interrupt proceess. 1237 * 1238 * rb_thread_call_without_gvl() does: 1239 * (1) Check interrupts. 1240 * (2) release GVL. 1241 * Other Ruby threads may run in parallel. 1242 * (3) call func with data1 1243 * (4) acquire GVL. 1244 * Other Ruby threads can not run in parallel any more. 1245 * (5) Check interrupts. 1246 * 1247 * rb_thread_call_without_gvl2() does: 1248 * (1) Check interrupt and return if interrupted. 1249 * (2) release GVL. 1250 * (3) call func with data1 and a pointer to the flags. 1251 * (4) acquire GVL. 1252 * 1253 * If another thread interrupts this thread (Thread#kill, signal delivery, 1254 * VM-shutdown request, and so on), `ubf()' is called (`ubf()' means 1255 * "un-blocking function"). `ubf()' should interrupt `func()' execution by 1256 * toggling a cancellation flag, canceling the invocation of a call inside 1257 * `func()' or similar. Note that `ubf()' may not be called with the GVL. 1258 * 1259 * There are built-in ubfs and you can specify these ubfs: 1260 * 1261 * * RUBY_UBF_IO: ubf for IO operation 1262 * * RUBY_UBF_PROCESS: ubf for process operation 1263 * 1264 * However, we can not guarantee our built-in ubfs interrupt your `func()' 1265 * correctly. Be careful to use rb_thread_call_without_gvl(). If you don't 1266 * provide proper ubf(), your program will not stop for Control+C or other 1267 * shutdown events. 1268 * 1269 * "Check interrupts" on above list means that check asynchronous 1270 * interrupt events (such as Thread#kill, signal delivery, VM-shutdown 1271 * request, and so on) and call corresponding procedures 1272 * (such as `trap' for signals, raise an exception for Thread#raise). 1273 * If `func()' finished and receive interrupts, you may skip interrupt 1274 * checking. For example, assume the following func() it read data from file. 1275 * 1276 * read_func(...) { 1277 * // (a) before read 1278 * read(buffer); // (b) reading 1279 * // (c) after read 1280 * } 1281 * 1282 * If an interrupt occurs at (a) or (b), then `ubf()' cancels this 1283 * `read_func()' and interrupts are checked. However, if an interrupt occurs 1284 * at (c), after *read* operation is completed, check intterrupts is harmful 1285 * because it causes irrevocable side-effect, the read data will vanish. To 1286 * avoid such problem, the `read_func()' should be used with 1287 * `rb_thread_call_without_gvl2()'. 1288 * 1289 * If `rb_thread_call_without_gvl2()' detects interrupt, return its execution 1290 * immediately. This function does not show when the execution was interrupted. 1291 * For example, there are 4 possible timing (a), (b), (c) and before calling 1292 * read_func(). You need to record progress of a read_func() and check 1293 * the progress after `rb_thread_call_without_gvl2()'. You may need to call 1294 * `rb_thread_check_ints()' correctly or your program can not process proper 1295 * process such as `trap' and so on. 1296 * 1297 * NOTE: You can not execute most of Ruby C API and touch Ruby 1298 * objects in `func()' and `ubf()', including raising an 1299 * exception, because current thread doesn't acquire GVL 1300 * (it causes synchronization problems). If you need to 1301 * call ruby functions either use rb_thread_call_with_gvl() 1302 * or read source code of C APIs and confirm safety by 1303 * yourself. 1304 * 1305 * NOTE: In short, this API is difficult to use safely. I recommend you 1306 * use other ways if you have. We lack experiences to use this API. 1307 * Please report your problem related on it. 1308 * 1309 * NOTE: Releasing GVL and re-acquiring GVL may be expensive operations 1310 * for a short running `func()'. Be sure to benchmark and use this 1311 * mechanism when `func()' consumes enough time. 1312 * 1313 * Safe C API: 1314 * * rb_thread_interrupted() - check interrupt flag 1315 * * ruby_xmalloc(), ruby_xrealloc(), ruby_xfree() - 1316 * they will work without GVL, and may acquire GVL when GC is needed. 1317 */ 1318void * 1319rb_thread_call_without_gvl2(void *(*func)(void *), void *data1, 1320 rb_unblock_function_t *ubf, void *data2) 1321{ 1322 return call_without_gvl(func, data1, ubf, data2, TRUE); 1323} 1324 1325void * 1326rb_thread_call_without_gvl(void *(*func)(void *data), void *data1, 1327 rb_unblock_function_t *ubf, void *data2) 1328{ 1329 return call_without_gvl(func, data1, ubf, data2, FALSE); 1330} 1331 1332VALUE 1333rb_thread_io_blocking_region(rb_blocking_function_t *func, void *data1, int fd) 1334{ 1335 VALUE val = Qundef; /* shouldn't be used */ 1336 rb_thread_t *th = GET_THREAD(); 1337 int saved_errno = 0; 1338 int state; 1339 1340 th->waiting_fd = fd; 1341 1342 TH_PUSH_TAG(th); 1343 if ((state = EXEC_TAG()) == 0) { 1344 BLOCKING_REGION({ 1345 val = func(data1); 1346 saved_errno = errno; 1347 }, ubf_select, th, FALSE); 1348 } 1349 TH_POP_TAG(); 1350 1351 /* clear waitinf_fd anytime */ 1352 th->waiting_fd = -1; 1353 1354 if (state) { 1355 JUMP_TAG(state); 1356 } 1357 /* TODO: check func() */ 1358 RUBY_VM_CHECK_INTS_BLOCKING(th); 1359 1360 errno = saved_errno; 1361 1362 return val; 1363} 1364 1365VALUE 1366rb_thread_blocking_region( 1367 rb_blocking_function_t *func, void *data1, 1368 rb_unblock_function_t *ubf, void *data2) 1369{ 1370 void *(*f)(void*) = (void *(*)(void*))func; 1371 return (VALUE)rb_thread_call_without_gvl(f, data1, ubf, data2); 1372} 1373 1374/* 1375 * rb_thread_call_with_gvl - re-enter the Ruby world after GVL release. 1376 * 1377 * After releasing GVL using rb_thread_blocking_region() or 1378 * rb_thread_call_without_gvl() you can not access Ruby values or invoke 1379 * methods. If you need to access Ruby you must use this function 1380 * rb_thread_call_with_gvl(). 1381 * 1382 * This function rb_thread_call_with_gvl() does: 1383 * (1) acquire GVL. 1384 * (2) call passed function `func'. 1385 * (3) release GVL. 1386 * (4) return a value which is returned at (2). 1387 * 1388 * NOTE: You should not return Ruby object at (2) because such Object 1389 * will not marked. 1390 * 1391 * NOTE: If an exception is raised in `func', this function DOES NOT 1392 * protect (catch) the exception. If you have any resources 1393 * which should free before throwing exception, you need use 1394 * rb_protect() in `func' and return a value which represents 1395 * exception is raised. 1396 * 1397 * NOTE: This function should not be called by a thread which was not 1398 * created as Ruby thread (created by Thread.new or so). In other 1399 * words, this function *DOES NOT* associate or convert a NON-Ruby 1400 * thread to a Ruby thread. 1401 */ 1402void * 1403rb_thread_call_with_gvl(void *(*func)(void *), void *data1) 1404{ 1405 rb_thread_t *th = ruby_thread_from_native(); 1406 struct rb_blocking_region_buffer *brb; 1407 struct rb_unblock_callback prev_unblock; 1408 void *r; 1409 1410 if (th == 0) { 1411 /* Error is occurred, but we can't use rb_bug() 1412 * because this thread is not Ruby's thread. 1413 * What should we do? 1414 */ 1415 1416 fprintf(stderr, "[BUG] rb_thread_call_with_gvl() is called by non-ruby thread\n"); 1417 exit(EXIT_FAILURE); 1418 } 1419 1420 brb = (struct rb_blocking_region_buffer *)th->blocking_region_buffer; 1421 prev_unblock = th->unblock; 1422 1423 if (brb == 0) { 1424 rb_bug("rb_thread_call_with_gvl: called by a thread which has GVL."); 1425 } 1426 1427 blocking_region_end(th, brb); 1428 /* enter to Ruby world: You can access Ruby values, methods and so on. */ 1429 r = (*func)(data1); 1430 /* leave from Ruby world: You can not access Ruby values, etc. */ 1431 blocking_region_begin(th, brb, prev_unblock.func, prev_unblock.arg, FALSE); 1432 return r; 1433} 1434 1435/* 1436 * ruby_thread_has_gvl_p - check if current native thread has GVL. 1437 * 1438 *** 1439 *** This API is EXPERIMENTAL! 1440 *** We do not guarantee that this API remains in ruby 1.9.2 or later. 1441 *** 1442 */ 1443 1444int 1445ruby_thread_has_gvl_p(void) 1446{ 1447 rb_thread_t *th = ruby_thread_from_native(); 1448 1449 if (th && th->blocking_region_buffer == 0) { 1450 return 1; 1451 } 1452 else { 1453 return 0; 1454 } 1455} 1456 1457/* 1458 * call-seq: 1459 * Thread.pass -> nil 1460 * 1461 * Give the thread scheduler a hint to pass execution to another thread. 1462 * A running thread may or may not switch, it depends on OS and processor. 1463 */ 1464 1465static VALUE 1466thread_s_pass(VALUE klass) 1467{ 1468 rb_thread_schedule(); 1469 return Qnil; 1470} 1471 1472/*****************************************************/ 1473 1474/* 1475 * rb_threadptr_pending_interrupt_* - manage asynchronous error queue 1476 * 1477 * Async events such as an exception throwed by Thread#raise, 1478 * Thread#kill and thread termination (after main thread termination) 1479 * will be queued to th->pending_interrupt_queue. 1480 * - clear: clear the queue. 1481 * - enque: enque err object into queue. 1482 * - deque: deque err object from queue. 1483 * - active_p: return 1 if the queue should be checked. 1484 * 1485 * All rb_threadptr_pending_interrupt_* functions are called by 1486 * a GVL acquired thread, of course. 1487 * Note that all "rb_" prefix APIs need GVL to call. 1488 */ 1489 1490void 1491rb_threadptr_pending_interrupt_clear(rb_thread_t *th) 1492{ 1493 rb_ary_clear(th->pending_interrupt_queue); 1494} 1495 1496void 1497rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v) 1498{ 1499 rb_ary_push(th->pending_interrupt_queue, v); 1500 th->pending_interrupt_queue_checked = 0; 1501} 1502 1503enum handle_interrupt_timing { 1504 INTERRUPT_NONE, 1505 INTERRUPT_IMMEDIATE, 1506 INTERRUPT_ON_BLOCKING, 1507 INTERRUPT_NEVER 1508}; 1509 1510static enum handle_interrupt_timing 1511rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err) 1512{ 1513 VALUE mask; 1514 long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack); 1515 VALUE *mask_stack = RARRAY_PTR(th->pending_interrupt_mask_stack); 1516 VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */ 1517 long ancestors_len = RARRAY_LEN(ancestors); 1518 VALUE *ancestors_ptr = RARRAY_PTR(ancestors); 1519 int i, j; 1520 1521 for (i=0; i<mask_stack_len; i++) { 1522 mask = mask_stack[mask_stack_len-(i+1)]; 1523 1524 for (j=0; j<ancestors_len; j++) { 1525 VALUE klass = ancestors_ptr[j]; 1526 VALUE sym; 1527 1528 /* TODO: remove rb_intern() */ 1529 if ((sym = rb_hash_aref(mask, klass)) != Qnil) { 1530 if (sym == sym_immediate) { 1531 return INTERRUPT_IMMEDIATE; 1532 } 1533 else if (sym == sym_on_blocking) { 1534 return INTERRUPT_ON_BLOCKING; 1535 } 1536 else if (sym == sym_never) { 1537 return INTERRUPT_NEVER; 1538 } 1539 else { 1540 rb_raise(rb_eThreadError, "unknown mask signature"); 1541 } 1542 } 1543 } 1544 /* try to next mask */ 1545 } 1546 return INTERRUPT_NONE; 1547} 1548 1549static int 1550rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th) 1551{ 1552 return RARRAY_LEN(th->pending_interrupt_queue) == 0; 1553} 1554 1555static int 1556rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err) 1557{ 1558 int i; 1559 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) { 1560 VALUE e = RARRAY_PTR(th->pending_interrupt_queue)[i]; 1561 if (rb_class_inherited_p(e, err)) { 1562 return TRUE; 1563 } 1564 } 1565 return FALSE; 1566} 1567 1568static VALUE 1569rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing) 1570{ 1571#if 1 /* 1 to enable Thread#handle_interrupt, 0 to ignore it */ 1572 int i; 1573 1574 for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) { 1575 VALUE err = RARRAY_PTR(th->pending_interrupt_queue)[i]; 1576 1577 enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err)); 1578 1579 switch (mask_timing) { 1580 case INTERRUPT_ON_BLOCKING: 1581 if (timing != INTERRUPT_ON_BLOCKING) { 1582 break; 1583 } 1584 /* fall through */ 1585 case INTERRUPT_NONE: /* default: IMMEDIATE */ 1586 case INTERRUPT_IMMEDIATE: 1587 rb_ary_delete_at(th->pending_interrupt_queue, i); 1588 return err; 1589 case INTERRUPT_NEVER: 1590 break; 1591 } 1592 } 1593 1594 th->pending_interrupt_queue_checked = 1; 1595 return Qundef; 1596#else 1597 VALUE err = rb_ary_shift(th->pending_interrupt_queue); 1598 if (rb_threadptr_pending_interrupt_empty_p(th)) { 1599 th->pending_interrupt_queue_checked = 1; 1600 } 1601 return err; 1602#endif 1603} 1604 1605int 1606rb_threadptr_pending_interrupt_active_p(rb_thread_t *th) 1607{ 1608 /* 1609 * For optimization, we don't check async errinfo queue 1610 * if it nor a thread interrupt mask were not changed 1611 * since last check. 1612 */ 1613 if (th->pending_interrupt_queue_checked) { 1614 return 0; 1615 } 1616 1617 if (rb_threadptr_pending_interrupt_empty_p(th)) { 1618 return 0; 1619 } 1620 1621 return 1; 1622} 1623 1624static int 1625handle_interrupt_arg_check_i(VALUE key, VALUE val) 1626{ 1627 if (val != sym_immediate && val != sym_on_blocking && val != sym_never) { 1628 rb_raise(rb_eArgError, "unknown mask signature"); 1629 } 1630 1631 return ST_CONTINUE; 1632} 1633 1634/* 1635 * call-seq: 1636 * Thread.handle_interrupt(hash) { ... } -> result of the block 1637 * 1638 * Changes asynchronous interrupt timing. 1639 * 1640 * _interrupt_ means asynchronous event and corresponding procedure 1641 * by Thread#raise, Thread#kill, signal trap (not supported yet) 1642 * and main thread termination (if main thread terminates, then all 1643 * other thread will be killed). 1644 * 1645 * The given +hash+ has pairs like <code>ExceptionClass => 1646 * :TimingSymbol</code>. Where the ExceptionClass is the interrupt handled by 1647 * the given block. The TimingSymbol can be one of the following symbols: 1648 * 1649 * [+:immediate+] Invoke interrupts immediately. 1650 * [+:on_blocking+] Invoke interrupts while _BlockingOperation_. 1651 * [+:never+] Never invoke all interrupts. 1652 * 1653 * _BlockingOperation_ means that the operation will block the calling thread, 1654 * such as read and write. On CRuby implementation, _BlockingOperation_ is any 1655 * operation executed without GVL. 1656 * 1657 * Masked asynchronous interrupts are delayed until they are enabled. 1658 * This method is similar to sigprocmask(3). 1659 * 1660 * === NOTE 1661 * 1662 * Asynchronous interrupts are difficult to use. 1663 * 1664 * If you need to communicate between threads, please consider to use another way such as Queue. 1665 * 1666 * Or use them with deep understanding about this method. 1667 * 1668 * === Usage 1669 * 1670 * In this example, we can guard from Thread#raise exceptions. 1671 * 1672 * Using the +:never+ TimingSymbol the RuntimeError exception will always be 1673 * ignored in the first block of the main thread. In the second 1674 * ::handle_interrupt block we can purposefully handle RuntimeError exceptions. 1675 * 1676 * th = Thread.new do 1677 * Thead.handle_interrupt(RuntimeError => :never) { 1678 * begin 1679 * # You can write resource allocation code safely. 1680 * Thread.handle_interrupt(RuntimeError => :immediate) { 1681 * # ... 1682 * } 1683 * ensure 1684 * # You can write resource deallocation code safely. 1685 * end 1686 * } 1687 * end 1688 * Thread.pass 1689 * # ... 1690 * th.raise "stop" 1691 * 1692 * While we are ignoring the RuntimeError exception, it's safe to write our 1693 * resource allocation code. Then, the ensure block is where we can safely 1694 * deallocate your resources. 1695 * 1696 * ==== Guarding from TimeoutError 1697 * 1698 * In the next example, we will guard from the TimeoutError exception. This 1699 * will help prevent from leaking resources when TimeoutError exceptions occur 1700 * during normal ensure clause. For this example we use the help of the 1701 * standard library Timeout, from lib/timeout.rb 1702 * 1703 * require 'timeout' 1704 * Thread.handle_interrupt(TimeoutError => :never) { 1705 * timeout(10){ 1706 * # TimeoutError doesn't occur here 1707 * Thread.handle_interrupt(TimeoutError => :on_blocking) { 1708 * # possible to be killed by TimeoutError 1709 * # while blocking operation 1710 * } 1711 * # TimeoutError doesn't occur here 1712 * } 1713 * } 1714 * 1715 * In the first part of the +timeout+ block, we can rely on TimeoutError being 1716 * ignored. Then in the <code>TimeoutError => :on_blocking</code> block, any 1717 * operation that will block the calling thread is susceptible to a 1718 * TimeoutError exception being raised. 1719 * 1720 * ==== Stack control settings 1721 * 1722 * It's possible to stack multiple levels of ::handle_interrupt blocks in order 1723 * to control more than one ExceptionClass and TimingSymbol at a time. 1724 * 1725 * Thread.handle_interrupt(FooError => :never) { 1726 * Thread.handle_interrupt(BarError => :never) { 1727 * # FooError and BarError are prohibited. 1728 * } 1729 * } 1730 * 1731 * ==== Inheritance with ExceptionClass 1732 * 1733 * All exceptions inherited from the ExceptionClass parameter will be considered. 1734 * 1735 * Thread.handle_interrupt(Exception => :never) { 1736 * # all exceptions inherited from Exception are prohibited. 1737 * } 1738 * 1739 */ 1740static VALUE 1741rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg) 1742{ 1743 VALUE mask; 1744 rb_thread_t *th = GET_THREAD(); 1745 VALUE r = Qnil; 1746 int state; 1747 1748 if (!rb_block_given_p()) { 1749 rb_raise(rb_eArgError, "block is needed."); 1750 } 1751 1752 mask = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash"); 1753 rb_hash_foreach(mask, handle_interrupt_arg_check_i, 0); 1754 rb_ary_push(th->pending_interrupt_mask_stack, mask); 1755 if (!rb_threadptr_pending_interrupt_empty_p(th)) { 1756 th->pending_interrupt_queue_checked = 0; 1757 RUBY_VM_SET_INTERRUPT(th); 1758 } 1759 1760 TH_PUSH_TAG(th); 1761 if ((state = EXEC_TAG()) == 0) { 1762 r = rb_yield(Qnil); 1763 } 1764 TH_POP_TAG(); 1765 1766 rb_ary_pop(th->pending_interrupt_mask_stack); 1767 if (!rb_threadptr_pending_interrupt_empty_p(th)) { 1768 th->pending_interrupt_queue_checked = 0; 1769 RUBY_VM_SET_INTERRUPT(th); 1770 } 1771 1772 RUBY_VM_CHECK_INTS(th); 1773 1774 if (state) { 1775 JUMP_TAG(state); 1776 } 1777 1778 return r; 1779} 1780 1781/* 1782 * call-seq: 1783 * target_thread.pending_interrupt?(error = nil) -> true/false 1784 * 1785 * Returns whether or not the asychronous queue is empty for the target thread. 1786 * 1787 * If +error+ is given, then check only for +error+ type deferred events. 1788 * 1789 * See ::pending_interrupt? for more information. 1790 */ 1791static VALUE 1792rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread) 1793{ 1794 rb_thread_t *target_th; 1795 1796 GetThreadPtr(target_thread, target_th); 1797 1798 if (rb_threadptr_pending_interrupt_empty_p(target_th)) { 1799 return Qfalse; 1800 } 1801 else { 1802 if (argc == 1) { 1803 VALUE err; 1804 rb_scan_args(argc, argv, "01", &err); 1805 if (!rb_obj_is_kind_of(err, rb_cModule)) { 1806 rb_raise(rb_eTypeError, "class or module required for rescue clause"); 1807 } 1808 if (rb_threadptr_pending_interrupt_include_p(target_th, err)) { 1809 return Qtrue; 1810 } 1811 else { 1812 return Qfalse; 1813 } 1814 } 1815 return Qtrue; 1816 } 1817} 1818 1819/* 1820 * call-seq: 1821 * Thread.pending_interrupt?(error = nil) -> true/false 1822 * 1823 * Returns whether or not the asynchronous queue is empty. 1824 * 1825 * Since Thread::handle_interrupt can be used to defer asynchronous events. 1826 * This method can be used to determine if there are any deferred events. 1827 * 1828 * If you find this method returns true, then you may finish +:never+ blocks. 1829 * 1830 * For example, the following method processes deferred asynchronous events 1831 * immediately. 1832 * 1833 * def Thread.kick_interrupt_immediately 1834 * Thread.handle_interrupt(Object => :immediate) { 1835 * Thread.pass 1836 * } 1837 * end 1838 * 1839 * If +error+ is given, then check only for +error+ type deferred events. 1840 * 1841 * === Usage 1842 * 1843 * th = Thread.new{ 1844 * Thread.handle_interrupt(RuntimeError => :on_blocking){ 1845 * while true 1846 * ... 1847 * # reach safe point to invoke interrupt 1848 * if Thread.pending_interrupt? 1849 * Thread.handle_interrupt(Object => :immediate){} 1850 * end 1851 * ... 1852 * end 1853 * } 1854 * } 1855 * ... 1856 * th.raise # stop thread 1857 * 1858 * This example can also be written as the following, which you should use to 1859 * avoid asynchronous interrupts. 1860 * 1861 * flag = true 1862 * th = Thread.new{ 1863 * Thread.handle_interrupt(RuntimeError => :on_blocking){ 1864 * while true 1865 * ... 1866 * # reach safe point to invoke interrupt 1867 * break if flag == false 1868 * ... 1869 * end 1870 * } 1871 * } 1872 * ... 1873 * flag = false # stop thread 1874 */ 1875 1876static VALUE 1877rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self) 1878{ 1879 return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self); 1880} 1881 1882static void 1883rb_threadptr_to_kill(rb_thread_t *th) 1884{ 1885 rb_threadptr_pending_interrupt_clear(th); 1886 th->status = THREAD_RUNNABLE; 1887 th->to_kill = 1; 1888 th->errinfo = INT2FIX(TAG_FATAL); 1889 TH_JUMP_TAG(th, TAG_FATAL); 1890} 1891 1892void 1893rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing) 1894{ 1895 if (th->raised_flag) return; 1896 1897 while (1) { 1898 rb_atomic_t interrupt; 1899 rb_atomic_t old; 1900 int sig; 1901 int timer_interrupt; 1902 int pending_interrupt; 1903 int finalizer_interrupt; 1904 int trap_interrupt; 1905 1906 do { 1907 interrupt = th->interrupt_flag; 1908 old = ATOMIC_CAS(th->interrupt_flag, interrupt, interrupt & th->interrupt_mask); 1909 } while (old != interrupt); 1910 1911 interrupt &= (rb_atomic_t)~th->interrupt_mask; 1912 if (!interrupt) 1913 return; 1914 1915 timer_interrupt = interrupt & TIMER_INTERRUPT_MASK; 1916 pending_interrupt = interrupt & PENDING_INTERRUPT_MASK; 1917 finalizer_interrupt = interrupt & FINALIZER_INTERRUPT_MASK; 1918 trap_interrupt = interrupt & TRAP_INTERRUPT_MASK; 1919 1920 /* signal handling */ 1921 if (trap_interrupt && (th == th->vm->main_thread)) { 1922 enum rb_thread_status prev_status = th->status; 1923 th->status = THREAD_RUNNABLE; 1924 while ((sig = rb_get_next_signal()) != 0) { 1925 rb_signal_exec(th, sig); 1926 } 1927 th->status = prev_status; 1928 } 1929 1930 /* exception from another thread */ 1931 if (pending_interrupt && rb_threadptr_pending_interrupt_active_p(th)) { 1932 VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE); 1933 thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err); 1934 1935 if (err == Qundef) { 1936 /* no error */ 1937 } 1938 else if (err == eKillSignal /* Thread#kill receieved */ || 1939 err == eTerminateSignal /* Terminate thread */ || 1940 err == INT2FIX(TAG_FATAL) /* Thread.exit etc. */ ) { 1941 rb_threadptr_to_kill(th); 1942 } 1943 else { 1944 /* set runnable if th was slept. */ 1945 if (th->status == THREAD_STOPPED || 1946 th->status == THREAD_STOPPED_FOREVER) 1947 th->status = THREAD_RUNNABLE; 1948 rb_exc_raise(err); 1949 } 1950 } 1951 1952 if (finalizer_interrupt) { 1953 rb_gc_finalize_deferred(); 1954 } 1955 1956 if (timer_interrupt) { 1957 unsigned long limits_us = TIME_QUANTUM_USEC; 1958 1959 if (th->priority > 0) 1960 limits_us <<= th->priority; 1961 else 1962 limits_us >>= -th->priority; 1963 1964 if (th->status == THREAD_RUNNABLE) 1965 th->running_time_us += TIME_QUANTUM_USEC; 1966 1967 EXEC_EVENT_HOOK(th, RUBY_EVENT_SWITCH, th->cfp->self, 0, 0, Qundef); 1968 1969 rb_thread_schedule_limits(limits_us); 1970 } 1971 } 1972} 1973 1974void 1975rb_thread_execute_interrupts(VALUE thval) 1976{ 1977 rb_thread_t *th; 1978 GetThreadPtr(thval, th); 1979 rb_threadptr_execute_interrupts(th, 1); 1980} 1981 1982static void 1983rb_threadptr_ready(rb_thread_t *th) 1984{ 1985 rb_threadptr_interrupt(th); 1986} 1987 1988static VALUE 1989rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv) 1990{ 1991 VALUE exc; 1992 1993 if (rb_threadptr_dead(th)) { 1994 return Qnil; 1995 } 1996 1997 if (argc == 0) { 1998 exc = rb_exc_new(rb_eRuntimeError, 0, 0); 1999 } 2000 else { 2001 exc = rb_make_exception(argc, argv); 2002 } 2003 rb_threadptr_pending_interrupt_enque(th, exc); 2004 rb_threadptr_interrupt(th); 2005 return Qnil; 2006} 2007 2008void 2009rb_threadptr_signal_raise(rb_thread_t *th, int sig) 2010{ 2011 VALUE argv[2]; 2012 2013 argv[0] = rb_eSignal; 2014 argv[1] = INT2FIX(sig); 2015 rb_threadptr_raise(th->vm->main_thread, 2, argv); 2016} 2017 2018void 2019rb_threadptr_signal_exit(rb_thread_t *th) 2020{ 2021 VALUE argv[2]; 2022 2023 argv[0] = rb_eSystemExit; 2024 argv[1] = rb_str_new2("exit"); 2025 rb_threadptr_raise(th->vm->main_thread, 2, argv); 2026} 2027 2028#if defined(POSIX_SIGNAL) && defined(SIGSEGV) && defined(HAVE_SIGALTSTACK) 2029#define USE_SIGALTSTACK 2030#endif 2031 2032void 2033ruby_thread_stack_overflow(rb_thread_t *th) 2034{ 2035 th->raised_flag = 0; 2036#ifdef USE_SIGALTSTACK 2037 rb_exc_raise(sysstack_error); 2038#else 2039 th->errinfo = sysstack_error; 2040 TH_JUMP_TAG(th, TAG_RAISE); 2041#endif 2042} 2043 2044int 2045rb_threadptr_set_raised(rb_thread_t *th) 2046{ 2047 if (th->raised_flag & RAISED_EXCEPTION) { 2048 return 1; 2049 } 2050 th->raised_flag |= RAISED_EXCEPTION; 2051 return 0; 2052} 2053 2054int 2055rb_threadptr_reset_raised(rb_thread_t *th) 2056{ 2057 if (!(th->raised_flag & RAISED_EXCEPTION)) { 2058 return 0; 2059 } 2060 th->raised_flag &= ~RAISED_EXCEPTION; 2061 return 1; 2062} 2063 2064static int 2065thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data) 2066{ 2067 int fd = (int)data; 2068 rb_thread_t *th; 2069 GetThreadPtr((VALUE)key, th); 2070 2071 if (th->waiting_fd == fd) { 2072 VALUE err = th->vm->special_exceptions[ruby_error_closed_stream]; 2073 rb_threadptr_pending_interrupt_enque(th, err); 2074 rb_threadptr_interrupt(th); 2075 } 2076 return ST_CONTINUE; 2077} 2078 2079void 2080rb_thread_fd_close(int fd) 2081{ 2082 st_foreach(GET_THREAD()->vm->living_threads, thread_fd_close_i, (st_index_t)fd); 2083} 2084 2085/* 2086 * call-seq: 2087 * thr.raise 2088 * thr.raise(string) 2089 * thr.raise(exception [, string [, array]]) 2090 * 2091 * Raises an exception (see <code>Kernel::raise</code>) from <i>thr</i>. The 2092 * caller does not have to be <i>thr</i>. 2093 * 2094 * Thread.abort_on_exception = true 2095 * a = Thread.new { sleep(200) } 2096 * a.raise("Gotcha") 2097 * 2098 * <em>produces:</em> 2099 * 2100 * prog.rb:3: Gotcha (RuntimeError) 2101 * from prog.rb:2:in `initialize' 2102 * from prog.rb:2:in `new' 2103 * from prog.rb:2 2104 */ 2105 2106static VALUE 2107thread_raise_m(int argc, VALUE *argv, VALUE self) 2108{ 2109 rb_thread_t *target_th; 2110 rb_thread_t *th = GET_THREAD(); 2111 GetThreadPtr(self, target_th); 2112 rb_threadptr_raise(target_th, argc, argv); 2113 2114 /* To perform Thread.current.raise as Kernel.raise */ 2115 if (th == target_th) { 2116 RUBY_VM_CHECK_INTS(th); 2117 } 2118 return Qnil; 2119} 2120 2121 2122/* 2123 * call-seq: 2124 * thr.exit -> thr or nil 2125 * thr.kill -> thr or nil 2126 * thr.terminate -> thr or nil 2127 * 2128 * Terminates <i>thr</i> and schedules another thread to be run. If this thread 2129 * is already marked to be killed, <code>exit</code> returns the 2130 * <code>Thread</code>. If this is the main thread, or the last thread, exits 2131 * the process. 2132 */ 2133 2134VALUE 2135rb_thread_kill(VALUE thread) 2136{ 2137 rb_thread_t *th; 2138 2139 GetThreadPtr(thread, th); 2140 2141 if (th != GET_THREAD() && th->safe_level < 4) { 2142 rb_secure(4); 2143 } 2144 if (th->to_kill || th->status == THREAD_KILLED) { 2145 return thread; 2146 } 2147 if (th == th->vm->main_thread) { 2148 rb_exit(EXIT_SUCCESS); 2149 } 2150 2151 thread_debug("rb_thread_kill: %p (%p)\n", (void *)th, (void *)th->thread_id); 2152 2153 if (th == GET_THREAD()) { 2154 /* kill myself immediately */ 2155 rb_threadptr_to_kill(th); 2156 } 2157 else { 2158 rb_threadptr_pending_interrupt_enque(th, eKillSignal); 2159 rb_threadptr_interrupt(th); 2160 } 2161 return thread; 2162} 2163 2164 2165/* 2166 * call-seq: 2167 * Thread.kill(thread) -> thread 2168 * 2169 * Causes the given <em>thread</em> to exit (see <code>Thread::exit</code>). 2170 * 2171 * count = 0 2172 * a = Thread.new { loop { count += 1 } } 2173 * sleep(0.1) #=> 0 2174 * Thread.kill(a) #=> #<Thread:0x401b3d30 dead> 2175 * count #=> 93947 2176 * a.alive? #=> false 2177 */ 2178 2179static VALUE 2180rb_thread_s_kill(VALUE obj, VALUE th) 2181{ 2182 return rb_thread_kill(th); 2183} 2184 2185 2186/* 2187 * call-seq: 2188 * Thread.exit -> thread 2189 * 2190 * Terminates the currently running thread and schedules another thread to be 2191 * run. If this thread is already marked to be killed, <code>exit</code> 2192 * returns the <code>Thread</code>. If this is the main thread, or the last 2193 * thread, exit the process. 2194 */ 2195 2196static VALUE 2197rb_thread_exit(void) 2198{ 2199 rb_thread_t *th = GET_THREAD(); 2200 return rb_thread_kill(th->self); 2201} 2202 2203 2204/* 2205 * call-seq: 2206 * thr.wakeup -> thr 2207 * 2208 * Marks <i>thr</i> as eligible for scheduling (it may still remain blocked on 2209 * I/O, however). Does not invoke the scheduler (see <code>Thread#run</code>). 2210 * 2211 * c = Thread.new { Thread.stop; puts "hey!" } 2212 * sleep 0.1 while c.status!='sleep' 2213 * c.wakeup 2214 * c.join 2215 * 2216 * <em>produces:</em> 2217 * 2218 * hey! 2219 */ 2220 2221VALUE 2222rb_thread_wakeup(VALUE thread) 2223{ 2224 if (!RTEST(rb_thread_wakeup_alive(thread))) { 2225 rb_raise(rb_eThreadError, "killed thread"); 2226 } 2227 return thread; 2228} 2229 2230VALUE 2231rb_thread_wakeup_alive(VALUE thread) 2232{ 2233 rb_thread_t *th; 2234 GetThreadPtr(thread, th); 2235 2236 if (th->status == THREAD_KILLED) { 2237 return Qnil; 2238 } 2239 rb_threadptr_ready(th); 2240 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) 2241 th->status = THREAD_RUNNABLE; 2242 return thread; 2243} 2244 2245 2246/* 2247 * call-seq: 2248 * thr.run -> thr 2249 * 2250 * Wakes up <i>thr</i>, making it eligible for scheduling. 2251 * 2252 * a = Thread.new { puts "a"; Thread.stop; puts "c" } 2253 * sleep 0.1 while a.status!='sleep' 2254 * puts "Got here" 2255 * a.run 2256 * a.join 2257 * 2258 * <em>produces:</em> 2259 * 2260 * a 2261 * Got here 2262 * c 2263 */ 2264 2265VALUE 2266rb_thread_run(VALUE thread) 2267{ 2268 rb_thread_wakeup(thread); 2269 rb_thread_schedule(); 2270 return thread; 2271} 2272 2273 2274/* 2275 * call-seq: 2276 * Thread.stop -> nil 2277 * 2278 * Stops execution of the current thread, putting it into a ``sleep'' state, 2279 * and schedules execution of another thread. 2280 * 2281 * a = Thread.new { print "a"; Thread.stop; print "c" } 2282 * sleep 0.1 while a.status!='sleep' 2283 * print "b" 2284 * a.run 2285 * a.join 2286 * 2287 * <em>produces:</em> 2288 * 2289 * abc 2290 */ 2291 2292VALUE 2293rb_thread_stop(void) 2294{ 2295 if (rb_thread_alone()) { 2296 rb_raise(rb_eThreadError, 2297 "stopping only thread\n\tnote: use sleep to stop forever"); 2298 } 2299 rb_thread_sleep_deadly(); 2300 return Qnil; 2301} 2302 2303static int 2304thread_list_i(st_data_t key, st_data_t val, void *data) 2305{ 2306 VALUE ary = (VALUE)data; 2307 rb_thread_t *th; 2308 GetThreadPtr((VALUE)key, th); 2309 2310 switch (th->status) { 2311 case THREAD_RUNNABLE: 2312 case THREAD_STOPPED: 2313 case THREAD_STOPPED_FOREVER: 2314 rb_ary_push(ary, th->self); 2315 default: 2316 break; 2317 } 2318 return ST_CONTINUE; 2319} 2320 2321/********************************************************************/ 2322 2323/* 2324 * call-seq: 2325 * Thread.list -> array 2326 * 2327 * Returns an array of <code>Thread</code> objects for all threads that are 2328 * either runnable or stopped. 2329 * 2330 * Thread.new { sleep(200) } 2331 * Thread.new { 1000000.times {|i| i*i } } 2332 * Thread.new { Thread.stop } 2333 * Thread.list.each {|t| p t} 2334 * 2335 * <em>produces:</em> 2336 * 2337 * #<Thread:0x401b3e84 sleep> 2338 * #<Thread:0x401b3f38 run> 2339 * #<Thread:0x401b3fb0 sleep> 2340 * #<Thread:0x401bdf4c run> 2341 */ 2342 2343VALUE 2344rb_thread_list(void) 2345{ 2346 VALUE ary = rb_ary_new(); 2347 st_foreach(GET_THREAD()->vm->living_threads, thread_list_i, ary); 2348 return ary; 2349} 2350 2351VALUE 2352rb_thread_current(void) 2353{ 2354 return GET_THREAD()->self; 2355} 2356 2357/* 2358 * call-seq: 2359 * Thread.current -> thread 2360 * 2361 * Returns the currently executing thread. 2362 * 2363 * Thread.current #=> #<Thread:0x401bdf4c run> 2364 */ 2365 2366static VALUE 2367thread_s_current(VALUE klass) 2368{ 2369 return rb_thread_current(); 2370} 2371 2372VALUE 2373rb_thread_main(void) 2374{ 2375 return GET_THREAD()->vm->main_thread->self; 2376} 2377 2378/* 2379 * call-seq: 2380 * Thread.main -> thread 2381 * 2382 * Returns the main thread. 2383 */ 2384 2385static VALUE 2386rb_thread_s_main(VALUE klass) 2387{ 2388 return rb_thread_main(); 2389} 2390 2391 2392/* 2393 * call-seq: 2394 * Thread.abort_on_exception -> true or false 2395 * 2396 * Returns the status of the global ``abort on exception'' condition. The 2397 * default is <code>false</code>. When set to <code>true</code>, or if the 2398 * global <code>$DEBUG</code> flag is <code>true</code> (perhaps because the 2399 * command line option <code>-d</code> was specified) all threads will abort 2400 * (the process will <code>exit(0)</code>) if an exception is raised in any 2401 * thread. See also <code>Thread::abort_on_exception=</code>. 2402 */ 2403 2404static VALUE 2405rb_thread_s_abort_exc(void) 2406{ 2407 return GET_THREAD()->vm->thread_abort_on_exception ? Qtrue : Qfalse; 2408} 2409 2410 2411/* 2412 * call-seq: 2413 * Thread.abort_on_exception= boolean -> true or false 2414 * 2415 * When set to <code>true</code>, all threads will abort if an exception is 2416 * raised. Returns the new state. 2417 * 2418 * Thread.abort_on_exception = true 2419 * t1 = Thread.new do 2420 * puts "In new thread" 2421 * raise "Exception from thread" 2422 * end 2423 * sleep(1) 2424 * puts "not reached" 2425 * 2426 * <em>produces:</em> 2427 * 2428 * In new thread 2429 * prog.rb:4: Exception from thread (RuntimeError) 2430 * from prog.rb:2:in `initialize' 2431 * from prog.rb:2:in `new' 2432 * from prog.rb:2 2433 */ 2434 2435static VALUE 2436rb_thread_s_abort_exc_set(VALUE self, VALUE val) 2437{ 2438 rb_secure(4); 2439 GET_THREAD()->vm->thread_abort_on_exception = RTEST(val); 2440 return val; 2441} 2442 2443 2444/* 2445 * call-seq: 2446 * thr.abort_on_exception -> true or false 2447 * 2448 * Returns the status of the thread-local ``abort on exception'' condition for 2449 * <i>thr</i>. The default is <code>false</code>. See also 2450 * <code>Thread::abort_on_exception=</code>. 2451 */ 2452 2453static VALUE 2454rb_thread_abort_exc(VALUE thread) 2455{ 2456 rb_thread_t *th; 2457 GetThreadPtr(thread, th); 2458 return th->abort_on_exception ? Qtrue : Qfalse; 2459} 2460 2461 2462/* 2463 * call-seq: 2464 * thr.abort_on_exception= boolean -> true or false 2465 * 2466 * When set to <code>true</code>, causes all threads (including the main 2467 * program) to abort if an exception is raised in <i>thr</i>. The process will 2468 * effectively <code>exit(0)</code>. 2469 */ 2470 2471static VALUE 2472rb_thread_abort_exc_set(VALUE thread, VALUE val) 2473{ 2474 rb_thread_t *th; 2475 rb_secure(4); 2476 2477 GetThreadPtr(thread, th); 2478 th->abort_on_exception = RTEST(val); 2479 return val; 2480} 2481 2482 2483/* 2484 * call-seq: 2485 * thr.group -> thgrp or nil 2486 * 2487 * Returns the <code>ThreadGroup</code> which contains <i>thr</i>, or nil if 2488 * the thread is not a member of any group. 2489 * 2490 * Thread.main.group #=> #<ThreadGroup:0x4029d914> 2491 */ 2492 2493VALUE 2494rb_thread_group(VALUE thread) 2495{ 2496 rb_thread_t *th; 2497 VALUE group; 2498 GetThreadPtr(thread, th); 2499 group = th->thgroup; 2500 2501 if (!group) { 2502 group = Qnil; 2503 } 2504 return group; 2505} 2506 2507static const char * 2508thread_status_name(rb_thread_t *th) 2509{ 2510 switch (th->status) { 2511 case THREAD_RUNNABLE: 2512 if (th->to_kill) 2513 return "aborting"; 2514 else 2515 return "run"; 2516 case THREAD_STOPPED: 2517 case THREAD_STOPPED_FOREVER: 2518 return "sleep"; 2519 case THREAD_KILLED: 2520 return "dead"; 2521 default: 2522 return "unknown"; 2523 } 2524} 2525 2526static int 2527rb_threadptr_dead(rb_thread_t *th) 2528{ 2529 return th->status == THREAD_KILLED; 2530} 2531 2532 2533/* 2534 * call-seq: 2535 * thr.status -> string, false or nil 2536 * 2537 * Returns the status of <i>thr</i>: ``<code>sleep</code>'' if <i>thr</i> is 2538 * sleeping or waiting on I/O, ``<code>run</code>'' if <i>thr</i> is executing, 2539 * ``<code>aborting</code>'' if <i>thr</i> is aborting, <code>false</code> if 2540 * <i>thr</i> terminated normally, and <code>nil</code> if <i>thr</i> 2541 * terminated with an exception. 2542 * 2543 * a = Thread.new { raise("die now") } 2544 * b = Thread.new { Thread.stop } 2545 * c = Thread.new { Thread.exit } 2546 * d = Thread.new { sleep } 2547 * d.kill #=> #<Thread:0x401b3678 aborting> 2548 * a.status #=> nil 2549 * b.status #=> "sleep" 2550 * c.status #=> false 2551 * d.status #=> "aborting" 2552 * Thread.current.status #=> "run" 2553 */ 2554 2555static VALUE 2556rb_thread_status(VALUE thread) 2557{ 2558 rb_thread_t *th; 2559 GetThreadPtr(thread, th); 2560 2561 if (rb_threadptr_dead(th)) { 2562 if (!NIL_P(th->errinfo) && !FIXNUM_P(th->errinfo) 2563 /* TODO */ ) { 2564 return Qnil; 2565 } 2566 return Qfalse; 2567 } 2568 return rb_str_new2(thread_status_name(th)); 2569} 2570 2571 2572/* 2573 * call-seq: 2574 * thr.alive? -> true or false 2575 * 2576 * Returns <code>true</code> if <i>thr</i> is running or sleeping. 2577 * 2578 * thr = Thread.new { } 2579 * thr.join #=> #<Thread:0x401b3fb0 dead> 2580 * Thread.current.alive? #=> true 2581 * thr.alive? #=> false 2582 */ 2583 2584static VALUE 2585rb_thread_alive_p(VALUE thread) 2586{ 2587 rb_thread_t *th; 2588 GetThreadPtr(thread, th); 2589 2590 if (rb_threadptr_dead(th)) 2591 return Qfalse; 2592 return Qtrue; 2593} 2594 2595/* 2596 * call-seq: 2597 * thr.stop? -> true or false 2598 * 2599 * Returns <code>true</code> if <i>thr</i> is dead or sleeping. 2600 * 2601 * a = Thread.new { Thread.stop } 2602 * b = Thread.current 2603 * a.stop? #=> true 2604 * b.stop? #=> false 2605 */ 2606 2607static VALUE 2608rb_thread_stop_p(VALUE thread) 2609{ 2610 rb_thread_t *th; 2611 GetThreadPtr(thread, th); 2612 2613 if (rb_threadptr_dead(th)) 2614 return Qtrue; 2615 if (th->status == THREAD_STOPPED || th->status == THREAD_STOPPED_FOREVER) 2616 return Qtrue; 2617 return Qfalse; 2618} 2619 2620/* 2621 * call-seq: 2622 * thr.safe_level -> integer 2623 * 2624 * Returns the safe level in effect for <i>thr</i>. Setting thread-local safe 2625 * levels can help when implementing sandboxes which run insecure code. 2626 * 2627 * thr = Thread.new { $SAFE = 3; sleep } 2628 * Thread.current.safe_level #=> 0 2629 * thr.safe_level #=> 3 2630 */ 2631 2632static VALUE 2633rb_thread_safe_level(VALUE thread) 2634{ 2635 rb_thread_t *th; 2636 GetThreadPtr(thread, th); 2637 2638 return INT2NUM(th->safe_level); 2639} 2640 2641/* 2642 * call-seq: 2643 * thr.inspect -> string 2644 * 2645 * Dump the name, id, and status of _thr_ to a string. 2646 */ 2647 2648static VALUE 2649rb_thread_inspect(VALUE thread) 2650{ 2651 const char *cname = rb_obj_classname(thread); 2652 rb_thread_t *th; 2653 const char *status; 2654 VALUE str; 2655 2656 GetThreadPtr(thread, th); 2657 status = thread_status_name(th); 2658 str = rb_sprintf("#<%s:%p %s>", cname, (void *)thread, status); 2659 OBJ_INFECT(str, thread); 2660 2661 return str; 2662} 2663 2664VALUE 2665rb_thread_local_aref(VALUE thread, ID id) 2666{ 2667 rb_thread_t *th; 2668 st_data_t val; 2669 2670 GetThreadPtr(thread, th); 2671 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 2672 rb_raise(rb_eSecurityError, "Insecure: thread locals"); 2673 } 2674 if (!th->local_storage) { 2675 return Qnil; 2676 } 2677 if (st_lookup(th->local_storage, id, &val)) { 2678 return (VALUE)val; 2679 } 2680 return Qnil; 2681} 2682 2683/* 2684 * call-seq: 2685 * thr[sym] -> obj or nil 2686 * 2687 * Attribute Reference---Returns the value of a fiber-local variable (current thread's root fiber 2688 * if not explicitely inside a Fiber), using either a symbol or a string name. 2689 * If the specified variable does not exist, returns <code>nil</code>. 2690 * 2691 * [ 2692 * Thread.new { Thread.current["name"] = "A" }, 2693 * Thread.new { Thread.current[:name] = "B" }, 2694 * Thread.new { Thread.current["name"] = "C" } 2695 * ].each do |th| 2696 * th.join 2697 * puts "#{th.inspect}: #{th[:name]}" 2698 * end 2699 * 2700 * <em>produces:</em> 2701 * 2702 * #<Thread:0x00000002a54220 dead>: A 2703 * #<Thread:0x00000002a541a8 dead>: B 2704 * #<Thread:0x00000002a54130 dead>: C 2705 * 2706 * Thread#[] and Thread#[]= are not thread-local but fiber-local. 2707 * This confusion did not exist in Ruby 1.8 because 2708 * fibers were only available since Ruby 1.9. 2709 * Ruby 1.9 chooses that the methods behaves fiber-local to save 2710 * following idiom for dynamic scope. 2711 * 2712 * def meth(newvalue) 2713 * begin 2714 * oldvalue = Thread.current[:name] 2715 * Thread.current[:name] = newvalue 2716 * yield 2717 * ensure 2718 * Thread.current[:name] = oldvalue 2719 * end 2720 * end 2721 * 2722 * The idiom may not work as dynamic scope if the methods are thread-local 2723 * and a given block switches fiber. 2724 * 2725 * f = Fiber.new { 2726 * meth(1) { 2727 * Fiber.yield 2728 * } 2729 * } 2730 * meth(2) { 2731 * f.resume 2732 * } 2733 * f.resume 2734 * p Thread.current[:name] 2735 * #=> nil if fiber-local 2736 * #=> 2 if thread-local (The value 2 is leaked to outside of meth method.) 2737 * 2738 * For thread-local variables, please see <code>Thread#thread_local_get</code> 2739 * and <code>Thread#thread_local_set</code>. 2740 * 2741 */ 2742 2743static VALUE 2744rb_thread_aref(VALUE thread, VALUE id) 2745{ 2746 return rb_thread_local_aref(thread, rb_to_id(id)); 2747} 2748 2749VALUE 2750rb_thread_local_aset(VALUE thread, ID id, VALUE val) 2751{ 2752 rb_thread_t *th; 2753 GetThreadPtr(thread, th); 2754 2755 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 2756 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 2757 } 2758 if (OBJ_FROZEN(thread)) { 2759 rb_error_frozen("thread locals"); 2760 } 2761 if (!th->local_storage) { 2762 th->local_storage = st_init_numtable(); 2763 } 2764 if (NIL_P(val)) { 2765 st_delete_wrap(th->local_storage, id); 2766 return Qnil; 2767 } 2768 st_insert(th->local_storage, id, val); 2769 return val; 2770} 2771 2772/* 2773 * call-seq: 2774 * thr[sym] = obj -> obj 2775 * 2776 * Attribute Assignment---Sets or creates the value of a fiber-local variable, 2777 * using either a symbol or a string. See also <code>Thread#[]</code>. For 2778 * thread-local variables, please see <code>Thread#thread_variable_set</code> 2779 * and <code>Thread#thread_variable_get</code>. 2780 */ 2781 2782static VALUE 2783rb_thread_aset(VALUE self, VALUE id, VALUE val) 2784{ 2785 return rb_thread_local_aset(self, rb_to_id(id), val); 2786} 2787 2788/* 2789 * call-seq: 2790 * thr.thread_variable_get(key) -> obj or nil 2791 * 2792 * Returns the value of a thread local variable that has been set. Note that 2793 * these are different than fiber local values. For fiber local values, 2794 * please see Thread#[] and Thread#[]=. 2795 * 2796 * Thread local values are carried along with threads, and do not respect 2797 * fibers. For example: 2798 * 2799 * Thread.new { 2800 * Thread.current.thread_variable_set("foo", "bar") # set a thread local 2801 * Thread.current["foo"] = "bar" # set a fiber local 2802 * 2803 * Fiber.new { 2804 * Fiber.yield [ 2805 * Thread.current.thread_variable_get("foo"), # get the thread local 2806 * Thread.current["foo"], # get the fiber local 2807 * ] 2808 * }.resume 2809 * }.join.value # => ['bar', nil] 2810 * 2811 * The value "bar" is returned for the thread local, where nil is returned 2812 * for the fiber local. The fiber is executed in the same thread, so the 2813 * thread local values are available. 2814 * 2815 * See also Thread#[] 2816 */ 2817 2818static VALUE 2819rb_thread_variable_get(VALUE thread, VALUE id) 2820{ 2821 VALUE locals; 2822 rb_thread_t *th; 2823 2824 GetThreadPtr(thread, th); 2825 2826 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 2827 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 2828 } 2829 2830 locals = rb_iv_get(thread, "locals"); 2831 return rb_hash_aref(locals, ID2SYM(rb_to_id(id))); 2832} 2833 2834/* 2835 * call-seq: 2836 * thr.thread_variable_set(key, value) 2837 * 2838 * Sets a thread local with +key+ to +value+. Note that these are local to 2839 * threads, and not to fibers. Please see Thread#thread_variable_get and 2840 * Thread#[] for more information. 2841 */ 2842 2843static VALUE 2844rb_thread_variable_set(VALUE thread, VALUE id, VALUE val) 2845{ 2846 VALUE locals; 2847 rb_thread_t *th; 2848 2849 GetThreadPtr(thread, th); 2850 2851 if (rb_safe_level() >= 4 && th != GET_THREAD()) { 2852 rb_raise(rb_eSecurityError, "Insecure: can't modify thread locals"); 2853 } 2854 if (OBJ_FROZEN(thread)) { 2855 rb_error_frozen("thread locals"); 2856 } 2857 2858 locals = rb_iv_get(thread, "locals"); 2859 return rb_hash_aset(locals, ID2SYM(rb_to_id(id)), val); 2860} 2861 2862/* 2863 * call-seq: 2864 * thr.key?(sym) -> true or false 2865 * 2866 * Returns <code>true</code> if the given string (or symbol) exists as a 2867 * fiber-local variable. 2868 * 2869 * me = Thread.current 2870 * me[:oliver] = "a" 2871 * me.key?(:oliver) #=> true 2872 * me.key?(:stanley) #=> false 2873 */ 2874 2875static VALUE 2876rb_thread_key_p(VALUE self, VALUE key) 2877{ 2878 rb_thread_t *th; 2879 ID id = rb_to_id(key); 2880 2881 GetThreadPtr(self, th); 2882 2883 if (!th->local_storage) { 2884 return Qfalse; 2885 } 2886 if (st_lookup(th->local_storage, id, 0)) { 2887 return Qtrue; 2888 } 2889 return Qfalse; 2890} 2891 2892static int 2893thread_keys_i(ID key, VALUE value, VALUE ary) 2894{ 2895 rb_ary_push(ary, ID2SYM(key)); 2896 return ST_CONTINUE; 2897} 2898 2899static int 2900vm_living_thread_num(rb_vm_t *vm) 2901{ 2902 return (int)vm->living_threads->num_entries; 2903} 2904 2905int 2906rb_thread_alone(void) 2907{ 2908 int num = 1; 2909 if (GET_THREAD()->vm->living_threads) { 2910 num = vm_living_thread_num(GET_THREAD()->vm); 2911 thread_debug("rb_thread_alone: %d\n", num); 2912 } 2913 return num == 1; 2914} 2915 2916/* 2917 * call-seq: 2918 * thr.keys -> array 2919 * 2920 * Returns an an array of the names of the fiber-local variables (as Symbols). 2921 * 2922 * thr = Thread.new do 2923 * Thread.current[:cat] = 'meow' 2924 * Thread.current["dog"] = 'woof' 2925 * end 2926 * thr.join #=> #<Thread:0x401b3f10 dead> 2927 * thr.keys #=> [:dog, :cat] 2928 */ 2929 2930static VALUE 2931rb_thread_keys(VALUE self) 2932{ 2933 rb_thread_t *th; 2934 VALUE ary = rb_ary_new(); 2935 GetThreadPtr(self, th); 2936 2937 if (th->local_storage) { 2938 st_foreach(th->local_storage, thread_keys_i, ary); 2939 } 2940 return ary; 2941} 2942 2943static int 2944keys_i(VALUE key, VALUE value, VALUE ary) 2945{ 2946 rb_ary_push(ary, key); 2947 return ST_CONTINUE; 2948} 2949 2950/* 2951 * call-seq: 2952 * thr.thread_variables -> array 2953 * 2954 * Returns an an array of the names of the thread-local variables (as Symbols). 2955 * 2956 * thr = Thread.new do 2957 * Thread.current.thread_variable_set(:cat, 'meow') 2958 * Thread.current.thread_variable_set("dog", 'woof') 2959 * end 2960 * thr.join #=> #<Thread:0x401b3f10 dead> 2961 * thr.thread_variables #=> [:dog, :cat] 2962 * 2963 * Note that these are not fiber local variables. Please see Thread#[] and 2964 * Thread#thread_variable_get for more details. 2965 */ 2966 2967static VALUE 2968rb_thread_variables(VALUE thread) 2969{ 2970 VALUE locals; 2971 VALUE ary; 2972 2973 locals = rb_iv_get(thread, "locals"); 2974 ary = rb_ary_new(); 2975 rb_hash_foreach(locals, keys_i, ary); 2976 2977 return ary; 2978} 2979 2980/* 2981 * call-seq: 2982 * thr.thread_variable?(key) -> true or false 2983 * 2984 * Returns <code>true</code> if the given string (or symbol) exists as a 2985 * thread-local variable. 2986 * 2987 * me = Thread.current 2988 * me.thread_variable_set(:oliver, "a") 2989 * me.thread_variable?(:oliver) #=> true 2990 * me.thread_variable?(:stanley) #=> false 2991 * 2992 * Note that these are not fiber local variables. Please see Thread#[] and 2993 * Thread#thread_variable_get for more details. 2994 */ 2995 2996static VALUE 2997rb_thread_variable_p(VALUE thread, VALUE key) 2998{ 2999 VALUE locals; 3000 3001 locals = rb_iv_get(thread, "locals"); 3002 3003 if (!RHASH(locals)->ntbl) 3004 return Qfalse; 3005 3006 if (st_lookup(RHASH(locals)->ntbl, ID2SYM(rb_to_id(key)), 0)) { 3007 return Qtrue; 3008 } 3009 3010 return Qfalse; 3011} 3012 3013/* 3014 * call-seq: 3015 * thr.priority -> integer 3016 * 3017 * Returns the priority of <i>thr</i>. Default is inherited from the 3018 * current thread which creating the new thread, or zero for the 3019 * initial main thread; higher-priority thread will run more frequently 3020 * than lower-priority threads (but lower-priority threads can also run). 3021 * 3022 * This is just hint for Ruby thread scheduler. It may be ignored on some 3023 * platform. 3024 * 3025 * Thread.current.priority #=> 0 3026 */ 3027 3028static VALUE 3029rb_thread_priority(VALUE thread) 3030{ 3031 rb_thread_t *th; 3032 GetThreadPtr(thread, th); 3033 return INT2NUM(th->priority); 3034} 3035 3036 3037/* 3038 * call-seq: 3039 * thr.priority= integer -> thr 3040 * 3041 * Sets the priority of <i>thr</i> to <i>integer</i>. Higher-priority threads 3042 * will run more frequently than lower-priority threads (but lower-priority 3043 * threads can also run). 3044 * 3045 * This is just hint for Ruby thread scheduler. It may be ignored on some 3046 * platform. 3047 * 3048 * count1 = count2 = 0 3049 * a = Thread.new do 3050 * loop { count1 += 1 } 3051 * end 3052 * a.priority = -1 3053 * 3054 * b = Thread.new do 3055 * loop { count2 += 1 } 3056 * end 3057 * b.priority = -2 3058 * sleep 1 #=> 1 3059 * count1 #=> 622504 3060 * count2 #=> 5832 3061 */ 3062 3063static VALUE 3064rb_thread_priority_set(VALUE thread, VALUE prio) 3065{ 3066 rb_thread_t *th; 3067 int priority; 3068 GetThreadPtr(thread, th); 3069 3070 rb_secure(4); 3071 3072#if USE_NATIVE_THREAD_PRIORITY 3073 th->priority = NUM2INT(prio); 3074 native_thread_apply_priority(th); 3075#else 3076 priority = NUM2INT(prio); 3077 if (priority > RUBY_THREAD_PRIORITY_MAX) { 3078 priority = RUBY_THREAD_PRIORITY_MAX; 3079 } 3080 else if (priority < RUBY_THREAD_PRIORITY_MIN) { 3081 priority = RUBY_THREAD_PRIORITY_MIN; 3082 } 3083 th->priority = priority; 3084#endif 3085 return INT2NUM(th->priority); 3086} 3087 3088/* for IO */ 3089 3090#if defined(NFDBITS) && defined(HAVE_RB_FD_INIT) 3091 3092/* 3093 * several Unix platforms support file descriptors bigger than FD_SETSIZE 3094 * in select(2) system call. 3095 * 3096 * - Linux 2.2.12 (?) 3097 * - NetBSD 1.2 (src/sys/kern/sys_generic.c:1.25) 3098 * select(2) documents how to allocate fd_set dynamically. 3099 * http://netbsd.gw.com/cgi-bin/man-cgi?select++NetBSD-4.0 3100 * - FreeBSD 2.2 (src/sys/kern/sys_generic.c:1.19) 3101 * - OpenBSD 2.0 (src/sys/kern/sys_generic.c:1.4) 3102 * select(2) documents how to allocate fd_set dynamically. 3103 * http://www.openbsd.org/cgi-bin/man.cgi?query=select&manpath=OpenBSD+4.4 3104 * - HP-UX documents how to allocate fd_set dynamically. 3105 * http://docs.hp.com/en/B2355-60105/select.2.html 3106 * - Solaris 8 has select_large_fdset 3107 * - Mac OS X 10.7 (Lion) 3108 * select(2) returns EINVAL if nfds is greater than FD_SET_SIZE and 3109 * _DARWIN_UNLIMITED_SELECT (or _DARWIN_C_SOURCE) isn't defined. 3110 * http://developer.apple.com/library/mac/#releasenotes/Darwin/SymbolVariantsRelNotes/_index.html 3111 * 3112 * When fd_set is not big enough to hold big file descriptors, 3113 * it should be allocated dynamically. 3114 * Note that this assumes fd_set is structured as bitmap. 3115 * 3116 * rb_fd_init allocates the memory. 3117 * rb_fd_term free the memory. 3118 * rb_fd_set may re-allocates bitmap. 3119 * 3120 * So rb_fd_set doesn't reject file descriptors bigger than FD_SETSIZE. 3121 */ 3122 3123void 3124rb_fd_init(rb_fdset_t *fds) 3125{ 3126 fds->maxfd = 0; 3127 fds->fdset = ALLOC(fd_set); 3128 FD_ZERO(fds->fdset); 3129} 3130 3131void 3132rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 3133{ 3134 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 3135 3136 if (size < sizeof(fd_set)) 3137 size = sizeof(fd_set); 3138 dst->maxfd = src->maxfd; 3139 dst->fdset = xmalloc(size); 3140 memcpy(dst->fdset, src->fdset, size); 3141} 3142 3143void 3144rb_fd_term(rb_fdset_t *fds) 3145{ 3146 if (fds->fdset) xfree(fds->fdset); 3147 fds->maxfd = 0; 3148 fds->fdset = 0; 3149} 3150 3151void 3152rb_fd_zero(rb_fdset_t *fds) 3153{ 3154 if (fds->fdset) 3155 MEMZERO(fds->fdset, fd_mask, howmany(fds->maxfd, NFDBITS)); 3156} 3157 3158static void 3159rb_fd_resize(int n, rb_fdset_t *fds) 3160{ 3161 size_t m = howmany(n + 1, NFDBITS) * sizeof(fd_mask); 3162 size_t o = howmany(fds->maxfd, NFDBITS) * sizeof(fd_mask); 3163 3164 if (m < sizeof(fd_set)) m = sizeof(fd_set); 3165 if (o < sizeof(fd_set)) o = sizeof(fd_set); 3166 3167 if (m > o) { 3168 fds->fdset = xrealloc(fds->fdset, m); 3169 memset((char *)fds->fdset + o, 0, m - o); 3170 } 3171 if (n >= fds->maxfd) fds->maxfd = n + 1; 3172} 3173 3174void 3175rb_fd_set(int n, rb_fdset_t *fds) 3176{ 3177 rb_fd_resize(n, fds); 3178 FD_SET(n, fds->fdset); 3179} 3180 3181void 3182rb_fd_clr(int n, rb_fdset_t *fds) 3183{ 3184 if (n >= fds->maxfd) return; 3185 FD_CLR(n, fds->fdset); 3186} 3187 3188int 3189rb_fd_isset(int n, const rb_fdset_t *fds) 3190{ 3191 if (n >= fds->maxfd) return 0; 3192 return FD_ISSET(n, fds->fdset) != 0; /* "!= 0" avoids FreeBSD PR 91421 */ 3193} 3194 3195void 3196rb_fd_copy(rb_fdset_t *dst, const fd_set *src, int max) 3197{ 3198 size_t size = howmany(max, NFDBITS) * sizeof(fd_mask); 3199 3200 if (size < sizeof(fd_set)) size = sizeof(fd_set); 3201 dst->maxfd = max; 3202 dst->fdset = xrealloc(dst->fdset, size); 3203 memcpy(dst->fdset, src, size); 3204} 3205 3206static void 3207rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 3208{ 3209 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 3210 3211 if (size > sizeof(fd_set)) { 3212 rb_raise(rb_eArgError, "too large fdsets"); 3213 } 3214 memcpy(dst, rb_fd_ptr(src), sizeof(fd_set)); 3215} 3216 3217void 3218rb_fd_dup(rb_fdset_t *dst, const rb_fdset_t *src) 3219{ 3220 size_t size = howmany(rb_fd_max(src), NFDBITS) * sizeof(fd_mask); 3221 3222 if (size < sizeof(fd_set)) 3223 size = sizeof(fd_set); 3224 dst->maxfd = src->maxfd; 3225 dst->fdset = xrealloc(dst->fdset, size); 3226 memcpy(dst->fdset, src->fdset, size); 3227} 3228 3229#ifdef __native_client__ 3230int select(int nfds, fd_set *readfds, fd_set *writefds, 3231 fd_set *exceptfds, struct timeval *timeout); 3232#endif 3233 3234int 3235rb_fd_select(int n, rb_fdset_t *readfds, rb_fdset_t *writefds, rb_fdset_t *exceptfds, struct timeval *timeout) 3236{ 3237 fd_set *r = NULL, *w = NULL, *e = NULL; 3238 if (readfds) { 3239 rb_fd_resize(n - 1, readfds); 3240 r = rb_fd_ptr(readfds); 3241 } 3242 if (writefds) { 3243 rb_fd_resize(n - 1, writefds); 3244 w = rb_fd_ptr(writefds); 3245 } 3246 if (exceptfds) { 3247 rb_fd_resize(n - 1, exceptfds); 3248 e = rb_fd_ptr(exceptfds); 3249 } 3250 return select(n, r, w, e, timeout); 3251} 3252 3253#undef FD_ZERO 3254#undef FD_SET 3255#undef FD_CLR 3256#undef FD_ISSET 3257 3258#define FD_ZERO(f) rb_fd_zero(f) 3259#define FD_SET(i, f) rb_fd_set((i), (f)) 3260#define FD_CLR(i, f) rb_fd_clr((i), (f)) 3261#define FD_ISSET(i, f) rb_fd_isset((i), (f)) 3262 3263#elif defined(_WIN32) 3264 3265void 3266rb_fd_init(rb_fdset_t *set) 3267{ 3268 set->capa = FD_SETSIZE; 3269 set->fdset = ALLOC(fd_set); 3270 FD_ZERO(set->fdset); 3271} 3272 3273void 3274rb_fd_init_copy(rb_fdset_t *dst, rb_fdset_t *src) 3275{ 3276 rb_fd_init(dst); 3277 rb_fd_dup(dst, src); 3278} 3279 3280static void 3281rb_fd_rcopy(fd_set *dst, rb_fdset_t *src) 3282{ 3283 int max = rb_fd_max(src); 3284 3285 /* we assume src is the result of select() with dst, so dst should be 3286 * larger or equal than src. */ 3287 if (max > FD_SETSIZE || (UINT)max > dst->fd_count) { 3288 rb_raise(rb_eArgError, "too large fdsets"); 3289 } 3290 3291 memcpy(dst->fd_array, src->fdset->fd_array, max); 3292 dst->fd_count = max; 3293} 3294 3295void 3296rb_fd_term(rb_fdset_t *set) 3297{ 3298 xfree(set->fdset); 3299 set->fdset = NULL; 3300 set->capa = 0; 3301} 3302 3303void 3304rb_fd_set(int fd, rb_fdset_t *set) 3305{ 3306 unsigned int i; 3307 SOCKET s = rb_w32_get_osfhandle(fd); 3308 3309 for (i = 0; i < set->fdset->fd_count; i++) { 3310 if (set->fdset->fd_array[i] == s) { 3311 return; 3312 } 3313 } 3314 if (set->fdset->fd_count >= (unsigned)set->capa) { 3315 set->capa = (set->fdset->fd_count / FD_SETSIZE + 1) * FD_SETSIZE; 3316 set->fdset = xrealloc(set->fdset, sizeof(unsigned int) + sizeof(SOCKET) * set->capa); 3317 } 3318 set->fdset->fd_array[set->fdset->fd_count++] = s; 3319} 3320 3321#undef FD_ZERO 3322#undef FD_SET 3323#undef FD_CLR 3324#undef FD_ISSET 3325 3326#define FD_ZERO(f) rb_fd_zero(f) 3327#define FD_SET(i, f) rb_fd_set((i), (f)) 3328#define FD_CLR(i, f) rb_fd_clr((i), (f)) 3329#define FD_ISSET(i, f) rb_fd_isset((i), (f)) 3330 3331#else 3332#define rb_fd_rcopy(d, s) (*(d) = *(s)) 3333#endif 3334 3335static int 3336do_select(int n, rb_fdset_t *read, rb_fdset_t *write, rb_fdset_t *except, 3337 struct timeval *timeout) 3338{ 3339 int UNINITIALIZED_VAR(result); 3340 int lerrno; 3341 rb_fdset_t UNINITIALIZED_VAR(orig_read); 3342 rb_fdset_t UNINITIALIZED_VAR(orig_write); 3343 rb_fdset_t UNINITIALIZED_VAR(orig_except); 3344 double limit = 0; 3345 struct timeval wait_rest; 3346 rb_thread_t *th = GET_THREAD(); 3347 3348 if (timeout) { 3349 limit = timeofday(); 3350 limit += (double)timeout->tv_sec+(double)timeout->tv_usec*1e-6; 3351 wait_rest = *timeout; 3352 timeout = &wait_rest; 3353 } 3354 3355 if (read) 3356 rb_fd_init_copy(&orig_read, read); 3357 if (write) 3358 rb_fd_init_copy(&orig_write, write); 3359 if (except) 3360 rb_fd_init_copy(&orig_except, except); 3361 3362 retry: 3363 lerrno = 0; 3364 3365 BLOCKING_REGION({ 3366 result = native_fd_select(n, read, write, except, timeout, th); 3367 if (result < 0) lerrno = errno; 3368 }, ubf_select, th, FALSE); 3369 3370 RUBY_VM_CHECK_INTS_BLOCKING(th); 3371 3372 errno = lerrno; 3373 3374 if (result < 0) { 3375 switch (errno) { 3376 case EINTR: 3377#ifdef ERESTART 3378 case ERESTART: 3379#endif 3380 if (read) 3381 rb_fd_dup(read, &orig_read); 3382 if (write) 3383 rb_fd_dup(write, &orig_write); 3384 if (except) 3385 rb_fd_dup(except, &orig_except); 3386 3387 if (timeout) { 3388 double d = limit - timeofday(); 3389 3390 wait_rest.tv_sec = (time_t)d; 3391 wait_rest.tv_usec = (int)((d-(double)wait_rest.tv_sec)*1e6); 3392 if (wait_rest.tv_sec < 0) wait_rest.tv_sec = 0; 3393 if (wait_rest.tv_usec < 0) wait_rest.tv_usec = 0; 3394 } 3395 3396 goto retry; 3397 default: 3398 break; 3399 } 3400 } 3401 3402 if (read) 3403 rb_fd_term(&orig_read); 3404 if (write) 3405 rb_fd_term(&orig_write); 3406 if (except) 3407 rb_fd_term(&orig_except); 3408 3409 return result; 3410} 3411 3412static void 3413rb_thread_wait_fd_rw(int fd, int read) 3414{ 3415 int result = 0; 3416 int events = read ? RB_WAITFD_IN : RB_WAITFD_OUT; 3417 3418 thread_debug("rb_thread_wait_fd_rw(%d, %s)\n", fd, read ? "read" : "write"); 3419 3420 if (fd < 0) { 3421 rb_raise(rb_eIOError, "closed stream"); 3422 } 3423 3424 result = rb_wait_for_single_fd(fd, events, NULL); 3425 if (result < 0) { 3426 rb_sys_fail(0); 3427 } 3428 3429 thread_debug("rb_thread_wait_fd_rw(%d, %s): done\n", fd, read ? "read" : "write"); 3430} 3431 3432void 3433rb_thread_wait_fd(int fd) 3434{ 3435 rb_thread_wait_fd_rw(fd, 1); 3436} 3437 3438int 3439rb_thread_fd_writable(int fd) 3440{ 3441 rb_thread_wait_fd_rw(fd, 0); 3442 return TRUE; 3443} 3444 3445int 3446rb_thread_select(int max, fd_set * read, fd_set * write, fd_set * except, 3447 struct timeval *timeout) 3448{ 3449 rb_fdset_t fdsets[3]; 3450 rb_fdset_t *rfds = NULL; 3451 rb_fdset_t *wfds = NULL; 3452 rb_fdset_t *efds = NULL; 3453 int retval; 3454 3455 if (read) { 3456 rfds = &fdsets[0]; 3457 rb_fd_init(rfds); 3458 rb_fd_copy(rfds, read, max); 3459 } 3460 if (write) { 3461 wfds = &fdsets[1]; 3462 rb_fd_init(wfds); 3463 rb_fd_copy(wfds, write, max); 3464 } 3465 if (except) { 3466 efds = &fdsets[2]; 3467 rb_fd_init(efds); 3468 rb_fd_copy(efds, except, max); 3469 } 3470 3471 retval = rb_thread_fd_select(max, rfds, wfds, efds, timeout); 3472 3473 if (rfds) { 3474 rb_fd_rcopy(read, rfds); 3475 rb_fd_term(rfds); 3476 } 3477 if (wfds) { 3478 rb_fd_rcopy(write, wfds); 3479 rb_fd_term(wfds); 3480 } 3481 if (efds) { 3482 rb_fd_rcopy(except, efds); 3483 rb_fd_term(efds); 3484 } 3485 3486 return retval; 3487} 3488 3489int 3490rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * except, 3491 struct timeval *timeout) 3492{ 3493 if (!read && !write && !except) { 3494 if (!timeout) { 3495 rb_thread_sleep_forever(); 3496 return 0; 3497 } 3498 rb_thread_wait_for(*timeout); 3499 return 0; 3500 } 3501 3502 if (read) { 3503 rb_fd_resize(max - 1, read); 3504 } 3505 if (write) { 3506 rb_fd_resize(max - 1, write); 3507 } 3508 if (except) { 3509 rb_fd_resize(max - 1, except); 3510 } 3511 return do_select(max, read, write, except, timeout); 3512} 3513 3514/* 3515 * poll() is supported by many OSes, but so far Linux is the only 3516 * one we know of that supports using poll() in all places select() 3517 * would work. 3518 */ 3519#if defined(HAVE_POLL) && defined(__linux__) 3520# define USE_POLL 3521#endif 3522 3523#ifdef USE_POLL 3524 3525/* The same with linux kernel. TODO: make platform independent definition. */ 3526#define POLLIN_SET (POLLRDNORM | POLLRDBAND | POLLIN | POLLHUP | POLLERR) 3527#define POLLOUT_SET (POLLWRBAND | POLLWRNORM | POLLOUT | POLLERR) 3528#define POLLEX_SET (POLLPRI) 3529 3530#ifndef HAVE_PPOLL 3531/* TODO: don't ignore sigmask */ 3532int 3533ppoll(struct pollfd *fds, nfds_t nfds, 3534 const struct timespec *ts, const sigset_t *sigmask) 3535{ 3536 int timeout_ms; 3537 3538 if (ts) { 3539 int tmp, tmp2; 3540 3541 if (ts->tv_sec > TIMET_MAX/1000) 3542 timeout_ms = -1; 3543 else { 3544 tmp = ts->tv_sec * 1000; 3545 tmp2 = ts->tv_nsec / (1000 * 1000); 3546 if (TIMET_MAX - tmp < tmp2) 3547 timeout_ms = -1; 3548 else 3549 timeout_ms = tmp + tmp2; 3550 } 3551 } 3552 else 3553 timeout_ms = -1; 3554 3555 return poll(fds, nfds, timeout_ms); 3556} 3557#endif 3558 3559/* 3560 * returns a mask of events 3561 */ 3562int 3563rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 3564{ 3565 struct pollfd fds; 3566 int result = 0, lerrno; 3567 double limit = 0; 3568 struct timespec ts; 3569 struct timespec *timeout = NULL; 3570 rb_thread_t *th = GET_THREAD(); 3571 3572 if (tv) { 3573 ts.tv_sec = tv->tv_sec; 3574 ts.tv_nsec = tv->tv_usec * 1000; 3575 limit = timeofday(); 3576 limit += (double)tv->tv_sec + (double)tv->tv_usec * 1e-6; 3577 timeout = &ts; 3578 } 3579 3580 fds.fd = fd; 3581 fds.events = (short)events; 3582 3583retry: 3584 lerrno = 0; 3585 BLOCKING_REGION({ 3586 result = ppoll(&fds, 1, timeout, NULL); 3587 if (result < 0) lerrno = errno; 3588 }, ubf_select, th, FALSE); 3589 3590 RUBY_VM_CHECK_INTS_BLOCKING(th); 3591 3592 if (result < 0) { 3593 errno = lerrno; 3594 switch (errno) { 3595 case EINTR: 3596#ifdef ERESTART 3597 case ERESTART: 3598#endif 3599 if (timeout) { 3600 double d = limit - timeofday(); 3601 3602 ts.tv_sec = (long)d; 3603 ts.tv_nsec = (long)((d - (double)ts.tv_sec) * 1e9); 3604 if (ts.tv_sec < 0) 3605 ts.tv_sec = 0; 3606 if (ts.tv_nsec < 0) 3607 ts.tv_nsec = 0; 3608 } 3609 goto retry; 3610 } 3611 return -1; 3612 } 3613 3614 if (fds.revents & POLLNVAL) { 3615 errno = EBADF; 3616 return -1; 3617 } 3618 3619 /* 3620 * POLLIN, POLLOUT have a different meanings from select(2)'s read/write bit. 3621 * Therefore we need fix it up. 3622 */ 3623 result = 0; 3624 if (fds.revents & POLLIN_SET) 3625 result |= RB_WAITFD_IN; 3626 if (fds.revents & POLLOUT_SET) 3627 result |= RB_WAITFD_OUT; 3628 if (fds.revents & POLLEX_SET) 3629 result |= RB_WAITFD_PRI; 3630 3631 return result; 3632} 3633#else /* ! USE_POLL - implement rb_io_poll_fd() using select() */ 3634static rb_fdset_t * 3635init_set_fd(int fd, rb_fdset_t *fds) 3636{ 3637 rb_fd_init(fds); 3638 rb_fd_set(fd, fds); 3639 3640 return fds; 3641} 3642 3643struct select_args { 3644 union { 3645 int fd; 3646 int error; 3647 } as; 3648 rb_fdset_t *read; 3649 rb_fdset_t *write; 3650 rb_fdset_t *except; 3651 struct timeval *tv; 3652}; 3653 3654static VALUE 3655select_single(VALUE ptr) 3656{ 3657 struct select_args *args = (struct select_args *)ptr; 3658 int r; 3659 3660 r = rb_thread_fd_select(args->as.fd + 1, 3661 args->read, args->write, args->except, args->tv); 3662 if (r == -1) 3663 args->as.error = errno; 3664 if (r > 0) { 3665 r = 0; 3666 if (args->read && rb_fd_isset(args->as.fd, args->read)) 3667 r |= RB_WAITFD_IN; 3668 if (args->write && rb_fd_isset(args->as.fd, args->write)) 3669 r |= RB_WAITFD_OUT; 3670 if (args->except && rb_fd_isset(args->as.fd, args->except)) 3671 r |= RB_WAITFD_PRI; 3672 } 3673 return (VALUE)r; 3674} 3675 3676static VALUE 3677select_single_cleanup(VALUE ptr) 3678{ 3679 struct select_args *args = (struct select_args *)ptr; 3680 3681 if (args->read) rb_fd_term(args->read); 3682 if (args->write) rb_fd_term(args->write); 3683 if (args->except) rb_fd_term(args->except); 3684 3685 return (VALUE)-1; 3686} 3687 3688int 3689rb_wait_for_single_fd(int fd, int events, struct timeval *tv) 3690{ 3691 rb_fdset_t rfds, wfds, efds; 3692 struct select_args args; 3693 int r; 3694 VALUE ptr = (VALUE)&args; 3695 3696 args.as.fd = fd; 3697 args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; 3698 args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; 3699 args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; 3700 args.tv = tv; 3701 3702 r = (int)rb_ensure(select_single, ptr, select_single_cleanup, ptr); 3703 if (r == -1) 3704 errno = args.as.error; 3705 3706 return r; 3707} 3708#endif /* ! USE_POLL */ 3709 3710/* 3711 * for GC 3712 */ 3713 3714#ifdef USE_CONSERVATIVE_STACK_END 3715void 3716rb_gc_set_stack_end(VALUE **stack_end_p) 3717{ 3718 VALUE stack_end; 3719 *stack_end_p = &stack_end; 3720} 3721#endif 3722 3723 3724/* 3725 * 3726 */ 3727 3728void 3729rb_threadptr_check_signal(rb_thread_t *mth) 3730{ 3731 /* mth must be main_thread */ 3732 if (rb_signal_buff_size() > 0) { 3733 /* wakeup main thread */ 3734 rb_threadptr_trap_interrupt(mth); 3735 } 3736} 3737 3738static void 3739timer_thread_function(void *arg) 3740{ 3741 rb_vm_t *vm = GET_VM(); /* TODO: fix me for Multi-VM */ 3742 3743 /* 3744 * Tricky: thread_destruct_lock doesn't close a race against 3745 * vm->running_thread switch. however it guarantee th->running_thread 3746 * point to valid pointer or NULL. 3747 */ 3748 native_mutex_lock(&vm->thread_destruct_lock); 3749 /* for time slice */ 3750 if (vm->running_thread) 3751 RUBY_VM_SET_TIMER_INTERRUPT(vm->running_thread); 3752 native_mutex_unlock(&vm->thread_destruct_lock); 3753 3754 /* check signal */ 3755 rb_threadptr_check_signal(vm->main_thread); 3756 3757#if 0 3758 /* prove profiler */ 3759 if (vm->prove_profile.enable) { 3760 rb_thread_t *th = vm->running_thread; 3761 3762 if (vm->during_gc) { 3763 /* GC prove profiling */ 3764 } 3765 } 3766#endif 3767} 3768 3769void 3770rb_thread_stop_timer_thread(int close_anyway) 3771{ 3772 if (timer_thread_id && native_stop_timer_thread(close_anyway)) { 3773 native_reset_timer_thread(); 3774 } 3775} 3776 3777void 3778rb_thread_reset_timer_thread(void) 3779{ 3780 native_reset_timer_thread(); 3781} 3782 3783void 3784rb_thread_start_timer_thread(void) 3785{ 3786 system_working = 1; 3787 rb_thread_create_timer_thread(); 3788} 3789 3790static int 3791clear_coverage_i(st_data_t key, st_data_t val, st_data_t dummy) 3792{ 3793 int i; 3794 VALUE lines = (VALUE)val; 3795 3796 for (i = 0; i < RARRAY_LEN(lines); i++) { 3797 if (RARRAY_PTR(lines)[i] != Qnil) { 3798 RARRAY_PTR(lines)[i] = INT2FIX(0); 3799 } 3800 } 3801 return ST_CONTINUE; 3802} 3803 3804static void 3805clear_coverage(void) 3806{ 3807 VALUE coverages = rb_get_coverages(); 3808 if (RTEST(coverages)) { 3809 st_foreach(RHASH_TBL(coverages), clear_coverage_i, 0); 3810 } 3811} 3812 3813static void 3814rb_thread_atfork_internal(int (*atfork)(st_data_t, st_data_t, st_data_t)) 3815{ 3816 rb_thread_t *th = GET_THREAD(); 3817 rb_vm_t *vm = th->vm; 3818 VALUE thval = th->self; 3819 vm->main_thread = th; 3820 3821 gvl_atfork(th->vm); 3822 st_foreach(vm->living_threads, atfork, (st_data_t)th); 3823 st_clear(vm->living_threads); 3824 st_insert(vm->living_threads, thval, (st_data_t)th->thread_id); 3825 vm->sleeper = 0; 3826 clear_coverage(); 3827} 3828 3829static int 3830terminate_atfork_i(st_data_t key, st_data_t val, st_data_t current_th) 3831{ 3832 VALUE thval = key; 3833 rb_thread_t *th; 3834 GetThreadPtr(thval, th); 3835 3836 if (th != (rb_thread_t *)current_th) { 3837 rb_mutex_abandon_keeping_mutexes(th); 3838 rb_mutex_abandon_locking_mutex(th); 3839 thread_cleanup_func(th, TRUE); 3840 } 3841 return ST_CONTINUE; 3842} 3843 3844void 3845rb_thread_atfork(void) 3846{ 3847 rb_thread_atfork_internal(terminate_atfork_i); 3848 GET_THREAD()->join_list = NULL; 3849 3850 /* We don't want reproduce CVE-2003-0900. */ 3851 rb_reset_random_seed(); 3852} 3853 3854static int 3855terminate_atfork_before_exec_i(st_data_t key, st_data_t val, st_data_t current_th) 3856{ 3857 VALUE thval = key; 3858 rb_thread_t *th; 3859 GetThreadPtr(thval, th); 3860 3861 if (th != (rb_thread_t *)current_th) { 3862 thread_cleanup_func_before_exec(th); 3863 } 3864 return ST_CONTINUE; 3865} 3866 3867void 3868rb_thread_atfork_before_exec(void) 3869{ 3870 rb_thread_atfork_internal(terminate_atfork_before_exec_i); 3871} 3872 3873struct thgroup { 3874 int enclosed; 3875 VALUE group; 3876}; 3877 3878static size_t 3879thgroup_memsize(const void *ptr) 3880{ 3881 return ptr ? sizeof(struct thgroup) : 0; 3882} 3883 3884static const rb_data_type_t thgroup_data_type = { 3885 "thgroup", 3886 {NULL, RUBY_TYPED_DEFAULT_FREE, thgroup_memsize,}, 3887}; 3888 3889/* 3890 * Document-class: ThreadGroup 3891 * 3892 * <code>ThreadGroup</code> provides a means of keeping track of a number of 3893 * threads as a group. A <code>Thread</code> can belong to only one 3894 * <code>ThreadGroup</code> at a time; adding a thread to a new group will 3895 * remove it from any previous group. 3896 * 3897 * Newly created threads belong to the same group as the thread from which they 3898 * were created. 3899 */ 3900 3901/* 3902 * Document-const: Default 3903 * 3904 * The default ThreadGroup created when Ruby starts; all Threads belong to it 3905 * by default. 3906 */ 3907static VALUE 3908thgroup_s_alloc(VALUE klass) 3909{ 3910 VALUE group; 3911 struct thgroup *data; 3912 3913 group = TypedData_Make_Struct(klass, struct thgroup, &thgroup_data_type, data); 3914 data->enclosed = 0; 3915 data->group = group; 3916 3917 return group; 3918} 3919 3920struct thgroup_list_params { 3921 VALUE ary; 3922 VALUE group; 3923}; 3924 3925static int 3926thgroup_list_i(st_data_t key, st_data_t val, st_data_t data) 3927{ 3928 VALUE thread = (VALUE)key; 3929 VALUE ary = ((struct thgroup_list_params *)data)->ary; 3930 VALUE group = ((struct thgroup_list_params *)data)->group; 3931 rb_thread_t *th; 3932 GetThreadPtr(thread, th); 3933 3934 if (th->thgroup == group) { 3935 rb_ary_push(ary, thread); 3936 } 3937 return ST_CONTINUE; 3938} 3939 3940/* 3941 * call-seq: 3942 * thgrp.list -> array 3943 * 3944 * Returns an array of all existing <code>Thread</code> objects that belong to 3945 * this group. 3946 * 3947 * ThreadGroup::Default.list #=> [#<Thread:0x401bdf4c run>] 3948 */ 3949 3950static VALUE 3951thgroup_list(VALUE group) 3952{ 3953 VALUE ary = rb_ary_new(); 3954 struct thgroup_list_params param; 3955 3956 param.ary = ary; 3957 param.group = group; 3958 st_foreach(GET_THREAD()->vm->living_threads, thgroup_list_i, (st_data_t) & param); 3959 return ary; 3960} 3961 3962 3963/* 3964 * call-seq: 3965 * thgrp.enclose -> thgrp 3966 * 3967 * Prevents threads from being added to or removed from the receiving 3968 * <code>ThreadGroup</code>. New threads can still be started in an enclosed 3969 * <code>ThreadGroup</code>. 3970 * 3971 * ThreadGroup::Default.enclose #=> #<ThreadGroup:0x4029d914> 3972 * thr = Thread::new { Thread.stop } #=> #<Thread:0x402a7210 sleep> 3973 * tg = ThreadGroup::new #=> #<ThreadGroup:0x402752d4> 3974 * tg.add thr 3975 * 3976 * <em>produces:</em> 3977 * 3978 * ThreadError: can't move from the enclosed thread group 3979 */ 3980 3981static VALUE 3982thgroup_enclose(VALUE group) 3983{ 3984 struct thgroup *data; 3985 3986 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 3987 data->enclosed = 1; 3988 3989 return group; 3990} 3991 3992 3993/* 3994 * call-seq: 3995 * thgrp.enclosed? -> true or false 3996 * 3997 * Returns <code>true</code> if <em>thgrp</em> is enclosed. See also 3998 * ThreadGroup#enclose. 3999 */ 4000 4001static VALUE 4002thgroup_enclosed_p(VALUE group) 4003{ 4004 struct thgroup *data; 4005 4006 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 4007 if (data->enclosed) 4008 return Qtrue; 4009 return Qfalse; 4010} 4011 4012 4013/* 4014 * call-seq: 4015 * thgrp.add(thread) -> thgrp 4016 * 4017 * Adds the given <em>thread</em> to this group, removing it from any other 4018 * group to which it may have previously belonged. 4019 * 4020 * puts "Initial group is #{ThreadGroup::Default.list}" 4021 * tg = ThreadGroup.new 4022 * t1 = Thread.new { sleep } 4023 * t2 = Thread.new { sleep } 4024 * puts "t1 is #{t1}" 4025 * puts "t2 is #{t2}" 4026 * tg.add(t1) 4027 * puts "Initial group now #{ThreadGroup::Default.list}" 4028 * puts "tg group now #{tg.list}" 4029 * 4030 * <em>produces:</em> 4031 * 4032 * Initial group is #<Thread:0x401bdf4c> 4033 * t1 is #<Thread:0x401b3c90> 4034 * t2 is #<Thread:0x401b3c18> 4035 * Initial group now #<Thread:0x401b3c18>#<Thread:0x401bdf4c> 4036 * tg group now #<Thread:0x401b3c90> 4037 */ 4038 4039static VALUE 4040thgroup_add(VALUE group, VALUE thread) 4041{ 4042 rb_thread_t *th; 4043 struct thgroup *data; 4044 4045 rb_secure(4); 4046 GetThreadPtr(thread, th); 4047 4048 if (OBJ_FROZEN(group)) { 4049 rb_raise(rb_eThreadError, "can't move to the frozen thread group"); 4050 } 4051 TypedData_Get_Struct(group, struct thgroup, &thgroup_data_type, data); 4052 if (data->enclosed) { 4053 rb_raise(rb_eThreadError, "can't move to the enclosed thread group"); 4054 } 4055 4056 if (!th->thgroup) { 4057 return Qnil; 4058 } 4059 4060 if (OBJ_FROZEN(th->thgroup)) { 4061 rb_raise(rb_eThreadError, "can't move from the frozen thread group"); 4062 } 4063 TypedData_Get_Struct(th->thgroup, struct thgroup, &thgroup_data_type, data); 4064 if (data->enclosed) { 4065 rb_raise(rb_eThreadError, 4066 "can't move from the enclosed thread group"); 4067 } 4068 4069 th->thgroup = group; 4070 return group; 4071} 4072 4073 4074/* 4075 * Document-class: Mutex 4076 * 4077 * Mutex implements a simple semaphore that can be used to coordinate access to 4078 * shared data from multiple concurrent threads. 4079 * 4080 * Example: 4081 * 4082 * require 'thread' 4083 * semaphore = Mutex.new 4084 * 4085 * a = Thread.new { 4086 * semaphore.synchronize { 4087 * # access shared resource 4088 * } 4089 * } 4090 * 4091 * b = Thread.new { 4092 * semaphore.synchronize { 4093 * # access shared resource 4094 * } 4095 * } 4096 * 4097 */ 4098 4099#define GetMutexPtr(obj, tobj) \ 4100 TypedData_Get_Struct((obj), rb_mutex_t, &mutex_data_type, (tobj)) 4101 4102#define mutex_mark NULL 4103 4104static void 4105mutex_free(void *ptr) 4106{ 4107 if (ptr) { 4108 rb_mutex_t *mutex = ptr; 4109 if (mutex->th) { 4110 /* rb_warn("free locked mutex"); */ 4111 const char *err = rb_mutex_unlock_th(mutex, mutex->th); 4112 if (err) rb_bug("%s", err); 4113 } 4114 native_mutex_destroy(&mutex->lock); 4115 native_cond_destroy(&mutex->cond); 4116 } 4117 ruby_xfree(ptr); 4118} 4119 4120static size_t 4121mutex_memsize(const void *ptr) 4122{ 4123 return ptr ? sizeof(rb_mutex_t) : 0; 4124} 4125 4126static const rb_data_type_t mutex_data_type = { 4127 "mutex", 4128 {mutex_mark, mutex_free, mutex_memsize,}, 4129}; 4130 4131VALUE 4132rb_obj_is_mutex(VALUE obj) 4133{ 4134 if (rb_typeddata_is_kind_of(obj, &mutex_data_type)) { 4135 return Qtrue; 4136 } 4137 else { 4138 return Qfalse; 4139 } 4140} 4141 4142static VALUE 4143mutex_alloc(VALUE klass) 4144{ 4145 VALUE volatile obj; 4146 rb_mutex_t *mutex; 4147 4148 obj = TypedData_Make_Struct(klass, rb_mutex_t, &mutex_data_type, mutex); 4149 native_mutex_initialize(&mutex->lock); 4150 native_cond_initialize(&mutex->cond, RB_CONDATTR_CLOCK_MONOTONIC); 4151 return obj; 4152} 4153 4154/* 4155 * call-seq: 4156 * Mutex.new -> mutex 4157 * 4158 * Creates a new Mutex 4159 */ 4160static VALUE 4161mutex_initialize(VALUE self) 4162{ 4163 return self; 4164} 4165 4166VALUE 4167rb_mutex_new(void) 4168{ 4169 return mutex_alloc(rb_cMutex); 4170} 4171 4172/* 4173 * call-seq: 4174 * mutex.locked? -> true or false 4175 * 4176 * Returns +true+ if this lock is currently held by some thread. 4177 */ 4178VALUE 4179rb_mutex_locked_p(VALUE self) 4180{ 4181 rb_mutex_t *mutex; 4182 GetMutexPtr(self, mutex); 4183 return mutex->th ? Qtrue : Qfalse; 4184} 4185 4186static void 4187mutex_locked(rb_thread_t *th, VALUE self) 4188{ 4189 rb_mutex_t *mutex; 4190 GetMutexPtr(self, mutex); 4191 4192 if (th->keeping_mutexes) { 4193 mutex->next_mutex = th->keeping_mutexes; 4194 } 4195 th->keeping_mutexes = mutex; 4196} 4197 4198/* 4199 * call-seq: 4200 * mutex.try_lock -> true or false 4201 * 4202 * Attempts to obtain the lock and returns immediately. Returns +true+ if the 4203 * lock was granted. 4204 */ 4205VALUE 4206rb_mutex_trylock(VALUE self) 4207{ 4208 rb_mutex_t *mutex; 4209 VALUE locked = Qfalse; 4210 GetMutexPtr(self, mutex); 4211 4212 native_mutex_lock(&mutex->lock); 4213 if (mutex->th == 0) { 4214 mutex->th = GET_THREAD(); 4215 locked = Qtrue; 4216 4217 mutex_locked(GET_THREAD(), self); 4218 } 4219 native_mutex_unlock(&mutex->lock); 4220 4221 return locked; 4222} 4223 4224static int 4225lock_func(rb_thread_t *th, rb_mutex_t *mutex, int timeout_ms) 4226{ 4227 int interrupted = 0; 4228 int err = 0; 4229 4230 mutex->cond_waiting++; 4231 for (;;) { 4232 if (!mutex->th) { 4233 mutex->th = th; 4234 break; 4235 } 4236 if (RUBY_VM_INTERRUPTED(th)) { 4237 interrupted = 1; 4238 break; 4239 } 4240 if (err == ETIMEDOUT) { 4241 interrupted = 2; 4242 break; 4243 } 4244 4245 if (timeout_ms) { 4246 struct timespec timeout_rel; 4247 struct timespec timeout; 4248 4249 timeout_rel.tv_sec = 0; 4250 timeout_rel.tv_nsec = timeout_ms * 1000 * 1000; 4251 timeout = native_cond_timeout(&mutex->cond, timeout_rel); 4252 err = native_cond_timedwait(&mutex->cond, &mutex->lock, &timeout); 4253 } 4254 else { 4255 native_cond_wait(&mutex->cond, &mutex->lock); 4256 err = 0; 4257 } 4258 } 4259 mutex->cond_waiting--; 4260 4261 return interrupted; 4262} 4263 4264static void 4265lock_interrupt(void *ptr) 4266{ 4267 rb_mutex_t *mutex = (rb_mutex_t *)ptr; 4268 native_mutex_lock(&mutex->lock); 4269 if (mutex->cond_waiting > 0) 4270 native_cond_broadcast(&mutex->cond); 4271 native_mutex_unlock(&mutex->lock); 4272} 4273 4274/* 4275 * At maximum, only one thread can use cond_timedwait and watch deadlock 4276 * periodically. Multiple polling thread (i.e. concurrent deadlock check) 4277 * introduces new race conditions. [Bug #6278] [ruby-core:44275] 4278 */ 4279static const rb_thread_t *patrol_thread = NULL; 4280 4281/* 4282 * call-seq: 4283 * mutex.lock -> self 4284 * 4285 * Attempts to grab the lock and waits if it isn't available. 4286 * Raises +ThreadError+ if +mutex+ was locked by the current thread. 4287 */ 4288VALUE 4289rb_mutex_lock(VALUE self) 4290{ 4291 rb_thread_t *th = GET_THREAD(); 4292 rb_mutex_t *mutex; 4293 GetMutexPtr(self, mutex); 4294 4295 /* When running trap handler */ 4296 if (!mutex->allow_trap && th->interrupt_mask & TRAP_INTERRUPT_MASK) { 4297 rb_raise(rb_eThreadError, "can't be called from trap context"); 4298 } 4299 4300 if (rb_mutex_trylock(self) == Qfalse) { 4301 if (mutex->th == GET_THREAD()) { 4302 rb_raise(rb_eThreadError, "deadlock; recursive locking"); 4303 } 4304 4305 while (mutex->th != th) { 4306 int interrupted; 4307 enum rb_thread_status prev_status = th->status; 4308 volatile int timeout_ms = 0; 4309 struct rb_unblock_callback oldubf; 4310 4311 set_unblock_function(th, lock_interrupt, mutex, &oldubf, FALSE); 4312 th->status = THREAD_STOPPED_FOREVER; 4313 th->locking_mutex = self; 4314 4315 native_mutex_lock(&mutex->lock); 4316 th->vm->sleeper++; 4317 /* 4318 * Carefully! while some contended threads are in lock_func(), 4319 * vm->sleepr is unstable value. we have to avoid both deadlock 4320 * and busy loop. 4321 */ 4322 if ((vm_living_thread_num(th->vm) == th->vm->sleeper) && 4323 !patrol_thread) { 4324 timeout_ms = 100; 4325 patrol_thread = th; 4326 } 4327 4328 GVL_UNLOCK_BEGIN(); 4329 interrupted = lock_func(th, mutex, (int)timeout_ms); 4330 native_mutex_unlock(&mutex->lock); 4331 GVL_UNLOCK_END(); 4332 4333 if (patrol_thread == th) 4334 patrol_thread = NULL; 4335 4336 reset_unblock_function(th, &oldubf); 4337 4338 th->locking_mutex = Qfalse; 4339 if (mutex->th && interrupted == 2) { 4340 rb_check_deadlock(th->vm); 4341 } 4342 if (th->status == THREAD_STOPPED_FOREVER) { 4343 th->status = prev_status; 4344 } 4345 th->vm->sleeper--; 4346 4347 if (mutex->th == th) mutex_locked(th, self); 4348 4349 if (interrupted) { 4350 RUBY_VM_CHECK_INTS_BLOCKING(th); 4351 } 4352 } 4353 } 4354 return self; 4355} 4356 4357/* 4358 * call-seq: 4359 * mutex.owned? -> true or false 4360 * 4361 * Returns +true+ if this lock is currently held by current thread. 4362 * <em>This API is experimental, and subject to change.</em> 4363 */ 4364VALUE 4365rb_mutex_owned_p(VALUE self) 4366{ 4367 VALUE owned = Qfalse; 4368 rb_thread_t *th = GET_THREAD(); 4369 rb_mutex_t *mutex; 4370 4371 GetMutexPtr(self, mutex); 4372 4373 if (mutex->th == th) 4374 owned = Qtrue; 4375 4376 return owned; 4377} 4378 4379static const char * 4380rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th) 4381{ 4382 const char *err = NULL; 4383 4384 native_mutex_lock(&mutex->lock); 4385 4386 if (mutex->th == 0) { 4387 err = "Attempt to unlock a mutex which is not locked"; 4388 } 4389 else if (mutex->th != th) { 4390 err = "Attempt to unlock a mutex which is locked by another thread"; 4391 } 4392 else { 4393 mutex->th = 0; 4394 if (mutex->cond_waiting > 0) 4395 native_cond_signal(&mutex->cond); 4396 } 4397 4398 native_mutex_unlock(&mutex->lock); 4399 4400 if (!err) { 4401 rb_mutex_t *volatile *th_mutex = &th->keeping_mutexes; 4402 while (*th_mutex != mutex) { 4403 th_mutex = &(*th_mutex)->next_mutex; 4404 } 4405 *th_mutex = mutex->next_mutex; 4406 mutex->next_mutex = NULL; 4407 } 4408 4409 return err; 4410} 4411 4412/* 4413 * call-seq: 4414 * mutex.unlock -> self 4415 * 4416 * Releases the lock. 4417 * Raises +ThreadError+ if +mutex+ wasn't locked by the current thread. 4418 */ 4419VALUE 4420rb_mutex_unlock(VALUE self) 4421{ 4422 const char *err; 4423 rb_mutex_t *mutex; 4424 GetMutexPtr(self, mutex); 4425 4426 err = rb_mutex_unlock_th(mutex, GET_THREAD()); 4427 if (err) rb_raise(rb_eThreadError, "%s", err); 4428 4429 return self; 4430} 4431 4432static void 4433rb_mutex_abandon_keeping_mutexes(rb_thread_t *th) 4434{ 4435 if (th->keeping_mutexes) { 4436 rb_mutex_abandon_all(th->keeping_mutexes); 4437 } 4438 th->keeping_mutexes = NULL; 4439} 4440 4441static void 4442rb_mutex_abandon_locking_mutex(rb_thread_t *th) 4443{ 4444 rb_mutex_t *mutex; 4445 4446 if (!th->locking_mutex) return; 4447 4448 GetMutexPtr(th->locking_mutex, mutex); 4449 if (mutex->th == th) 4450 rb_mutex_abandon_all(mutex); 4451 th->locking_mutex = Qfalse; 4452} 4453 4454static void 4455rb_mutex_abandon_all(rb_mutex_t *mutexes) 4456{ 4457 rb_mutex_t *mutex; 4458 4459 while (mutexes) { 4460 mutex = mutexes; 4461 mutexes = mutex->next_mutex; 4462 mutex->th = 0; 4463 mutex->next_mutex = 0; 4464 } 4465} 4466 4467static VALUE 4468rb_mutex_sleep_forever(VALUE time) 4469{ 4470 sleep_forever(GET_THREAD(), 1, 0); /* permit spurious check */ 4471 return Qnil; 4472} 4473 4474static VALUE 4475rb_mutex_wait_for(VALUE time) 4476{ 4477 struct timeval *t = (struct timeval *)time; 4478 sleep_timeval(GET_THREAD(), *t, 0); /* permit spurious check */ 4479 return Qnil; 4480} 4481 4482VALUE 4483rb_mutex_sleep(VALUE self, VALUE timeout) 4484{ 4485 time_t beg, end; 4486 struct timeval t; 4487 4488 if (!NIL_P(timeout)) { 4489 t = rb_time_interval(timeout); 4490 } 4491 rb_mutex_unlock(self); 4492 beg = time(0); 4493 if (NIL_P(timeout)) { 4494 rb_ensure(rb_mutex_sleep_forever, Qnil, rb_mutex_lock, self); 4495 } 4496 else { 4497 rb_ensure(rb_mutex_wait_for, (VALUE)&t, rb_mutex_lock, self); 4498 } 4499 end = time(0) - beg; 4500 return INT2FIX(end); 4501} 4502 4503/* 4504 * call-seq: 4505 * mutex.sleep(timeout = nil) -> number 4506 * 4507 * Releases the lock and sleeps +timeout+ seconds if it is given and 4508 * non-nil or forever. Raises +ThreadError+ if +mutex+ wasn't locked by 4509 * the current thread. 4510 * 4511 * Note that this method can wakeup without explicit Thread#wakeup call. 4512 * For example, receiving signal and so on. 4513 */ 4514static VALUE 4515mutex_sleep(int argc, VALUE *argv, VALUE self) 4516{ 4517 VALUE timeout; 4518 4519 rb_scan_args(argc, argv, "01", &timeout); 4520 return rb_mutex_sleep(self, timeout); 4521} 4522 4523/* 4524 * call-seq: 4525 * mutex.synchronize { ... } -> result of the block 4526 * 4527 * Obtains a lock, runs the block, and releases the lock when the block 4528 * completes. See the example under +Mutex+. 4529 */ 4530 4531VALUE 4532rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg) 4533{ 4534 rb_mutex_lock(mutex); 4535 return rb_ensure(func, arg, rb_mutex_unlock, mutex); 4536} 4537 4538/* 4539 * call-seq: 4540 * mutex.synchronize { ... } -> result of the block 4541 * 4542 * Obtains a lock, runs the block, and releases the lock when the block 4543 * completes. See the example under +Mutex+. 4544 */ 4545static VALUE 4546rb_mutex_synchronize_m(VALUE self, VALUE args) 4547{ 4548 if (!rb_block_given_p()) { 4549 rb_raise(rb_eThreadError, "must be called with a block"); 4550 } 4551 4552 return rb_mutex_synchronize(self, rb_yield, Qundef); 4553} 4554 4555void rb_mutex_allow_trap(VALUE self, int val) 4556{ 4557 rb_mutex_t *m; 4558 GetMutexPtr(self, m); 4559 4560 m->allow_trap = val; 4561} 4562 4563/* 4564 * Document-class: ThreadShield 4565 */ 4566static void 4567thread_shield_mark(void *ptr) 4568{ 4569 rb_gc_mark((VALUE)ptr); 4570} 4571 4572static const rb_data_type_t thread_shield_data_type = { 4573 "thread_shield", 4574 {thread_shield_mark, 0, 0,}, 4575}; 4576 4577static VALUE 4578thread_shield_alloc(VALUE klass) 4579{ 4580 return TypedData_Wrap_Struct(klass, &thread_shield_data_type, (void *)mutex_alloc(0)); 4581} 4582 4583#define GetThreadShieldPtr(obj) ((VALUE)rb_check_typeddata((obj), &thread_shield_data_type)) 4584#define THREAD_SHIELD_WAITING_MASK (FL_USER0|FL_USER1|FL_USER2|FL_USER3|FL_USER4|FL_USER5|FL_USER6|FL_USER7|FL_USER8|FL_USER9|FL_USER10|FL_USER11|FL_USER12|FL_USER13|FL_USER14|FL_USER15|FL_USER16|FL_USER17|FL_USER18|FL_USER19) 4585#define THREAD_SHIELD_WAITING_SHIFT (FL_USHIFT) 4586#define rb_thread_shield_waiting(b) (int)((RBASIC(b)->flags&THREAD_SHIELD_WAITING_MASK)>>THREAD_SHIELD_WAITING_SHIFT) 4587 4588static inline void 4589rb_thread_shield_waiting_inc(VALUE b) 4590{ 4591 unsigned int w = rb_thread_shield_waiting(b); 4592 w++; 4593 if (w > (unsigned int)(THREAD_SHIELD_WAITING_MASK>>THREAD_SHIELD_WAITING_SHIFT)) 4594 rb_raise(rb_eRuntimeError, "waiting count overflow"); 4595 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK; 4596 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT); 4597} 4598 4599static inline void 4600rb_thread_shield_waiting_dec(VALUE b) 4601{ 4602 unsigned int w = rb_thread_shield_waiting(b); 4603 if (!w) rb_raise(rb_eRuntimeError, "waiting count underflow"); 4604 w--; 4605 RBASIC(b)->flags &= ~THREAD_SHIELD_WAITING_MASK; 4606 RBASIC(b)->flags |= ((VALUE)w << THREAD_SHIELD_WAITING_SHIFT); 4607} 4608 4609VALUE 4610rb_thread_shield_new(void) 4611{ 4612 VALUE thread_shield = thread_shield_alloc(rb_cThreadShield); 4613 rb_mutex_lock((VALUE)DATA_PTR(thread_shield)); 4614 return thread_shield; 4615} 4616 4617/* 4618 * Wait a thread shield. 4619 * 4620 * Returns 4621 * true: acquired the thread shield 4622 * false: the thread shield was destroyed and no other threads waiting 4623 * nil: the thread shield was destroyed but still in use 4624 */ 4625VALUE 4626rb_thread_shield_wait(VALUE self) 4627{ 4628 VALUE mutex = GetThreadShieldPtr(self); 4629 rb_mutex_t *m; 4630 4631 if (!mutex) return Qfalse; 4632 GetMutexPtr(mutex, m); 4633 if (m->th == GET_THREAD()) return Qnil; 4634 rb_thread_shield_waiting_inc(self); 4635 rb_mutex_lock(mutex); 4636 rb_thread_shield_waiting_dec(self); 4637 if (DATA_PTR(self)) return Qtrue; 4638 rb_mutex_unlock(mutex); 4639 return rb_thread_shield_waiting(self) > 0 ? Qnil : Qfalse; 4640} 4641 4642/* 4643 * Release a thread shield, and return true if it has waiting threads. 4644 */ 4645VALUE 4646rb_thread_shield_release(VALUE self) 4647{ 4648 VALUE mutex = GetThreadShieldPtr(self); 4649 rb_mutex_unlock(mutex); 4650 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse; 4651} 4652 4653/* 4654 * Release and destroy a thread shield, and return true if it has waiting threads. 4655 */ 4656VALUE 4657rb_thread_shield_destroy(VALUE self) 4658{ 4659 VALUE mutex = GetThreadShieldPtr(self); 4660 DATA_PTR(self) = 0; 4661 rb_mutex_unlock(mutex); 4662 return rb_thread_shield_waiting(self) > 0 ? Qtrue : Qfalse; 4663} 4664 4665/* variables for recursive traversals */ 4666static ID recursive_key; 4667 4668/* 4669 * Returns the current "recursive list" used to detect recursion. 4670 * This list is a hash table, unique for the current thread and for 4671 * the current __callee__. 4672 */ 4673 4674static VALUE 4675recursive_list_access(void) 4676{ 4677 volatile VALUE hash = rb_thread_local_aref(rb_thread_current(), recursive_key); 4678 VALUE sym = ID2SYM(rb_frame_this_func()); 4679 VALUE list; 4680 if (NIL_P(hash) || !RB_TYPE_P(hash, T_HASH)) { 4681 hash = rb_hash_new(); 4682 OBJ_UNTRUST(hash); 4683 rb_thread_local_aset(rb_thread_current(), recursive_key, hash); 4684 list = Qnil; 4685 } 4686 else { 4687 list = rb_hash_aref(hash, sym); 4688 } 4689 if (NIL_P(list) || !RB_TYPE_P(list, T_HASH)) { 4690 list = rb_hash_new(); 4691 OBJ_UNTRUST(list); 4692 rb_hash_aset(hash, sym, list); 4693 } 4694 return list; 4695} 4696 4697/* 4698 * Returns Qtrue iff obj_id (or the pair <obj, paired_obj>) is already 4699 * in the recursion list. 4700 * Assumes the recursion list is valid. 4701 */ 4702 4703static VALUE 4704recursive_check(VALUE list, VALUE obj_id, VALUE paired_obj_id) 4705{ 4706#if SIZEOF_LONG == SIZEOF_VOIDP 4707 #define OBJ_ID_EQL(obj_id, other) ((obj_id) == (other)) 4708#elif SIZEOF_LONG_LONG == SIZEOF_VOIDP 4709 #define OBJ_ID_EQL(obj_id, other) (RB_TYPE_P((obj_id), T_BIGNUM) ? \ 4710 rb_big_eql((obj_id), (other)) : ((obj_id) == (other))) 4711#endif 4712 4713 VALUE pair_list = rb_hash_lookup2(list, obj_id, Qundef); 4714 if (pair_list == Qundef) 4715 return Qfalse; 4716 if (paired_obj_id) { 4717 if (!RB_TYPE_P(pair_list, T_HASH)) { 4718 if (!OBJ_ID_EQL(paired_obj_id, pair_list)) 4719 return Qfalse; 4720 } 4721 else { 4722 if (NIL_P(rb_hash_lookup(pair_list, paired_obj_id))) 4723 return Qfalse; 4724 } 4725 } 4726 return Qtrue; 4727} 4728 4729/* 4730 * Pushes obj_id (or the pair <obj_id, paired_obj_id>) in the recursion list. 4731 * For a single obj_id, it sets list[obj_id] to Qtrue. 4732 * For a pair, it sets list[obj_id] to paired_obj_id if possible, 4733 * otherwise list[obj_id] becomes a hash like: 4734 * {paired_obj_id_1 => true, paired_obj_id_2 => true, ... } 4735 * Assumes the recursion list is valid. 4736 */ 4737 4738static void 4739recursive_push(VALUE list, VALUE obj, VALUE paired_obj) 4740{ 4741 VALUE pair_list; 4742 4743 if (!paired_obj) { 4744 rb_hash_aset(list, obj, Qtrue); 4745 } 4746 else if ((pair_list = rb_hash_lookup2(list, obj, Qundef)) == Qundef) { 4747 rb_hash_aset(list, obj, paired_obj); 4748 } 4749 else { 4750 if (!RB_TYPE_P(pair_list, T_HASH)){ 4751 VALUE other_paired_obj = pair_list; 4752 pair_list = rb_hash_new(); 4753 OBJ_UNTRUST(pair_list); 4754 rb_hash_aset(pair_list, other_paired_obj, Qtrue); 4755 rb_hash_aset(list, obj, pair_list); 4756 } 4757 rb_hash_aset(pair_list, paired_obj, Qtrue); 4758 } 4759} 4760 4761/* 4762 * Pops obj_id (or the pair <obj_id, paired_obj_id>) from the recursion list. 4763 * For a pair, if list[obj_id] is a hash, then paired_obj_id is 4764 * removed from the hash and no attempt is made to simplify 4765 * list[obj_id] from {only_one_paired_id => true} to only_one_paired_id 4766 * Assumes the recursion list is valid. 4767 */ 4768 4769static void 4770recursive_pop(VALUE list, VALUE obj, VALUE paired_obj) 4771{ 4772 if (paired_obj) { 4773 VALUE pair_list = rb_hash_lookup2(list, obj, Qundef); 4774 if (pair_list == Qundef) { 4775 VALUE symname = rb_inspect(ID2SYM(rb_frame_this_func())); 4776 VALUE thrname = rb_inspect(rb_thread_current()); 4777 rb_raise(rb_eTypeError, "invalid inspect_tbl pair_list for %s in %s", 4778 StringValuePtr(symname), StringValuePtr(thrname)); 4779 } 4780 if (RB_TYPE_P(pair_list, T_HASH)) { 4781 rb_hash_delete(pair_list, paired_obj); 4782 if (!RHASH_EMPTY_P(pair_list)) { 4783 return; /* keep hash until is empty */ 4784 } 4785 } 4786 } 4787 rb_hash_delete(list, obj); 4788} 4789 4790struct exec_recursive_params { 4791 VALUE (*func) (VALUE, VALUE, int); 4792 VALUE list; 4793 VALUE obj; 4794 VALUE objid; 4795 VALUE pairid; 4796 VALUE arg; 4797}; 4798 4799static VALUE 4800exec_recursive_i(VALUE tag, struct exec_recursive_params *p) 4801{ 4802 VALUE result = Qundef; 4803 int state; 4804 4805 recursive_push(p->list, p->objid, p->pairid); 4806 PUSH_TAG(); 4807 if ((state = EXEC_TAG()) == 0) { 4808 result = (*p->func)(p->obj, p->arg, FALSE); 4809 } 4810 POP_TAG(); 4811 recursive_pop(p->list, p->objid, p->pairid); 4812 if (state) 4813 JUMP_TAG(state); 4814 return result; 4815} 4816 4817/* 4818 * Calls func(obj, arg, recursive), where recursive is non-zero if the 4819 * current method is called recursively on obj, or on the pair <obj, pairid> 4820 * If outer is 0, then the innermost func will be called with recursive set 4821 * to Qtrue, otherwise the outermost func will be called. In the latter case, 4822 * all inner func are short-circuited by throw. 4823 * Implementation details: the value thrown is the recursive list which is 4824 * proper to the current method and unlikely to be catched anywhere else. 4825 * list[recursive_key] is used as a flag for the outermost call. 4826 */ 4827 4828static VALUE 4829exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE pairid, VALUE arg, int outer) 4830{ 4831 VALUE result = Qundef; 4832 struct exec_recursive_params p; 4833 int outermost; 4834 p.list = recursive_list_access(); 4835 p.objid = rb_obj_id(obj); 4836 p.obj = obj; 4837 p.pairid = pairid; 4838 p.arg = arg; 4839 outermost = outer && !recursive_check(p.list, ID2SYM(recursive_key), 0); 4840 4841 if (recursive_check(p.list, p.objid, pairid)) { 4842 if (outer && !outermost) { 4843 rb_throw_obj(p.list, p.list); 4844 } 4845 return (*func)(obj, arg, TRUE); 4846 } 4847 else { 4848 p.func = func; 4849 4850 if (outermost) { 4851 recursive_push(p.list, ID2SYM(recursive_key), 0); 4852 result = rb_catch_obj(p.list, exec_recursive_i, (VALUE)&p); 4853 recursive_pop(p.list, ID2SYM(recursive_key), 0); 4854 if (result == p.list) { 4855 result = (*func)(obj, arg, TRUE); 4856 } 4857 } 4858 else { 4859 result = exec_recursive_i(0, &p); 4860 } 4861 } 4862 *(volatile struct exec_recursive_params *)&p; 4863 return result; 4864} 4865 4866/* 4867 * Calls func(obj, arg, recursive), where recursive is non-zero if the 4868 * current method is called recursively on obj 4869 */ 4870 4871VALUE 4872rb_exec_recursive(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 4873{ 4874 return exec_recursive(func, obj, 0, arg, 0); 4875} 4876 4877/* 4878 * Calls func(obj, arg, recursive), where recursive is non-zero if the 4879 * current method is called recursively on the ordered pair <obj, paired_obj> 4880 */ 4881 4882VALUE 4883rb_exec_recursive_paired(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) 4884{ 4885 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 0); 4886} 4887 4888/* 4889 * If recursion is detected on the current method and obj, the outermost 4890 * func will be called with (obj, arg, Qtrue). All inner func will be 4891 * short-circuited using throw. 4892 */ 4893 4894VALUE 4895rb_exec_recursive_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE arg) 4896{ 4897 return exec_recursive(func, obj, 0, arg, 1); 4898} 4899 4900/* 4901 * If recursion is detected on the current method, obj and paired_obj, 4902 * the outermost func will be called with (obj, arg, Qtrue). All inner 4903 * func will be short-circuited using throw. 4904 */ 4905 4906VALUE 4907rb_exec_recursive_paired_outer(VALUE (*func) (VALUE, VALUE, int), VALUE obj, VALUE paired_obj, VALUE arg) 4908{ 4909 return exec_recursive(func, obj, rb_obj_id(paired_obj), arg, 1); 4910} 4911 4912/* 4913 * call-seq: 4914 * thr.backtrace -> array 4915 * 4916 * Returns the current backtrace of the target thread. 4917 * 4918 */ 4919 4920static VALUE 4921rb_thread_backtrace_m(int argc, VALUE *argv, VALUE thval) 4922{ 4923 return vm_thread_backtrace(argc, argv, thval); 4924} 4925 4926/* call-seq: 4927 * thr.backtrace_locations(*args) -> array or nil 4928 * 4929 * Returns the execution stack for the target thread---an array containing 4930 * backtrace location objects. 4931 * 4932 * See Thread::Backtrace::Location for more information. 4933 * 4934 * This method behaves similarly to Kernel#caller_locations except it applies 4935 * to a specific thread. 4936 */ 4937static VALUE 4938rb_thread_backtrace_locations_m(int argc, VALUE *argv, VALUE thval) 4939{ 4940 return vm_thread_backtrace_locations(argc, argv, thval); 4941} 4942 4943/* 4944 * Document-class: ThreadError 4945 * 4946 * Raised when an invalid operation is attempted on a thread. 4947 * 4948 * For example, when no other thread has been started: 4949 * 4950 * Thread.stop 4951 * 4952 * <em>raises the exception:</em> 4953 * 4954 * ThreadError: stopping only thread 4955 */ 4956 4957/* 4958 * +Thread+ encapsulates the behavior of a thread of 4959 * execution, including the main thread of the Ruby script. 4960 * 4961 * In the descriptions of the methods in this class, the parameter _sym_ 4962 * refers to a symbol, which is either a quoted string or a 4963 * +Symbol+ (such as <code>:name</code>). 4964 */ 4965 4966void 4967Init_Thread(void) 4968{ 4969#undef rb_intern 4970#define rb_intern(str) rb_intern_const(str) 4971 4972 VALUE cThGroup; 4973 rb_thread_t *th = GET_THREAD(); 4974 4975 sym_never = ID2SYM(rb_intern("never")); 4976 sym_immediate = ID2SYM(rb_intern("immediate")); 4977 sym_on_blocking = ID2SYM(rb_intern("on_blocking")); 4978 4979 rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); 4980 rb_define_singleton_method(rb_cThread, "start", thread_start, -2); 4981 rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); 4982 rb_define_singleton_method(rb_cThread, "main", rb_thread_s_main, 0); 4983 rb_define_singleton_method(rb_cThread, "current", thread_s_current, 0); 4984 rb_define_singleton_method(rb_cThread, "stop", rb_thread_stop, 0); 4985 rb_define_singleton_method(rb_cThread, "kill", rb_thread_s_kill, 1); 4986 rb_define_singleton_method(rb_cThread, "exit", rb_thread_exit, 0); 4987 rb_define_singleton_method(rb_cThread, "pass", thread_s_pass, 0); 4988 rb_define_singleton_method(rb_cThread, "list", rb_thread_list, 0); 4989 rb_define_singleton_method(rb_cThread, "abort_on_exception", rb_thread_s_abort_exc, 0); 4990 rb_define_singleton_method(rb_cThread, "abort_on_exception=", rb_thread_s_abort_exc_set, 1); 4991#if THREAD_DEBUG < 0 4992 rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0); 4993 rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1); 4994#endif 4995 rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1); 4996 rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1); 4997 rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1); 4998 4999 rb_define_method(rb_cThread, "initialize", thread_initialize, -2); 5000 rb_define_method(rb_cThread, "raise", thread_raise_m, -1); 5001 rb_define_method(rb_cThread, "join", thread_join_m, -1); 5002 rb_define_method(rb_cThread, "value", thread_value, 0); 5003 rb_define_method(rb_cThread, "kill", rb_thread_kill, 0); 5004 rb_define_method(rb_cThread, "terminate", rb_thread_kill, 0); 5005 rb_define_method(rb_cThread, "exit", rb_thread_kill, 0); 5006 rb_define_method(rb_cThread, "run", rb_thread_run, 0); 5007 rb_define_method(rb_cThread, "wakeup", rb_thread_wakeup, 0); 5008 rb_define_method(rb_cThread, "[]", rb_thread_aref, 1); 5009 rb_define_method(rb_cThread, "[]=", rb_thread_aset, 2); 5010 rb_define_method(rb_cThread, "key?", rb_thread_key_p, 1); 5011 rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); 5012 rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); 5013 rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1); 5014 rb_define_method(rb_cThread, "status", rb_thread_status, 0); 5015 rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1); 5016 rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2); 5017 rb_define_method(rb_cThread, "thread_variables", rb_thread_variables, 0); 5018 rb_define_method(rb_cThread, "thread_variable?", rb_thread_variable_p, 1); 5019 rb_define_method(rb_cThread, "alive?", rb_thread_alive_p, 0); 5020 rb_define_method(rb_cThread, "stop?", rb_thread_stop_p, 0); 5021 rb_define_method(rb_cThread, "abort_on_exception", rb_thread_abort_exc, 0); 5022 rb_define_method(rb_cThread, "abort_on_exception=", rb_thread_abort_exc_set, 1); 5023 rb_define_method(rb_cThread, "safe_level", rb_thread_safe_level, 0); 5024 rb_define_method(rb_cThread, "group", rb_thread_group, 0); 5025 rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1); 5026 rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1); 5027 5028 rb_define_method(rb_cThread, "inspect", rb_thread_inspect, 0); 5029 5030 closed_stream_error = rb_exc_new2(rb_eIOError, "stream closed"); 5031 OBJ_TAINT(closed_stream_error); 5032 OBJ_FREEZE(closed_stream_error); 5033 5034 cThGroup = rb_define_class("ThreadGroup", rb_cObject); 5035 rb_define_alloc_func(cThGroup, thgroup_s_alloc); 5036 rb_define_method(cThGroup, "list", thgroup_list, 0); 5037 rb_define_method(cThGroup, "enclose", thgroup_enclose, 0); 5038 rb_define_method(cThGroup, "enclosed?", thgroup_enclosed_p, 0); 5039 rb_define_method(cThGroup, "add", thgroup_add, 1); 5040 5041 { 5042 th->thgroup = th->vm->thgroup_default = rb_obj_alloc(cThGroup); 5043 rb_define_const(cThGroup, "Default", th->thgroup); 5044 } 5045 5046 rb_cMutex = rb_define_class("Mutex", rb_cObject); 5047 rb_define_alloc_func(rb_cMutex, mutex_alloc); 5048 rb_define_method(rb_cMutex, "initialize", mutex_initialize, 0); 5049 rb_define_method(rb_cMutex, "locked?", rb_mutex_locked_p, 0); 5050 rb_define_method(rb_cMutex, "try_lock", rb_mutex_trylock, 0); 5051 rb_define_method(rb_cMutex, "lock", rb_mutex_lock, 0); 5052 rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0); 5053 rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1); 5054 rb_define_method(rb_cMutex, "synchronize", rb_mutex_synchronize_m, 0); 5055 rb_define_method(rb_cMutex, "owned?", rb_mutex_owned_p, 0); 5056 5057 recursive_key = rb_intern("__recursive_key__"); 5058 rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError); 5059 5060 /* init thread core */ 5061 { 5062 /* main thread setting */ 5063 { 5064 /* acquire global vm lock */ 5065 gvl_init(th->vm); 5066 gvl_acquire(th->vm, th); 5067 native_mutex_initialize(&th->vm->thread_destruct_lock); 5068 native_mutex_initialize(&th->interrupt_lock); 5069 5070 th->pending_interrupt_queue = rb_ary_tmp_new(0); 5071 th->pending_interrupt_queue_checked = 0; 5072 th->pending_interrupt_mask_stack = rb_ary_tmp_new(0); 5073 5074 th->interrupt_mask = 0; 5075 } 5076 } 5077 5078 rb_thread_create_timer_thread(); 5079 5080 /* suppress warnings on cygwin, mingw and mswin.*/ 5081 (void)native_mutex_trylock; 5082} 5083 5084int 5085ruby_native_thread_p(void) 5086{ 5087 rb_thread_t *th = ruby_thread_from_native(); 5088 5089 return th != 0; 5090} 5091 5092static int 5093check_deadlock_i(st_data_t key, st_data_t val, int *found) 5094{ 5095 VALUE thval = key; 5096 rb_thread_t *th; 5097 GetThreadPtr(thval, th); 5098 5099 if (th->status != THREAD_STOPPED_FOREVER || RUBY_VM_INTERRUPTED(th)) { 5100 *found = 1; 5101 } 5102 else if (th->locking_mutex) { 5103 rb_mutex_t *mutex; 5104 GetMutexPtr(th->locking_mutex, mutex); 5105 5106 native_mutex_lock(&mutex->lock); 5107 if (mutex->th == th || (!mutex->th && mutex->cond_waiting)) { 5108 *found = 1; 5109 } 5110 native_mutex_unlock(&mutex->lock); 5111 } 5112 5113 return (*found) ? ST_STOP : ST_CONTINUE; 5114} 5115 5116#ifdef DEBUG_DEADLOCK_CHECK 5117static int 5118debug_i(st_data_t key, st_data_t val, int *found) 5119{ 5120 VALUE thval = key; 5121 rb_thread_t *th; 5122 GetThreadPtr(thval, th); 5123 5124 printf("th:%p %d %d", th, th->status, th->interrupt_flag); 5125 if (th->locking_mutex) { 5126 rb_mutex_t *mutex; 5127 GetMutexPtr(th->locking_mutex, mutex); 5128 5129 native_mutex_lock(&mutex->lock); 5130 printf(" %p %d\n", mutex->th, mutex->cond_waiting); 5131 native_mutex_unlock(&mutex->lock); 5132 } 5133 else 5134 puts(""); 5135 5136 return ST_CONTINUE; 5137} 5138#endif 5139 5140static void 5141rb_check_deadlock(rb_vm_t *vm) 5142{ 5143 int found = 0; 5144 5145 if (vm_living_thread_num(vm) > vm->sleeper) return; 5146 if (vm_living_thread_num(vm) < vm->sleeper) rb_bug("sleeper must not be more than vm_living_thread_num(vm)"); 5147 if (patrol_thread && patrol_thread != GET_THREAD()) return; 5148 5149 st_foreach(vm->living_threads, check_deadlock_i, (st_data_t)&found); 5150 5151 if (!found) { 5152 VALUE argv[2]; 5153 argv[0] = rb_eFatal; 5154 argv[1] = rb_str_new2("No live threads left. Deadlock?"); 5155#ifdef DEBUG_DEADLOCK_CHECK 5156 printf("%d %d %p %p\n", vm->living_threads->num_entries, vm->sleeper, GET_THREAD(), vm->main_thread); 5157 st_foreach(vm->living_threads, debug_i, (st_data_t)0); 5158#endif 5159 vm->sleeper--; 5160 rb_threadptr_raise(vm->main_thread, 2, argv); 5161 } 5162} 5163 5164static void 5165update_coverage(rb_event_flag_t event, VALUE proc, VALUE self, ID id, VALUE klass) 5166{ 5167 VALUE coverage = GET_THREAD()->cfp->iseq->coverage; 5168 if (coverage && RBASIC(coverage)->klass == 0) { 5169 long line = rb_sourceline() - 1; 5170 long count; 5171 if (RARRAY_PTR(coverage)[line] == Qnil) { 5172 return; 5173 } 5174 count = FIX2LONG(RARRAY_PTR(coverage)[line]) + 1; 5175 if (POSFIXABLE(count)) { 5176 RARRAY_PTR(coverage)[line] = LONG2FIX(count); 5177 } 5178 } 5179} 5180 5181VALUE 5182rb_get_coverages(void) 5183{ 5184 return GET_VM()->coverages; 5185} 5186 5187void 5188rb_set_coverages(VALUE coverages) 5189{ 5190 GET_VM()->coverages = coverages; 5191 rb_add_event_hook(update_coverage, RUBY_EVENT_COVERAGE, Qnil); 5192} 5193 5194void 5195rb_reset_coverages(void) 5196{ 5197 GET_VM()->coverages = Qfalse; 5198 rb_remove_event_hook(update_coverage); 5199} 5200 5201VALUE 5202rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data) 5203{ 5204 VALUE interrupt_mask = rb_hash_new(); 5205 rb_thread_t *cur_th = GET_THREAD(); 5206 5207 rb_hash_aset(interrupt_mask, rb_cObject, sym_never); 5208 rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask); 5209 5210 return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack); 5211} 5212