/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"
#if HAVE_MACH
#include "protocol.h"
#endif

#if (!HAVE_PTHREAD_WORKQUEUES || DISPATCH_DEBUG) && \
		!defined(DISPATCH_ENABLE_THREAD_POOL)
#define DISPATCH_ENABLE_THREAD_POOL 1
#endif
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES || DISPATCH_ENABLE_THREAD_POOL
#define DISPATCH_USE_PTHREAD_POOL 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && !HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP && \
		!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
#endif
#if HAVE_PTHREAD_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
		!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#define pthread_workqueue_t void*
#endif

static void _dispatch_cache_cleanup(void *value);
static void _dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t dc);
static void _dispatch_queue_cleanup(void *ctxt);
static inline void _dispatch_queue_wakeup_global2(dispatch_queue_t dq,
		unsigned int n);
static inline void _dispatch_queue_wakeup_global(dispatch_queue_t dq);
static inline _dispatch_thread_semaphore_t
		_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq);
#if HAVE_PTHREAD_WORKQUEUES
static void _dispatch_worker_thread3(void *context);
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
static void _dispatch_worker_thread2(int priority, int options, void *context);
#endif
#endif
#if DISPATCH_USE_PTHREAD_POOL
static void *_dispatch_worker_thread(void *context);
static int _dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset);
#endif

#if DISPATCH_COCOA_COMPAT
static dispatch_once_t _dispatch_main_q_port_pred;
static dispatch_queue_t _dispatch_main_queue_wakeup(void);
unsigned long _dispatch_runloop_queue_wakeup(dispatch_queue_t dq);
static void _dispatch_runloop_queue_port_init(void *ctxt);
static void _dispatch_runloop_queue_port_dispose(dispatch_queue_t dq);
#endif

#pragma mark -
#pragma mark dispatch_root_queue

#if DISPATCH_ENABLE_THREAD_POOL
static struct dispatch_semaphore_s _dispatch_thread_mediator[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(semaphore),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	},
};
#endif

#define MAX_PTHREAD_COUNT 255

struct dispatch_root_queue_context_s {
	union {
		struct {
			unsigned int volatile dgq_pending;
#if HAVE_PTHREAD_WORKQUEUES
			int dgq_wq_priority, dgq_wq_options;
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
			pthread_workqueue_t dgq_kworkqueue;
#endif
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
			void *dgq_ctxt;
			dispatch_semaphore_t dgq_thread_mediator;
			uint32_t volatile dgq_thread_pool_size;
#endif
		};
		char _dgq_pad[DISPATCH_CACHELINE_SIZE];
	};
};
typedef struct dispatch_root_queue_context_s *dispatch_root_queue_context_t;

DISPATCH_CACHELINE_ALIGN
static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = 0,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
#endif
	}}},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {{{
#if HAVE_PTHREAD_WORKQUEUES
		.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
		.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
#endif
#if DISPATCH_ENABLE_THREAD_POOL
		.dgq_thread_mediator = &_dispatch_thread_mediator[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
#endif
	}}},
};

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
// dq_running is set to 2 so that barrier operations go through the slow path
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_root_queues[] = {
	[DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
		.dq_label = "com.apple.root.low-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 4,
	},
	[DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.low-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 5,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
		.dq_label = "com.apple.root.default-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 6,
	},
	[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.default-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 7,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
		.dq_label = "com.apple.root.high-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 8,
	},
	[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.high-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 9,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
		.dq_label = "com.apple.root.background-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 10,
	},
	[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY] = {
		.do_vtable = DISPATCH_VTABLE(queue_root),
		.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
		.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
		.do_ctxt = &_dispatch_root_queue_contexts[
				DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
		.dq_label = "com.apple.root.background-overcommit-priority",
		.dq_running = 2,
		.dq_width = UINT32_MAX,
		.dq_serialnum = 11,
	},
};

#if HAVE_PTHREAD_WORKQUEUES
static const dispatch_queue_t _dispatch_wq2root_queues[][2] = {
	[WORKQ_LOW_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_PRIORITY],
	[WORKQ_LOW_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_LOW_OVERCOMMIT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_PRIORITY],
	[WORKQ_DEFAULT_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_PRIORITY],
	[WORKQ_HIGH_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][0] = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_PRIORITY],
	[WORKQ_BG_PRIOQUEUE][WORKQ_ADDTHREADS_OPTION_OVERCOMMIT] =
			&_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_OVERCOMMIT_PRIORITY],
};
#endif // HAVE_PTHREAD_WORKQUEUES

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_queue_s _dispatch_mgr_root_queue;
#else
#define _dispatch_mgr_root_queue \
		_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_HIGH_OVERCOMMIT_PRIORITY]
#endif

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_CACHELINE_ALIGN
struct dispatch_queue_s _dispatch_mgr_q = {
	.do_vtable = DISPATCH_VTABLE(queue_mgr),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_targetq = &_dispatch_mgr_root_queue,
	.dq_label = "com.apple.libdispatch-manager",
	.dq_width = 1,
	.dq_is_thread_bound = 1,
	.dq_serialnum = 2,
};

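// Illustrative usage sketch for dispatch_get_global_queue() below: the only
// flag bit it accepts is DISPATCH_QUEUE_OVERCOMMIT; any other bit yields
// NULL. A typical client call pairs a priority constant with flags == 0:
//
//	dispatch_queue_t q =
//			dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
//	dispatch_async(q, ^{ /* concurrent work */ });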
dispatch_queue_t
dispatch_get_global_queue(long priority, unsigned long flags)
{
	if (flags & ~(unsigned long)DISPATCH_QUEUE_OVERCOMMIT) {
		return NULL;
	}
	return _dispatch_get_root_queue(priority,
			flags & DISPATCH_QUEUE_OVERCOMMIT);
}

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
_dispatch_get_current_queue(void)
{
	return _dispatch_queue_get_current() ?: _dispatch_get_root_queue(0, true);
}

dispatch_queue_t
dispatch_get_current_queue(void)
{
	return _dispatch_get_current_queue();
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_queue_targets_queue(dispatch_queue_t dq1, dispatch_queue_t dq2)
{
	while (dq1) {
		if (dq1 == dq2) {
			return true;
		}
		dq1 = dq1->do_targetq;
	}
	return false;
}

#define DISPATCH_ASSERT_QUEUE_MESSAGE "BUG in client of libdispatch: " \
		"Assertion failed: Block was run on an unexpected queue"

DISPATCH_NOINLINE
static void
_dispatch_assert_queue_fail(dispatch_queue_t dq, bool expected)
{
	char *msg;
	asprintf(&msg, "%s\n%s queue: 0x%p[%s]", DISPATCH_ASSERT_QUEUE_MESSAGE,
			expected ? "Expected" : "Unexpected", dq, dq->dq_label ?
			dq->dq_label : "");
	_dispatch_log("%s", msg);
	_dispatch_set_crash_log_message(msg);
	_dispatch_hardware_crash();
	free(msg);
}

void
dispatch_assert_queue(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (fastpath(cq) && fastpath(_dispatch_queue_targets_queue(cq, dq))) {
		return;
	}
	_dispatch_assert_queue_fail(dq, true);
}

void
dispatch_assert_queue_not(dispatch_queue_t dq)
{
	if (slowpath(!dq) || slowpath(!(dx_metatype(dq) == _DISPATCH_QUEUE_TYPE))) {
		DISPATCH_CLIENT_CRASH("invalid queue passed to "
				"dispatch_assert_queue_not()");
	}
	dispatch_queue_t cq = _dispatch_queue_get_current();
	if (slowpath(cq) && slowpath(_dispatch_queue_targets_queue(cq, dq))) {
		_dispatch_assert_queue_fail(dq, false);
	}
}

#if DISPATCH_DEBUG && DISPATCH_ROOT_QUEUE_DEBUG
#define _dispatch_root_queue_debug(...) _dispatch_debug(__VA_ARGS__)
#define _dispatch_debug_root_queue(...) dispatch_debug_queue(__VA_ARGS__)
#else
#define _dispatch_root_queue_debug(...)
#define _dispatch_debug_root_queue(...)
#endif

#pragma mark -
#pragma mark dispatch_init

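// Root queue bring-up strategy (summary of the code below): prefer kernel
// workqueues via pthread_workqueue_setdispatch_np() when available, fall
// back to the legacy pthread_workqueue_create_np() SPI, and use the internal
// pthread pool when neither is usable (or when the LIBDISPATCH_DISABLE_KWQ
// debug environment variable disables kernel workqueues).
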
static void
_dispatch_hw_config_init(void)
{
	_dispatch_hw_config.cc_max_active = _dispatch_get_activecpu();
	_dispatch_hw_config.cc_max_logical = _dispatch_get_logicalcpu_max();
	_dispatch_hw_config.cc_max_physical = _dispatch_get_physicalcpu_max();
}

static inline bool
_dispatch_root_queues_init_workq(void)
{
	bool result = false;
#if HAVE_PTHREAD_WORKQUEUES
	bool disable_wq = false;
#if DISPATCH_ENABLE_THREAD_POOL && DISPATCH_DEBUG
	disable_wq = slowpath(getenv("LIBDISPATCH_DISABLE_KWQ"));
#endif
	int r;
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
	if (!disable_wq) {
#if PTHREAD_WORKQUEUE_SPI_VERSION >= 20121218
		pthread_workqueue_setdispatchoffset_np(
				offsetof(struct dispatch_queue_s, dq_serialnum));
#endif
		r = pthread_workqueue_setdispatch_np(_dispatch_worker_thread2);
#if !DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		(void)dispatch_assume_zero(r);
#endif
		result = !r;
	}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
	if (!result) {
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		pthread_workqueue_attr_t pwq_attr;
		if (!disable_wq) {
			r = pthread_workqueue_attr_init_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			pthread_workqueue_t pwq = NULL;
			dispatch_root_queue_context_t qc;
			qc = &_dispatch_root_queue_contexts[i];
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			if (!disable_wq
#if DISPATCH_NO_BG_PRIORITY
					&& (qc->dgq_wq_priority != WORKQ_BG_PRIOQUEUE)
#endif
			) {
				r = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
						qc->dgq_wq_priority);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_attr_setovercommit_np(&pwq_attr,
						qc->dgq_wq_options &
						WORKQ_ADDTHREADS_OPTION_OVERCOMMIT);
				(void)dispatch_assume_zero(r);
				r = pthread_workqueue_create_np(&pwq, &pwq_attr);
				(void)dispatch_assume_zero(r);
				result = result || dispatch_assume(pwq);
			}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
			qc->dgq_kworkqueue = pwq ? pwq : (void*)(~0ul);
		}
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (!disable_wq) {
			r = pthread_workqueue_attr_destroy_np(&pwq_attr);
			(void)dispatch_assume_zero(r);
		}
#endif
	}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_ENABLE_THREAD_POOL
#endif // HAVE_PTHREAD_WORKQUEUES
	return result;
}

#if DISPATCH_USE_PTHREAD_POOL
static inline void
_dispatch_root_queue_init_pthread_pool(dispatch_root_queue_context_t qc,
		bool overcommit)
{
	qc->dgq_thread_pool_size = overcommit ? MAX_PTHREAD_COUNT :
			_dispatch_hw_config.cc_max_active;
#if USE_MACH_SEM
	// override the default FIFO behavior for the pool semaphores
	kern_return_t kr = semaphore_create(mach_task_self(),
			&qc->dgq_thread_mediator->dsema_port, SYNC_POLICY_LIFO, 0);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	(void)dispatch_assume(qc->dgq_thread_mediator->dsema_port);
#elif USE_POSIX_SEM
	/* XXXRW: POSIX semaphores don't support LIFO? */
	int ret = sem_init(&qc->dgq_thread_mediator->dsema_sem, 0, 0);
	(void)dispatch_assume_zero(ret);
#endif
}
#endif // DISPATCH_USE_PTHREAD_POOL

static void
_dispatch_root_queues_init(void *context DISPATCH_UNUSED)
{
	_dispatch_safe_fork = false;
	if (!_dispatch_root_queues_init_workq()) {
#if DISPATCH_ENABLE_THREAD_POOL
		int i;
		for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
			bool overcommit = true;
#if TARGET_OS_EMBEDDED
			// some software hangs if the non-overcommitting queues do not
			// overcommit when threads block. Someday, this behavior should
			// apply to all platforms
			if (!(i & 1)) {
				overcommit = false;
			}
#endif
			_dispatch_root_queue_init_pthread_pool(
					&_dispatch_root_queue_contexts[i], overcommit);
		}
#else
		DISPATCH_CRASH("Root queue initialization failed");
#endif // DISPATCH_ENABLE_THREAD_POOL
	}
}

#define countof(x) (sizeof(x) / sizeof(x[0]))

DISPATCH_EXPORT DISPATCH_NOTHROW
void
libdispatch_init(void)
{
	dispatch_assert(DISPATCH_QUEUE_PRIORITY_COUNT == 4);
	dispatch_assert(DISPATCH_ROOT_QUEUE_COUNT == 8);

	dispatch_assert(DISPATCH_QUEUE_PRIORITY_LOW ==
			-DISPATCH_QUEUE_PRIORITY_HIGH);
	dispatch_assert(countof(_dispatch_root_queues) ==
			DISPATCH_ROOT_QUEUE_COUNT);
	dispatch_assert(countof(_dispatch_root_queue_contexts) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_assert(sizeof(_dispatch_wq2root_queues) /
			sizeof(_dispatch_wq2root_queues[0][0]) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif
#if DISPATCH_ENABLE_THREAD_POOL
	dispatch_assert(countof(_dispatch_thread_mediator) ==
			DISPATCH_ROOT_QUEUE_COUNT);
#endif

	dispatch_assert(sizeof(struct dispatch_apply_s) <=
			DISPATCH_CONTINUATION_SIZE);
	dispatch_assert(sizeof(struct dispatch_queue_s) % DISPATCH_CACHELINE_SIZE
			== 0);
	dispatch_assert(sizeof(struct dispatch_root_queue_context_s) %
			DISPATCH_CACHELINE_SIZE == 0);

	_dispatch_thread_key_create(&dispatch_queue_key, _dispatch_queue_cleanup);
#if !DISPATCH_USE_OS_SEMAPHORE_CACHE
	_dispatch_thread_key_create(&dispatch_sema4_key,
			(void (*)(void *))_dispatch_thread_semaphore_dispose);
#endif
	_dispatch_thread_key_create(&dispatch_cache_key, _dispatch_cache_cleanup);
	_dispatch_thread_key_create(&dispatch_io_key, NULL);
	_dispatch_thread_key_create(&dispatch_apply_key, NULL);
#if DISPATCH_PERF_MON
	_dispatch_thread_key_create(&dispatch_bcounter_key, NULL);
#endif

#if DISPATCH_USE_RESOLVERS // rdar://problem/8541707
	_dispatch_main_q.do_targetq = &_dispatch_root_queues[
			DISPATCH_ROOT_QUEUE_IDX_DEFAULT_OVERCOMMIT_PRIORITY];
#endif

	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_main_q);
	_dispatch_queue_set_bound_thread(&_dispatch_main_q);

#if DISPATCH_USE_PTHREAD_ATFORK
	(void)dispatch_assume_zero(pthread_atfork(dispatch_atfork_prepare,
			dispatch_atfork_parent, dispatch_atfork_child));
#endif

	_dispatch_hw_config_init();
	_dispatch_vtable_init();
	_os_object_init();
	_dispatch_introspection_init();
}

DISPATCH_EXPORT DISPATCH_NOTHROW
void
dispatch_atfork_child(void)
{
	void *crash = (void *)0x100;
	size_t i;

	if (_dispatch_safe_fork) {
		return;
	}
	_dispatch_child_of_unsafe_fork = true;

	_dispatch_main_q.dq_items_head = crash;
	_dispatch_main_q.dq_items_tail = crash;

	_dispatch_mgr_q.dq_items_head = crash;
	_dispatch_mgr_q.dq_items_tail = crash;

	for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
		_dispatch_root_queues[i].dq_items_head = crash;
		_dispatch_root_queues[i].dq_items_tail = crash;
	}
}

#pragma mark -
#pragma mark dispatch_queue_t

// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - mgr_root_q
// 4,5,6,7,8,9,10,11 - global queues
// we use 'xadd' on Intel, so the initial value == next assigned
unsigned long volatile _dispatch_queue_serial_numbers = 12;

dispatch_queue_t
dispatch_queue_create_with_target(const char *label,
		dispatch_queue_attr_t attr, dispatch_queue_t tq)
{
	dispatch_queue_t dq;

	dq = _dispatch_alloc(DISPATCH_VTABLE(queue),
			sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	if (attr == DISPATCH_QUEUE_CONCURRENT) {
		dq->dq_width = UINT32_MAX;
		if (!tq) {
			tq = _dispatch_get_root_queue(0, false);
		}
	} else {
		if (!tq) {
			// Default target queue is overcommit!
			tq = _dispatch_get_root_queue(0, true);
		}
		if (slowpath(attr)) {
			dispatch_debug_assert(!attr, "Invalid attribute");
		}
	}
	dq->do_targetq = tq;
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
	return dispatch_queue_create_with_target(label, attr,
			DISPATCH_TARGET_QUEUE_DEFAULT);
}

void
_dispatch_queue_destroy(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq;
	if (slowpath(dq == _dispatch_queue_get_current())) {
		DISPATCH_CRASH("Release of a queue by itself");
	}
	if (slowpath(dq->dq_items_tail)) {
		DISPATCH_CRASH("Release of a queue while items are enqueued");
	}

	// trash the tail queue so that use after free will crash
	dq->dq_items_tail = (void *)0x200;

	dispatch_queue_t dqsq = dispatch_atomic_xchg2o(dq, dq_specific_q,
			(void *)0x200, relaxed);
	if (dqsq) {
		_dispatch_release(dqsq);
	}
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
void
_dispatch_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}

const char *
dispatch_queue_get_label(dispatch_queue_t dq)
{
	if (slowpath(dq == DISPATCH_CURRENT_QUEUE_LABEL)) {
		dq = _dispatch_get_current_queue();
	}
	return dq->dq_label ? dq->dq_label : "";
}

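// Note on the width encoding used below: dq_width (like dq_running) counts
// in steps of two, reserving the low bit as the barrier flag, so a queue
// configured for N concurrent items stores dq_width == 2 * N. Illustrative
// (hypothetical) use of the SPI:
//
//	dispatch_queue_t q = dispatch_queue_create("com.example.q",
//			DISPATCH_QUEUE_CONCURRENT);
//	dispatch_queue_set_width(q, 4); // _dispatch_queue_set_width2 stores 8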
static void
_dispatch_queue_set_width2(void *ctxt)
{
	int w = (int)(intptr_t)ctxt; // intentional truncation
	uint32_t tmp;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	if (w == 1 || w == 0) {
		dq->dq_width = 1;
		_dispatch_object_debug(dq, "%s", __func__);
		return;
	}
	if (w > 0) {
		tmp = (unsigned int)w;
	} else switch (w) {
	case DISPATCH_QUEUE_WIDTH_MAX_PHYSICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_physical;
		break;
	case DISPATCH_QUEUE_WIDTH_ACTIVE_CPUS:
		tmp = _dispatch_hw_config.cc_max_active;
		break;
	default:
		// fall through
	case DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS:
		tmp = _dispatch_hw_config.cc_max_logical;
		break;
	}
	// multiply by two since the running count is inc/dec by two
	// (the low bit == barrier)
	dq->dq_width = tmp * 2;
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_queue_set_width(dispatch_queue_t dq, long width)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	_dispatch_barrier_trysync_f(dq, (void*)(intptr_t)width,
			_dispatch_queue_set_width2);
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_set_target_queue2(void *ctxt)
{
	dispatch_queue_t prev_dq, dq = _dispatch_queue_get_current();

	prev_dq = dq->do_targetq;
	dq->do_targetq = ctxt;
	_dispatch_release(prev_dq);
	_dispatch_object_debug(dq, "%s", __func__);
}

void
dispatch_set_target_queue(dispatch_object_t dou, dispatch_queue_t dq)
{
	DISPATCH_OBJECT_TFB(_dispatch_objc_set_target_queue, dou, dq);
	if (slowpath(dou._do->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT) ||
			slowpath(dx_type(dou._do) == DISPATCH_QUEUE_ROOT_TYPE)) {
		return;
	}
	unsigned long type = dx_metatype(dou._do);
	if (slowpath(!dq)) {
		bool is_concurrent_q = (type == _DISPATCH_QUEUE_TYPE &&
				slowpath(dou._dq->dq_width > 1));
		dq = _dispatch_get_root_queue(0, !is_concurrent_q);
	}
	// TODO: put into the vtable
	switch(type) {
	case _DISPATCH_QUEUE_TYPE:
	case _DISPATCH_SOURCE_TYPE:
		_dispatch_retain(dq);
		return _dispatch_barrier_trysync_f(dou._dq, dq,
				_dispatch_set_target_queue2);
	case _DISPATCH_IO_TYPE:
		return _dispatch_io_set_target_queue(dou._dchannel, dq);
	default: {
		dispatch_queue_t prev_dq;
		_dispatch_retain(dq);
		prev_dq = dispatch_atomic_xchg2o(dou._do, do_targetq, dq, release);
		if (prev_dq) _dispatch_release(prev_dq);
		_dispatch_object_debug(dou._do, "%s", __func__);
		return;
	}
	}
}

#pragma mark -
#pragma mark dispatch_pthread_root_queue

struct dispatch_pthread_root_queue_context_s {
	pthread_attr_t dpq_thread_attr;
	dispatch_block_t dpq_thread_configure;
	struct dispatch_semaphore_s dpq_thread_mediator;
};
typedef struct dispatch_pthread_root_queue_context_s *
		dispatch_pthread_root_queue_context_t;

#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
static struct dispatch_pthread_root_queue_context_s
		_dispatch_mgr_root_queue_pthread_context;
static struct dispatch_root_queue_context_s
		_dispatch_mgr_root_queue_context = {{{
#if HAVE_PTHREAD_WORKQUEUES
	.dgq_kworkqueue = (void*)(~0ul),
#endif
	.dgq_ctxt = &_dispatch_mgr_root_queue_pthread_context,
	.dgq_thread_pool_size = 1,
}}};
static struct dispatch_queue_s _dispatch_mgr_root_queue = {
	.do_vtable = DISPATCH_VTABLE(queue_root),
	.do_ref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_xref_cnt = DISPATCH_OBJECT_GLOBAL_REFCNT,
	.do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK,
	.do_ctxt = &_dispatch_mgr_root_queue_context,
	.dq_label = "com.apple.root.libdispatch-manager",
	.dq_running = 2,
	.dq_width = UINT32_MAX,
	.dq_serialnum = 3,
};
static struct {
	volatile int prio;
	int policy;
	pthread_t tid;
} _dispatch_mgr_sched;
static dispatch_once_t _dispatch_mgr_sched_pred;

static void
_dispatch_mgr_sched_init(void *ctxt DISPATCH_UNUSED)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_init(attr));
	(void)dispatch_assume_zero(pthread_attr_getschedpolicy(attr,
			&_dispatch_mgr_sched.policy));
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	// high-priority workq threads are at priority 2 above default
	_dispatch_mgr_sched.prio = param.sched_priority + 2;
}

DISPATCH_NOINLINE
static pthread_t *
_dispatch_mgr_root_queue_init(void)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(attr,
			PTHREAD_CREATE_DETACHED));
#if !DISPATCH_DEBUG
	(void)dispatch_assume_zero(pthread_attr_setstacksize(attr, 64 * 1024));
#endif
	param.sched_priority = _dispatch_mgr_sched.prio;
	(void)dispatch_assume_zero(pthread_attr_setschedparam(attr, &param));
	return &_dispatch_mgr_sched.tid;
}

static inline void
_dispatch_mgr_priority_apply(void)
{
	struct sched_param param;
	do {
		param.sched_priority = _dispatch_mgr_sched.prio;
		(void)dispatch_assume_zero(pthread_setschedparam(
				_dispatch_mgr_sched.tid, _dispatch_mgr_sched.policy, &param));
	} while (_dispatch_mgr_sched.prio > param.sched_priority);
}

DISPATCH_NOINLINE
void
_dispatch_mgr_priority_init(void)
{
	struct sched_param param;
	pthread_attr_t *attr;
	attr = &_dispatch_mgr_root_queue_pthread_context.dpq_thread_attr;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	if (slowpath(_dispatch_mgr_sched.prio > param.sched_priority)) {
		return _dispatch_mgr_priority_apply();
	}
}

DISPATCH_NOINLINE
static void
_dispatch_mgr_priority_raise(const pthread_attr_t *attr)
{
	dispatch_once_f(&_dispatch_mgr_sched_pred, NULL, _dispatch_mgr_sched_init);
	struct sched_param param;
	(void)dispatch_assume_zero(pthread_attr_getschedparam(attr, &param));
	int p = _dispatch_mgr_sched.prio;
	do if (p >= param.sched_priority) {
		return;
	} while (slowpath(!dispatch_atomic_cmpxchgvw2o(&_dispatch_mgr_sched, prio,
			p, param.sched_priority, &p, relaxed)));
	if (_dispatch_mgr_sched.tid) {
		return _dispatch_mgr_priority_apply();
	}
}

dispatch_queue_t
dispatch_pthread_root_queue_create(const char *label, unsigned long flags,
		const pthread_attr_t *attr, dispatch_block_t configure)
{
	dispatch_queue_t dq;
	dispatch_root_queue_context_t qc;
	dispatch_pthread_root_queue_context_t pqc;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
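	// A single allocation carries the queue, its root queue context and the
	// pthread root queue context back to back; qc and pqc below are carved
	// out of that block rather than allocated separately.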
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_root), dqs +
			sizeof(struct dispatch_root_queue_context_s) +
			sizeof(struct dispatch_pthread_root_queue_context_s));
	qc = (void*)dq + dqs;
	pqc = (void*)qc + sizeof(struct dispatch_root_queue_context_s);

	_dispatch_queue_init(dq);
	if (label) {
		dq->dq_label = strdup(label);
	}

	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->do_ctxt = qc;
	dq->do_targetq = NULL;
	dq->dq_running = 2;
	dq->dq_width = UINT32_MAX;

	pqc->dpq_thread_mediator.do_vtable = DISPATCH_VTABLE(semaphore);
	qc->dgq_thread_mediator = &pqc->dpq_thread_mediator;
	qc->dgq_ctxt = pqc;
#if HAVE_PTHREAD_WORKQUEUES
	qc->dgq_kworkqueue = (void*)(~0ul);
#endif
	_dispatch_root_queue_init_pthread_pool(qc, true); // rdar://11352331

	if (attr) {
		memcpy(&pqc->dpq_thread_attr, attr, sizeof(pthread_attr_t));
		_dispatch_mgr_priority_raise(&pqc->dpq_thread_attr);
	} else {
		(void)dispatch_assume_zero(pthread_attr_init(&pqc->dpq_thread_attr));
	}
	(void)dispatch_assume_zero(pthread_attr_setdetachstate(
			&pqc->dpq_thread_attr, PTHREAD_CREATE_DETACHED));
	if (configure) {
		pqc->dpq_thread_configure = _dispatch_Block_copy(configure);
	}
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}
#endif

void
_dispatch_pthread_root_queue_dispose(dispatch_queue_t dq)
{
	if (slowpath(dq->do_ref_cnt == DISPATCH_OBJECT_GLOBAL_REFCNT)) {
		DISPATCH_CRASH("Global root queue disposed");
	}
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
#if DISPATCH_USE_PTHREAD_POOL
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	_dispatch_semaphore_dispose(qc->dgq_thread_mediator);
	if (pqc->dpq_thread_configure) {
		Block_release(pqc->dpq_thread_configure);
	}
	dq->do_targetq = _dispatch_get_root_queue(0, false);
#endif
	if (dq->dq_label) {
		free((void*)dq->dq_label);
	}
	_dispatch_queue_destroy(dq);
}

#pragma mark -
#pragma mark dispatch_queue_specific

struct dispatch_queue_specific_queue_s {
	DISPATCH_STRUCT_HEADER(queue_specific_queue);
	DISPATCH_QUEUE_HEADER;
	TAILQ_HEAD(dispatch_queue_specific_head_s,
			dispatch_queue_specific_s) dqsq_contexts;
};

struct dispatch_queue_specific_s {
	const void *dqs_key;
	void *dqs_ctxt;
	dispatch_function_t dqs_destructor;
	TAILQ_ENTRY(dispatch_queue_specific_s) dqs_list;
};
DISPATCH_DECL(dispatch_queue_specific);

void
_dispatch_queue_specific_queue_dispose(dispatch_queue_specific_queue_t dqsq)
{
	dispatch_queue_specific_t dqs, tmp;

	TAILQ_FOREACH_SAFE(dqs, &dqsq->dqsq_contexts, dqs_list, tmp) {
		if (dqs->dqs_destructor) {
			dispatch_async_f(_dispatch_get_root_queue(
					DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
					dqs->dqs_destructor);
		}
		free(dqs);
	}
	_dispatch_queue_destroy((dispatch_queue_t)dqsq);
}

static void
_dispatch_queue_init_specific(dispatch_queue_t dq)
{
	dispatch_queue_specific_queue_t dqsq;

	dqsq = _dispatch_alloc(DISPATCH_VTABLE(queue_specific_queue),
			sizeof(struct dispatch_queue_specific_queue_s));
	_dispatch_queue_init((dispatch_queue_t)dqsq);
	dqsq->do_xref_cnt = -1;
	dqsq->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
			true);
	dqsq->dq_width = UINT32_MAX;
	dqsq->dq_label = "queue-specific";
	TAILQ_INIT(&dqsq->dqsq_contexts);
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_specific_q, NULL,
			(dispatch_queue_t)dqsq, release))) {
		_dispatch_release((dispatch_queue_t)dqsq);
	}
}

static void
_dispatch_queue_set_specific(void *ctxt)
{
	dispatch_queue_specific_t dqs, dqsn = ctxt;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == dqsn->dqs_key) {
			// Destroy previous context for existing key
			if (dqs->dqs_destructor) {
				dispatch_async_f(_dispatch_get_root_queue(
						DISPATCH_QUEUE_PRIORITY_DEFAULT, false), dqs->dqs_ctxt,
						dqs->dqs_destructor);
			}
			if (dqsn->dqs_ctxt) {
				// Copy new context for existing key
				dqs->dqs_ctxt = dqsn->dqs_ctxt;
				dqs->dqs_destructor = dqsn->dqs_destructor;
			} else {
				// Remove context storage for existing key
				TAILQ_REMOVE(&dqsq->dqsq_contexts, dqs, dqs_list);
				free(dqs);
			}
			return free(dqsn);
		}
	}
	// Insert context storage for new key
	TAILQ_INSERT_TAIL(&dqsq->dqsq_contexts, dqsn, dqs_list);
}

DISPATCH_NOINLINE
void
dispatch_queue_set_specific(dispatch_queue_t dq, const void *key,
		void *ctxt, dispatch_function_t destructor)
{
	if (slowpath(!key)) {
		return;
	}
	dispatch_queue_specific_t dqs;

	dqs = _dispatch_calloc(1, sizeof(struct dispatch_queue_specific_s));
	dqs->dqs_key = key;
	dqs->dqs_ctxt = ctxt;
	dqs->dqs_destructor = destructor;
	if (slowpath(!dq->dq_specific_q)) {
		_dispatch_queue_init_specific(dq);
	}
	_dispatch_barrier_trysync_f(dq->dq_specific_q, dqs,
			_dispatch_queue_set_specific);
}

static void
_dispatch_queue_get_specific(void *ctxt)
{
	void **ctxtp = ctxt;
	void *key = *ctxtp;
	dispatch_queue_specific_queue_t dqsq =
			(dispatch_queue_specific_queue_t)_dispatch_queue_get_current();
	dispatch_queue_specific_t dqs;

	TAILQ_FOREACH(dqs, &dqsq->dqsq_contexts, dqs_list) {
		if (dqs->dqs_key == key) {
			*ctxtp = dqs->dqs_ctxt;
			return;
		}
	}
	*ctxtp = NULL;
}

DISPATCH_NOINLINE
void *
dispatch_queue_get_specific(dispatch_queue_t dq, const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;

	if (fastpath(dq->dq_specific_q)) {
		ctxt = (void *)key;
		dispatch_sync_f(dq->dq_specific_q, &ctxt, _dispatch_queue_get_specific);
	}
	return ctxt;
}

DISPATCH_NOINLINE
void *
dispatch_get_specific(const void *key)
{
	if (slowpath(!key)) {
		return NULL;
	}
	void *ctxt = NULL;
	dispatch_queue_t dq = _dispatch_queue_get_current();

	while (slowpath(dq)) {
		if (slowpath(dq->dq_specific_q)) {
			ctxt = (void *)key;
			dispatch_sync_f(dq->dq_specific_q, &ctxt,
					_dispatch_queue_get_specific);
			if (ctxt) break;
		}
		dq = dq->do_targetq;
	}
	return ctxt;
}

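// Illustrative usage sketch for the queue-specific API above (the key only
// needs to be a unique address; names here are hypothetical):
//
//	static char q_key; // address used as the key
//	dispatch_queue_set_specific(q, &q_key, ctxt, ctxt_destructor);
//	void *v = dispatch_queue_get_specific(q, &q_key);
//	void *w = dispatch_get_specific(&q_key); // also searches the target chain
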
#pragma mark -
#pragma mark dispatch_queue_debug

size_t
_dispatch_queue_debug_attr(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	dispatch_queue_t target = dq->do_targetq;
	offset += dsnprintf(buf, bufsiz, "target = %s[%p], width = 0x%x, "
			"running = 0x%x, barrier = %d ", target && target->dq_label ?
			target->dq_label : "", target, dq->dq_width / 2,
			dq->dq_running / 2, dq->dq_running & 1);
	if (dq->dq_is_thread_bound) {
		offset += dsnprintf(buf, bufsiz, ", thread = %p ",
				_dispatch_queue_get_bound_thread(dq));
	}
	return offset;
}

size_t
dispatch_queue_debug(dispatch_queue_t dq, char* buf, size_t bufsiz)
{
	size_t offset = 0;
	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
			dq->dq_label ? dq->dq_label : dx_kind(dq), dq);
	offset += _dispatch_object_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += _dispatch_queue_debug_attr(dq, &buf[offset], bufsiz - offset);
	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
	return offset;
}

#if DISPATCH_DEBUG
void
dispatch_debug_queue(dispatch_queue_t dq, const char* str) {
	if (fastpath(dq)) {
		_dispatch_object_debug(dq, "%s", str);
	} else {
		_dispatch_log("queue[NULL]: %s", str);
	}
}
#endif

#if DISPATCH_PERF_MON
static OSSpinLock _dispatch_stats_lock;
static struct {
	uint64_t time_total;
	uint64_t count_total;
	uint64_t thread_total;
} _dispatch_stats[65]; // ffs*/fls*() returns zero when no bits are set

static void
_dispatch_queue_merge_stats(uint64_t start)
{
	uint64_t avg, delta = _dispatch_absolute_time() - start;
	unsigned long count, bucket;

	count = (size_t)_dispatch_thread_getspecific(dispatch_bcounter_key);
	_dispatch_thread_setspecific(dispatch_bcounter_key, NULL);

	if (count) {
		avg = delta / count;
		bucket = flsll(avg);
	} else {
		bucket = 0;
	}

	// 64-bit counters on 32-bit require a lock or a queue
	OSSpinLockLock(&_dispatch_stats_lock);

	_dispatch_stats[bucket].time_total += delta;
	_dispatch_stats[bucket].count_total += count;
	_dispatch_stats[bucket].thread_total++;

	OSSpinLockUnlock(&_dispatch_stats_lock);
}
#endif

#pragma mark -
#pragma mark dispatch_continuation_t

static void
_dispatch_force_cache_cleanup(void)
{
	dispatch_continuation_t dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	if (dc) {
		_dispatch_thread_setspecific(dispatch_cache_key, NULL);
		_dispatch_cache_cleanup(dc);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_cache_cleanup(void *value)
{
	dispatch_continuation_t dc, next_dc = value;

	while ((dc = next_dc)) {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	}
}

#if DISPATCH_USE_MEMORYSTATUS_SOURCE
int _dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;

DISPATCH_NOINLINE
void
_dispatch_continuation_free_to_cache_limit(dispatch_continuation_t dc)
{
	_dispatch_continuation_free_to_heap(dc);
	dispatch_continuation_t next_dc;
	dc = _dispatch_thread_getspecific(dispatch_cache_key);
	int cnt;
	if (!dc || (cnt = dc->do_ref_cnt-_dispatch_continuation_cache_limit) <= 0) {
		return;
	}
	do {
		next_dc = dc->do_next;
		_dispatch_continuation_free_to_heap(dc);
	} while (--cnt && (dc = next_dc));
	_dispatch_thread_setspecific(dispatch_cache_key, next_dc);
}
#endif

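// Redirection below: an item enqueued on a non-root concurrent queue is
// either a blocked dispatch_sync waiter (signal its semaphore directly) or
// an async item that gets wrapped and pushed down the target hierarchy;
// every hop accounts for itself in dq_running in steps of two.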
DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_redirect(dispatch_queue_t dq, dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc;

	(void)dispatch_atomic_add2o(dq, dq_running, 2, acquire);
	if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
			(long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT) {
		_dispatch_trace_continuation_pop(dq, dou);
		_dispatch_thread_semaphore_signal(
				(_dispatch_thread_semaphore_t)dc->dc_other);
		_dispatch_introspection_queue_item_complete(dou);
	} else {
		_dispatch_async_f_redirect(dq, dc);
	}
	_dispatch_perfmon_workitem_inc();
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline void
_dispatch_continuation_pop(dispatch_object_t dou)
{
	dispatch_continuation_t dc = dou._dc, dc1;
	dispatch_group_t dg;

	_dispatch_trace_continuation_pop(_dispatch_queue_get_current(), dou);
	if (DISPATCH_OBJ_IS_VTABLE(dou._do)) {
		return dx_invoke(dou._do);
	}

	// Add the item back to the cache before calling the function. This
	// allows the 'hot' continuation to be used for a quick callback.
	//
	// The ccache version is per-thread.
	// Therefore, the object has not been reused yet.
	// This generates better assembly.
	if ((long)dc->do_vtable & DISPATCH_OBJ_ASYNC_BIT) {
		dc1 = _dispatch_continuation_free_cacheonly(dc);
	} else {
		dc1 = NULL;
	}
	if ((long)dc->do_vtable & DISPATCH_OBJ_GROUP_BIT) {
		dg = dc->dc_data;
	} else {
		dg = NULL;
	}
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	if (dg) {
		dispatch_group_leave(dg);
		_dispatch_release(dg);
	}
	_dispatch_introspection_queue_item_complete(dou);
	if (slowpath(dc1)) {
		_dispatch_continuation_free_to_cache_limit(dc1);
	}
}

#pragma mark -
#pragma mark dispatch_barrier_async

DISPATCH_NOINLINE
static void
_dispatch_barrier_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

DISPATCH_NOINLINE
void
dispatch_barrier_async_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_barrier_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_barrier_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_barrier_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
#endif

#pragma mark -
#pragma mark dispatch_async

void
_dispatch_async_redirect_invoke(void *ctxt)
{
	struct dispatch_continuation_s *dc = ctxt;
	struct dispatch_continuation_s *other_dc = dc->dc_other;
	dispatch_queue_t old_dq, dq = dc->dc_data, rq;

	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_continuation_pop(other_dc);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);

	rq = dq->do_targetq;
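	// Walk back up the target hierarchy, undoing the dq_running += 2 taken
	// on each intermediate queue when the item was redirected, and wake any
	// queue whose running count drains to zero.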
	while (slowpath(rq->do_targetq) && rq != old_dq) {
		if (dispatch_atomic_sub2o(rq, dq_running, 2, relaxed) == 0) {
			_dispatch_wakeup(rq);
		}
		rq = rq->do_targetq;
	}

	if (dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0) {
		_dispatch_wakeup(dq);
	}
	_dispatch_release(dq);
}

static inline void
_dispatch_async_f_redirect2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running = 2;

	// Find the queue to redirect to
	do {
		if (slowpath(dq->dq_items_tail) ||
				slowpath(DISPATCH_OBJECT_SUSPENDED(dq)) ||
				slowpath(dq->dq_width == 1)) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running & 1) || slowpath(running > dq->dq_width)) {
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));

	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f_redirect(dispatch_queue_t dq,
		dispatch_continuation_t other_dc)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = _dispatch_async_redirect_invoke;
	dc->dc_ctxt = dc;
	dc->dc_data = dq;
	dc->dc_other = other_dc;

	_dispatch_retain(dq);
	dq = dq->do_targetq;
	if (slowpath(dq->do_targetq)) {
		return _dispatch_async_f_redirect2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f2(dispatch_queue_t dq, dispatch_continuation_t dc)
{
	uint32_t running = 2;

	do {
		if (slowpath(dq->dq_items_tail)
				|| slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
			break;
		}
		running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		if (slowpath(running > dq->dq_width)) {
			running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
			break;
		}
		if (!slowpath(running & 1)) {
			return _dispatch_async_f_redirect(dq, dc);
		}
		running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
		// We might get lucky and find that the barrier has ended by now
	} while (!(running & 1));

	_dispatch_queue_push_wakeup(dq, dc, running == 0);
}

DISPATCH_NOINLINE
static void
_dispatch_async_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc = _dispatch_continuation_alloc_from_heap();

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

DISPATCH_NOINLINE
void
dispatch_async_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	dispatch_continuation_t dc;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width == 1) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}

	dc = fastpath(_dispatch_continuation_alloc_cacheonly());
	if (!dc) {
		return _dispatch_async_f_slow(dq, ctxt, func);
	}

	dc->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_async(dispatch_queue_t dq, void (^work)(void))
{
	dispatch_async_f(dq, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
#endif

#pragma mark -
#pragma mark dispatch_group_async

DISPATCH_NOINLINE
void
dispatch_group_async_f(dispatch_group_t dg, dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_continuation_t dc;

	_dispatch_retain(dg);
	dispatch_group_enter(dg);

	dc = _dispatch_continuation_alloc();

	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_GROUP_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = dg;

	// No fastpath/slowpath hint because we simply don't know
	if (dq->dq_width != 1 && dq->do_targetq) {
		return _dispatch_async_f2(dq, dc);
	}

	_dispatch_queue_push(dq, dc);
}

#ifdef __BLOCKS__
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
		dispatch_block_t db)
{
	dispatch_group_async_f(dg, dq, _dispatch_Block_copy(db),
			_dispatch_call_block_and_release);
}
#endif

#pragma mark -
#pragma mark dispatch_function_invoke

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_client_callout(ctxt, func);
	_dispatch_perfmon_workitem_inc();
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}

void
_dispatch_sync_recurse_invoke(void *ctxt)
{
	dispatch_continuation_t dc = ctxt;
	_dispatch_function_invoke(dc->dc_data, dc->dc_ctxt, dc->dc_func);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_function_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
		.dc_func = func,
		.dc_ctxt = ctxt,
	};
	dispatch_sync_f(dq->do_targetq, &dc, _dispatch_sync_recurse_invoke);
}

#pragma mark -
#pragma mark dispatch_barrier_sync

static void _dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func);

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_barrier_sync_f_pop(dispatch_queue_t dq, dispatch_object_t dou,
		bool lock)
{
	_dispatch_thread_semaphore_t sema;
	dispatch_continuation_t dc = dou._dc;

	if (DISPATCH_OBJ_IS_VTABLE(dc) || ((long)dc->do_vtable &
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) !=
			(DISPATCH_OBJ_BARRIER_BIT | DISPATCH_OBJ_SYNC_SLOW_BIT)) {
		return 0;
	}
	_dispatch_trace_continuation_pop(dq, dc);
	_dispatch_perfmon_workitem_inc();

	dc = dc->dc_ctxt;
	dq = dc->dc_data;
	sema = (_dispatch_thread_semaphore_t)dc->dc_other;
	if (lock) {
		(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
				DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
		// rdar://problem/9032024 running lock must be held until sync_f_slow
		// returns
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	}
	_dispatch_introspection_queue_item_complete(dou);
	return sema ? sema : MACH_PORT_DEAD;
}

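// "Lock transfer" (rdar://problem/8290662): a thread that finishes a barrier
// can hand queue ownership directly to a waiting barrier-sync thread by
// signaling its semaphore instead of performing a full wakeup. The
// `sema ? sema : MACH_PORT_DEAD` above guarantees a nonzero return whenever
// a barrier-sync continuation was popped.
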
static void
_dispatch_barrier_sync_f_slow_invoke(void *ctxt)
{
	dispatch_continuation_t dc = ctxt;
	dispatch_queue_t dq = dc->dc_data;
	_dispatch_thread_semaphore_t sema;
	sema = (_dispatch_thread_semaphore_t)dc->dc_other;

	dispatch_assert(dq == _dispatch_queue_get_current());
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		// The queue is bound to a non-dispatch thread (e.g. main thread)
		_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
		dispatch_atomic_store2o(dc, dc_func, NULL, release);
		_dispatch_thread_semaphore_signal(sema); // release
		return;
	}
#endif
	(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
	// rdar://9032024 running lock must be held until sync_f_slow returns
	(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	_dispatch_thread_semaphore_signal(sema); // release
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_slow(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	if (slowpath(!dq->do_targetq)) {
		// the global concurrent queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	// It's preferred to execute synchronous blocks on the current thread
	// due to thread-local side effects, garbage collection, etc. However,
	// blocks submitted to the main thread MUST be run on the main thread

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_continuation_s dc = {
		.dc_data = dq,
#if DISPATCH_COCOA_COMPAT
		.dc_func = func,
		.dc_ctxt = ctxt,
#endif
		.dc_other = (void*)sema,
	};
	struct dispatch_continuation_s dbss = {
		.do_vtable = (void *)(DISPATCH_OBJ_BARRIER_BIT |
				DISPATCH_OBJ_SYNC_SLOW_BIT),
		.dc_func = _dispatch_barrier_sync_f_slow_invoke,
		.dc_ctxt = &dc,
#if DISPATCH_INTROSPECTION
		.dc_data = (void*)_dispatch_thread_self(),
#endif
	};
	_dispatch_queue_push(dq, &dbss);

	_dispatch_thread_semaphore_wait(sema); // acquire
	_dispatch_put_thread_semaphore(sema);

#if DISPATCH_COCOA_COMPAT
	// Queue bound to a non-dispatch thread
	if (dc.dc_func == NULL) {
		return;
	}
#endif
	if (slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	if (fastpath(dq->do_suspend_cnt < 2 * DISPATCH_OBJECT_SUSPEND_INTERVAL) &&
			dq->dq_running == 2) {
		// rdar://problem/8290662 "lock transfer"
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			_dispatch_thread_semaphore_signal(sema); // release
			return;
		}
	}
	(void)dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f2(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			(void)dispatch_atomic_add2o(dq, do_suspend_cnt,

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f2(dispatch_queue_t dq)
{
	if (!slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		// rdar://problem/8290662 "lock transfer"
		_dispatch_thread_semaphore_t sema;
		sema = _dispatch_queue_drain_one_barrier_sync(dq);
		if (sema) {
			(void)dispatch_atomic_add2o(dq, do_suspend_cnt,
					DISPATCH_OBJECT_SUSPEND_INTERVAL, relaxed);
			// rdar://9032024 running lock must be held until sync_f_slow
			// returns: increment by 2 and decrement by 1
			(void)dispatch_atomic_inc2o(dq, dq_running, relaxed);
			_dispatch_thread_semaphore_signal(sema);
			return;
		}
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_recurse(dq, ctxt, func);
	if (slowpath(dq->dq_items_tail)) {
		return _dispatch_barrier_sync_f2(dq);
	}
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
void
dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1, acquire))) {
		// global concurrent queues and queues bound to non-dispatch threads
		// always fall into the slow case
		return _dispatch_barrier_sync_f_slow(dq, ctxt, func);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_barrier_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_barrier_sync_f_invoke(dq, ctxt, func);
}

#ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_barrier_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_barrier_sync_f(dq, block,
				_dispatch_call_block_and_release);
	}
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

void
dispatch_barrier_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		return _dispatch_barrier_sync_slow(dq, work);
	}
#endif
	dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif
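
/*
 * Illustrative usage sketch (not part of libdispatch): on a queue created
 * with DISPATCH_QUEUE_CONCURRENT, dispatch_barrier_sync() waits for and
 * excludes all other work, yielding reader/writer semantics; 'shared' is a
 * hypothetical variable protected by the queue:
 *
 *	dispatch_queue_t q = dispatch_queue_create("com.example.rw",
 *			DISPATCH_QUEUE_CONCURRENT);
 *	__block int snapshot;
 *	dispatch_sync(q, ^{ snapshot = shared; });	// reader, may run concurrently
 *	dispatch_barrier_sync(q, ^{ shared++; });	// writer, runs exclusively
 */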

DISPATCH_NOINLINE
static void
_dispatch_barrier_trysync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_dec2o(dq, dq_running, release) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
void
_dispatch_barrier_trysync_f(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	// Use for mutation of queue-/source-internal state only, ignores target
	// queue hierarchy!
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))
			|| slowpath(!dispatch_atomic_cmpxchg2o(dq, dq_running, 0, 1,
					acquire))) {
		return dispatch_barrier_async_f(dq, ctxt, func);
	}
	_dispatch_barrier_trysync_f_invoke(dq, ctxt, func);
}

#pragma mark -
#pragma mark dispatch_sync

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
		bool wakeup)
{
	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	struct dispatch_continuation_s dss = {
		.do_vtable = (void*)DISPATCH_OBJ_SYNC_SLOW_BIT,
#if DISPATCH_INTROSPECTION
		.dc_func = func,
		.dc_ctxt = ctxt,
		.dc_data = (void*)_dispatch_thread_self(),
#endif
		.dc_other = (void*)sema,
	};
	_dispatch_queue_push_wakeup(dq, &dss, wakeup);

	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);

	if (slowpath(dq->do_targetq->do_targetq)) {
		_dispatch_function_recurse(dq, ctxt, func);
	} else {
		_dispatch_function_invoke(dq, ctxt, func);
	}
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_sync_f_invoke(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_invoke(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
		_dispatch_wakeup(dq);
	}
}

DISPATCH_NOINLINE
static void
_dispatch_sync_f_recurse(dispatch_queue_t dq, void *ctxt,
		dispatch_function_t func)
{
	_dispatch_function_recurse(dq, ctxt, func);
	if (slowpath(dispatch_atomic_sub2o(dq, dq_running, 2, relaxed) == 0)) {
		_dispatch_wakeup(dq);
	}
}

static inline void
_dispatch_sync_f2(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	// 1) ensure that this thread hasn't enqueued anything ahead of this call
	// 2) the queue is not suspended
	if (slowpath(dq->dq_items_tail) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))){
		return _dispatch_sync_f_slow(dq, ctxt, func, false);
	}
	uint32_t running = dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
	// re-check suspension after barrier check <rdar://problem/15242126>
	if (slowpath(running & 1) || slowpath(DISPATCH_OBJECT_SUSPENDED(dq))) {
		running = dispatch_atomic_sub2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_slow(dq, ctxt, func, running == 0);
	}
	if (slowpath(dq->do_targetq->do_targetq)) {
		return _dispatch_sync_f_recurse(dq, ctxt, func);
	}
	_dispatch_sync_f_invoke(dq, ctxt, func);
}

DISPATCH_NOINLINE
void
dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func)
{
	if (fastpath(dq->dq_width == 1)) {
		return dispatch_barrier_sync_f(dq, ctxt, func);
	}
	if (slowpath(!dq->do_targetq)) {
		// the global concurrent queues do not need strict ordering
		(void)dispatch_atomic_add2o(dq, dq_running, 2, relaxed);
		return _dispatch_sync_f_invoke(dq, ctxt, func);
	}
	_dispatch_sync_f2(dq, ctxt, func);
}
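
/*
 * Caveat (illustrative sketch, not part of libdispatch): dispatch_sync()
 * against a width-1 queue takes the barrier path above, so synchronously
 * targeting the serial queue the caller is already running on deadlocks --
 * the inner block cannot start until the outer one returns:
 *
 *	dispatch_async(q, ^{
 *		dispatch_sync(q, ^{ ... });	// never starts if q is serial
 *	});
 */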

#ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_sync_slow(dispatch_queue_t dq, void (^work)(void))
{
	// Blocks submitted to the main queue MUST be run on the main thread,
	// therefore under GC we must Block_copy in order to notify the thread-local
	// garbage collector that the objects are transferring to the main thread
	// rdar://problem/7176237&7181849&7458685
	if (dispatch_begin_thread_4GC) {
		dispatch_block_t block = _dispatch_Block_copy(work);
		return dispatch_sync_f(dq, block, _dispatch_call_block_and_release);
	}
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

void
dispatch_sync(dispatch_queue_t dq, void (^work)(void))
{
#if DISPATCH_COCOA_COMPAT
	if (slowpath(dq->dq_is_thread_bound)) {
		return _dispatch_sync_slow(dq, work);
	}
#endif
	dispatch_sync_f(dq, work, _dispatch_Block_invoke(work));
}
#endif

#pragma mark -
#pragma mark dispatch_after

void
_dispatch_after_timer_callback(void *ctxt)
{
	dispatch_continuation_t dc = ctxt, dc1;
	dispatch_source_t ds = dc->dc_data;
	dc1 = _dispatch_continuation_free_cacheonly(dc);
	_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
	dispatch_source_cancel(ds);
	dispatch_release(ds);
	if (slowpath(dc1)) {
		_dispatch_continuation_free_to_cache_limit(dc1);
	}
}

DISPATCH_NOINLINE
void
dispatch_after_f(dispatch_time_t when, dispatch_queue_t queue, void *ctxt,
		dispatch_function_t func)
{
	uint64_t delta, leeway;
	dispatch_source_t ds;

	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after_f() called with 'when' == infinity");
#endif
		return;
	}

	delta = _dispatch_timeout(when);
	if (delta == 0) {
		return dispatch_async_f(queue, ctxt, func);
	}
	leeway = delta / 10; // <rdar://problem/13447496>
	if (leeway < NSEC_PER_MSEC) leeway = NSEC_PER_MSEC;
	if (leeway > 60 * NSEC_PER_SEC) leeway = 60 * NSEC_PER_SEC;

	// this function can and should be optimized to not use a dispatch source
	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
	dispatch_assert(ds);

	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
	dc->dc_func = func;
	dc->dc_ctxt = ctxt;
	dc->dc_data = ds;

	dispatch_set_context(ds, dc);
	dispatch_source_set_event_handler_f(ds, _dispatch_after_timer_callback);
	dispatch_source_set_timer(ds, when, DISPATCH_TIME_FOREVER, leeway);
	dispatch_resume(ds);
}

#ifdef __BLOCKS__
void
dispatch_after(dispatch_time_t when, dispatch_queue_t queue,
		dispatch_block_t work)
{
	// test before the copy of the block
	if (when == DISPATCH_TIME_FOREVER) {
#if DISPATCH_DEBUG
		DISPATCH_CLIENT_CRASH(
				"dispatch_after() called with 'when' == infinity");
#endif
		return;
	}
	dispatch_after_f(when, queue, _dispatch_Block_copy(work),
			_dispatch_call_block_and_release);
}
#endif
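
/*
 * Worked example for the timer math above (illustrative, not part of
 * libdispatch): leeway is a tenth of the remaining delta, clamped to
 * [1ms, 60s] -- a 2s delay yields 200ms of leeway, a 5ms delay is clamped
 * up to 1ms, and a 20min delay is clamped down to 60s:
 *
 *	dispatch_after(dispatch_time(DISPATCH_TIME_NOW, 2 * NSEC_PER_SEC),
 *			dispatch_get_main_queue(), ^{
 *		// runs roughly 2s from now, within ~200ms of leeway
 *	});
 */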

#pragma mark -
#pragma mark dispatch_queue_push

DISPATCH_NOINLINE
static void
_dispatch_queue_push_list_slow2(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	// The queue must be retained before dq_items_head is written in order
	// to ensure that the reference is still valid when _dispatch_wakeup is
	// called. Otherwise, if preempted between the assignment to
	// dq_items_head and _dispatch_wakeup, the blocks submitted to the
	// queue may release the last reference to the queue when invoked by
	// _dispatch_queue_drain. <rdar://problem/6932776>
	_dispatch_retain(dq);
	dq->dq_items_head = obj;
	_dispatch_wakeup(dq);
	_dispatch_release(dq);
}

DISPATCH_NOINLINE
void
_dispatch_queue_push_list_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj, unsigned int n)
{
	if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
		dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
		return _dispatch_queue_wakeup_global2(dq, n);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}

DISPATCH_NOINLINE
void
_dispatch_queue_push_slow(dispatch_queue_t dq,
		struct dispatch_object_s *obj)
{
	if (dx_type(dq) == DISPATCH_QUEUE_ROOT_TYPE && !dq->dq_is_thread_bound) {
		dispatch_atomic_store2o(dq, dq_items_head, obj, relaxed);
		return _dispatch_queue_wakeup_global(dq);
	}
	_dispatch_queue_push_list_slow2(dq, obj);
}

#pragma mark -
#pragma mark dispatch_queue_probe

unsigned long
_dispatch_queue_probe(dispatch_queue_t dq)
{
	return (unsigned long)slowpath(dq->dq_items_tail != NULL);
}

#if DISPATCH_COCOA_COMPAT
unsigned long
_dispatch_runloop_queue_probe(dispatch_queue_t dq)
{
	if (_dispatch_queue_probe(dq)) {
		if (dq->do_xref_cnt == -1) return true; // <rdar://problem/14026816>
		return _dispatch_runloop_queue_wakeup(dq);
	}
	return false;
}
#endif

unsigned long
_dispatch_mgr_queue_probe(dispatch_queue_t dq)
{
	if (_dispatch_queue_probe(dq)) {
		return _dispatch_mgr_wakeup(dq);
	}
	return false;
}

unsigned long
_dispatch_root_queue_probe(dispatch_queue_t dq)
{
	_dispatch_queue_wakeup_global(dq);
	return false;
}

#pragma mark -
#pragma mark dispatch_wakeup

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
dispatch_queue_t
_dispatch_wakeup(dispatch_object_t dou)
{
	if (slowpath(DISPATCH_OBJECT_SUSPENDED(dou._do))) {
		return NULL;
	}
	if (!dx_probe(dou._do)) {
		return NULL;
	}
	if (!dispatch_atomic_cmpxchg2o(dou._do, do_suspend_cnt, 0,
			DISPATCH_OBJECT_SUSPEND_LOCK, release)) {
#if DISPATCH_COCOA_COMPAT
		if (dou._dq == &_dispatch_main_q) {
			return _dispatch_main_queue_wakeup();
		}
#endif
		return NULL;
	}
	_dispatch_retain(dou._do);
	dispatch_queue_t tq = dou._do->do_targetq;
	_dispatch_queue_push(tq, dou._do);
	return tq;	// libdispatch does not need this, but the Instrument DTrace
				// probe does
}

#if DISPATCH_COCOA_COMPAT
static inline void
_dispatch_runloop_queue_wakeup_thread(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	kern_return_t kr = _dispatch_send_wakeup_runloop_thread(mp, 0);
	switch (kr) {
	case MACH_SEND_TIMEOUT:
	case MACH_SEND_TIMED_OUT:
	case MACH_SEND_INVALID_DEST:
		break;
	default:
		(void)dispatch_assume_zero(kr);
		break;
	}
}

DISPATCH_NOINLINE DISPATCH_WEAK
unsigned long
_dispatch_runloop_queue_wakeup(dispatch_queue_t dq)
{
	_dispatch_runloop_queue_wakeup_thread(dq);
	return false;
}
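
/*
 * Note: the wakeup above is a zero-timeout mach message to the queue's
 * receive port (allocated in _dispatch_runloop_queue_port_init() below,
 * with mpl_qlimit = 1 for non-main queues). MACH_SEND_TIMED_OUT therefore
 * just means a wakeup is already pending, and MACH_SEND_INVALID_DEST that
 * the port is gone; both are benign and deliberately ignored.
 */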

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_main_queue_wakeup(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_is_thread_bound) {
		return NULL;
	}
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_wakeup_thread(dq);
	return NULL;
}
#endif

DISPATCH_NOINLINE
static void
_dispatch_queue_wakeup_global_slow(dispatch_queue_t dq, unsigned int n)
{
	static dispatch_once_t pred;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	uint32_t i = n;
	int r;

	_dispatch_debug_root_queue(dq, __func__);
	dispatch_once_f(&pred, NULL, _dispatch_root_queues_init);

#if HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (qc->dgq_kworkqueue != (void*)(~0ul))
#endif
	{
		_dispatch_root_queue_debug("requesting new worker thread for global "
				"queue: %p", dq);
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
		if (qc->dgq_kworkqueue) {
			pthread_workitem_handle_t wh;
			unsigned int gen_cnt;
			do {
				r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
						_dispatch_worker_thread3, dq, &wh, &gen_cnt);
				(void)dispatch_assume_zero(r);
			} while (--i);
			return;
		}
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
		r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
				qc->dgq_wq_options, (int)i);
		(void)dispatch_assume_zero(r);
#endif
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
#if DISPATCH_USE_PTHREAD_POOL
	if (fastpath(qc->dgq_thread_mediator)) {
		while (dispatch_semaphore_signal(qc->dgq_thread_mediator)) {
			if (!--i) {
				return;
			}
		}
	}
	uint32_t j, t_count = qc->dgq_thread_pool_size;
	do {
		if (!t_count) {
			_dispatch_root_queue_debug("pthread pool is full for root queue: "
					"%p", dq);
			return;
		}
		j = i > t_count ? t_count : i;
	} while (!dispatch_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
			t_count - j, &t_count, relaxed));
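
	// Restated for clarity: reserve j = MIN(i, t_count) worker slots by
	// atomically moving dgq_thread_pool_size from t_count down to
	// t_count - j; if another thread races, cmpxchgvw reloads t_count and
	// the computation is retried, bailing out once the pool is exhausted.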

	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
	pthread_attr_t *attr = pqc ? &pqc->dpq_thread_attr : NULL;
	pthread_t tid, *pthr = &tid;
#if DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
	if (slowpath(dq == &_dispatch_mgr_root_queue)) {
		pthr = _dispatch_mgr_root_queue_init();
	}
#endif
	do {
		_dispatch_retain(dq);
		while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
			if (r != EAGAIN) {
				(void)dispatch_assume_zero(r);
			}
			_dispatch_temporary_resource_shortage();
		}
		if (!attr) {
			r = pthread_detach(*pthr);
			(void)dispatch_assume_zero(r);
		}
	} while (--j);
#endif // DISPATCH_USE_PTHREAD_POOL
}

static inline void
_dispatch_queue_wakeup_global2(dispatch_queue_t dq, unsigned int n)
{
	if (!dq->dq_items_tail) {
		return;
	}
#if HAVE_PTHREAD_WORKQUEUES
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	if (
#if DISPATCH_USE_PTHREAD_POOL
			(qc->dgq_kworkqueue != (void*)(~0ul)) &&
#endif
			!dispatch_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
		_dispatch_root_queue_debug("worker thread request still pending for "
				"global queue: %p", dq);
		return;
	}
#endif // HAVE_PTHREAD_WORKQUEUES
	return _dispatch_queue_wakeup_global_slow(dq, n);
}

static inline void
_dispatch_queue_wakeup_global(dispatch_queue_t dq)
{
	return _dispatch_queue_wakeup_global2(dq, 1);
}

#pragma mark -
#pragma mark dispatch_queue_invoke

DISPATCH_ALWAYS_INLINE
static inline dispatch_queue_t
dispatch_queue_invoke2(dispatch_object_t dou,
		_dispatch_thread_semaphore_t *sema_ptr)
{
	dispatch_queue_t dq = dou._dq;
	dispatch_queue_t otq = dq->do_targetq;
	*sema_ptr = _dispatch_queue_drain(dq);

	if (slowpath(otq != dq->do_targetq)) {
		// An item on the queue changed the target queue
		return dq->do_targetq;
	}
	return NULL;
}

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
void
_dispatch_queue_invoke(dispatch_queue_t dq)
{
	_dispatch_queue_class_invoke(dq, dispatch_queue_invoke2);
}

#pragma mark -
#pragma mark dispatch_queue_drain

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_head(dispatch_queue_t dq)
{
	struct dispatch_object_s *dc;
	while (!(dc = fastpath(dq->dq_items_head))) {
		dispatch_hardware_pause();
	}
	return dc;
}

DISPATCH_ALWAYS_INLINE
static inline struct dispatch_object_s*
_dispatch_queue_next(dispatch_queue_t dq, struct dispatch_object_s *dc)
{
	struct dispatch_object_s *next_dc;
	next_dc = fastpath(dc->do_next);
	dq->dq_items_head = next_dc;
	if (!next_dc && !dispatch_atomic_cmpxchg2o(dq, dq_items_tail, dc, NULL,
			relaxed)) {
		// Enqueue is TIGHTLY controlled, we won't wait long.
		while (!(next_dc = fastpath(dc->do_next))) {
			dispatch_hardware_pause();
		}
		dq->dq_items_head = next_dc;
	}
	return next_dc;
}
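
/*
 * Note on the two helpers above (illustrative): the enqueue side first
 * swaps dq_items_tail and only then stores the new item into the old
 * tail's do_next, so a dequeuer can briefly observe a non-NULL tail while
 * head/do_next is still unset. The dispatch_hardware_pause() spins cover
 * exactly that window, which is why they are expected to stay short.
 */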

_dispatch_thread_semaphore_t
_dispatch_queue_drain(dispatch_object_t dou)
{
	dispatch_queue_t dq = dou._dq, orig_tq, old_dq;
	old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	struct dispatch_object_s *dc, *next_dc;
	_dispatch_thread_semaphore_t sema = 0;

	// Continue draining sources after target queue change rdar://8928171
	bool check_tq = (dx_type(dq) != DISPATCH_SOURCE_KEVENT_TYPE);

	orig_tq = dq->do_targetq;

	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	//dispatch_debug_queue(dq, __func__);

	while (dq->dq_items_tail) {
		dc = _dispatch_queue_head(dq);
		do {
			if (DISPATCH_OBJECT_SUSPENDED(dq)) {
				goto out;
			}
			if (dq->dq_running > dq->dq_width) {
				goto out;
			}
			if (slowpath(orig_tq != dq->do_targetq) && check_tq) {
				goto out;
			}
			bool redirect = false;
			if (!fastpath(dq->dq_width == 1)) {
				if (!DISPATCH_OBJ_IS_VTABLE(dc) &&
						(long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
					if (dq->dq_running > 1) {
						goto out;
					}
				} else {
					redirect = true;
				}
			}
			next_dc = _dispatch_queue_next(dq, dc);
			if (redirect) {
				_dispatch_continuation_redirect(dq, dc);
				continue;
			}
			if ((sema = _dispatch_barrier_sync_f_pop(dq, dc, true))) {
				goto out;
			}
			_dispatch_continuation_pop(dc);
			_dispatch_perfmon_workitem_inc();
		} while ((dc = next_dc));
	}

out:
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	return sema;
}

#if DISPATCH_COCOA_COMPAT
static void
_dispatch_main_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	if (!dq->dq_items_tail) {
		return;
	}
	struct dispatch_continuation_s marker = {
		.do_vtable = NULL,
	};
	struct dispatch_object_s *dmarker = (void*)&marker;
	_dispatch_queue_push_notrace(dq, dmarker);

	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	do {
		next_dc = _dispatch_queue_next(dq, dc);
		if (dc == dmarker) {
			goto out;
		}
		_dispatch_continuation_pop(dc);
		_dispatch_perfmon_workitem_inc();
	} while ((dc = next_dc));
	DISPATCH_CRASH("Main queue corruption");

out:
	if (next_dc) {
		_dispatch_main_queue_wakeup();
	}
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

static bool
_dispatch_runloop_queue_drain_one(dispatch_queue_t dq)
{
	if (!dq->dq_items_tail) {
		return false;
	}
	_dispatch_perfmon_start();
	dispatch_queue_t old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

	struct dispatch_object_s *dc, *next_dc;
	dc = _dispatch_queue_head(dq);
	next_dc = _dispatch_queue_next(dq, dc);
	_dispatch_continuation_pop(dc);
	_dispatch_perfmon_workitem_inc();

	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
	return next_dc;
}
#endif

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline _dispatch_thread_semaphore_t
_dispatch_queue_drain_one_barrier_sync(dispatch_queue_t dq)
{
	// rdar://problem/8290662 "lock transfer"
	struct dispatch_object_s *dc;
	_dispatch_thread_semaphore_t sema;

	// queue is locked, or suspended and not being drained
	dc = dq->dq_items_head;
	if (slowpath(!dc) || !(sema = _dispatch_barrier_sync_f_pop(dq, dc, false))){
		return 0;
	}
	// dequeue dc, it is a barrier sync
	(void)_dispatch_queue_next(dq, dc);
	return sema;
}

void
_dispatch_mgr_queue_drain(void)
{
	dispatch_queue_t dq = &_dispatch_mgr_q;
	if (!dq->dq_items_tail) {
		return _dispatch_force_cache_cleanup();
	}
	_dispatch_perfmon_start();
	if (slowpath(_dispatch_queue_drain(dq))) {
		DISPATCH_CRASH("Sync onto manager queue");
	}
	_dispatch_perfmon_end();
	_dispatch_force_cache_cleanup();
}

#pragma mark -
#pragma mark dispatch_root_queue_drain

#ifndef DISPATCH_CONTENTION_USE_RAND
#define DISPATCH_CONTENTION_USE_RAND (!TARGET_OS_EMBEDDED)
#endif
#ifndef DISPATCH_CONTENTION_SPINS_MAX
#define DISPATCH_CONTENTION_SPINS_MAX (128 - 1)
#endif
#ifndef DISPATCH_CONTENTION_SPINS_MIN
#define DISPATCH_CONTENTION_SPINS_MIN (32 - 1)
#endif
#ifndef DISPATCH_CONTENTION_USLEEP_START
#define DISPATCH_CONTENTION_USLEEP_START 500
#endif
#ifndef DISPATCH_CONTENTION_USLEEP_MAX
#define DISPATCH_CONTENTION_USLEEP_MAX 100000
#endif
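
/*
 * Worked example of the backoff schedule below (illustrative): each round
 * spins between DISPATCH_CONTENTION_SPINS_MIN and roughly SPINS_MAX pause
 * instructions, then usleeps 500us, 1ms, 2ms, ... doubling up to the 100ms
 * cap -- about 128ms of total sleep -- before the thread gives up, requests
 * a replacement worker and exits.
 */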

DISPATCH_NOINLINE
static bool
_dispatch_queue_concurrent_drain_one_slow(dispatch_queue_t dq)
{
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	struct dispatch_object_s *const mediator = (void *)~0ul;
	bool pending = false, available = true;
	unsigned int spins, sleep_time = DISPATCH_CONTENTION_USLEEP_START;

	do {
		// Spin for a short while in case the contention is temporary -- e.g.
		// when starting up after dispatch_apply, or when executing a few
		// short continuations in a row.
#if DISPATCH_CONTENTION_USE_RAND
		// Use randomness to prevent threads from resonating at the same
		// frequency and permanently contending. All threads sharing the same
		// seed value is safe with the FreeBSD rand_r implementation.
		static unsigned int seed;
		spins = (rand_r(&seed) & DISPATCH_CONTENTION_SPINS_MAX) |
				DISPATCH_CONTENTION_SPINS_MIN;
#else
		spins = DISPATCH_CONTENTION_SPINS_MIN +
				(DISPATCH_CONTENTION_SPINS_MAX-DISPATCH_CONTENTION_SPINS_MIN)/2;
#endif
		while (spins--) {
			dispatch_hardware_pause();
			if (fastpath(dq->dq_items_head != mediator)) goto out;
		}
		// Since we have serious contention, we need to back off.
		if (!pending) {
			// Mark this queue as pending to avoid requests for further threads
			(void)dispatch_atomic_inc2o(qc, dgq_pending, relaxed);
			pending = true;
		}
		_dispatch_contention_usleep(sleep_time);
		if (fastpath(dq->dq_items_head != mediator)) goto out;
		sleep_time *= 2;
	} while (sleep_time < DISPATCH_CONTENTION_USLEEP_MAX);

	// The ratio of work to libdispatch overhead must be bad. This
	// scenario implies that there are too many threads in the pool.
	// Create a new pending thread and then exit this thread.
	// The kernel will grant a new thread when the load subsides.
	_dispatch_debug("contention on global queue: %p", dq);
	_dispatch_queue_wakeup_global(dq);
	available = false;
out:
	if (pending) {
		(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	}
	return available;
}

DISPATCH_ALWAYS_INLINE_NDEBUG
static inline struct dispatch_object_s *
_dispatch_queue_concurrent_drain_one(dispatch_queue_t dq)
{
	struct dispatch_object_s *head, *next, *const mediator = (void *)~0ul;

start:
	// The mediator value acts both as a "lock" and a signal
	head = dispatch_atomic_xchg2o(dq, dq_items_head, mediator, relaxed);

	if (slowpath(head == NULL)) {
		// The first xchg on the tail will tell the enqueueing thread that it
		// is safe to blindly write out to the head pointer. A cmpxchg honors
		// the algorithm.
		(void)dispatch_atomic_cmpxchg2o(dq, dq_items_head, mediator, NULL,
				relaxed);
		_dispatch_root_queue_debug("no work on global queue: %p", dq);
		return NULL;
	}

	if (slowpath(head == mediator)) {
		// This thread lost the race for ownership of the queue.
		if (fastpath(_dispatch_queue_concurrent_drain_one_slow(dq))) {
			goto start;
		}
		return NULL;
	}

	// Restore the head pointer to a sane value before returning.
	// If 'next' is NULL, then this item _might_ be the last item.
	next = fastpath(head->do_next);

	if (slowpath(!next)) {
		dispatch_atomic_store2o(dq, dq_items_head, NULL, relaxed);

		if (dispatch_atomic_cmpxchg2o(dq, dq_items_tail, head, NULL, relaxed)) {
			// both head and tail are NULL now
			goto out;
		}

		// There must be a next item now. This thread won't wait long.
		while (!(next = head->do_next)) {
			dispatch_hardware_pause();
		}
	}

	dispatch_atomic_store2o(dq, dq_items_head, next, relaxed);
	_dispatch_queue_wakeup_global(dq);
out:
	return head;
}

static void
_dispatch_root_queue_drain(dispatch_queue_t dq)
{
#if DISPATCH_DEBUG
	if (_dispatch_thread_getspecific(dispatch_queue_key)) {
		DISPATCH_CRASH("Premature thread recycling");
	}
#endif
	_dispatch_thread_setspecific(dispatch_queue_key, dq);

#if DISPATCH_COCOA_COMPAT
	// ensure that high-level memory management techniques do not leak/crash
	if (dispatch_begin_thread_4GC) {
		dispatch_begin_thread_4GC();
	}
	void *pool = _dispatch_autorelease_pool_push();
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_perfmon_start();
	struct dispatch_object_s *item;
	while ((item = fastpath(_dispatch_queue_concurrent_drain_one(dq)))) {
		_dispatch_continuation_pop(item);
	}
	_dispatch_perfmon_end();

#if DISPATCH_COCOA_COMPAT
	_dispatch_autorelease_pool_pop(pool);
	if (dispatch_end_thread_4GC) {
		dispatch_end_thread_4GC();
	}
#endif // DISPATCH_COCOA_COMPAT

	_dispatch_thread_setspecific(dispatch_queue_key, NULL);
}

#pragma mark -
#pragma mark dispatch_worker_thread

#if HAVE_PTHREAD_WORKQUEUES
static void
_dispatch_worker_thread3(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;

	_dispatch_introspection_thread_add();

	(void)dispatch_atomic_dec2o(qc, dgq_pending, relaxed);
	_dispatch_root_queue_drain(dq);
	__asm__(""); // prevent tailcall (for Instrument DTrace probe)
}

#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void
_dispatch_worker_thread2(int priority, int options,
		void *context DISPATCH_UNUSED)
{
	dispatch_assert(priority >= 0 && priority < WORKQ_NUM_PRIOQUEUE);
	dispatch_assert(!(options & ~WORKQ_ADDTHREADS_OPTION_OVERCOMMIT));
	dispatch_queue_t dq = _dispatch_wq2root_queues[priority][options];

	return _dispatch_worker_thread3(dq);
}
#endif // HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
#endif // HAVE_PTHREAD_WORKQUEUES

#if DISPATCH_USE_PTHREAD_POOL
// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
static void *
_dispatch_worker_thread(void *context)
{
	dispatch_queue_t dq = context;
	dispatch_root_queue_context_t qc = dq->do_ctxt;
	dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;

	if (pqc && pqc->dpq_thread_configure) {
		pqc->dpq_thread_configure();
	}

	sigset_t mask;
	int r;
	// work around the signal-mask tweaks the kernel workqueue would
	// otherwise do for us
	r = sigfillset(&mask);
	(void)dispatch_assume_zero(r);
	r = _dispatch_pthread_sigmask(SIG_BLOCK, &mask, NULL);
	(void)dispatch_assume_zero(r);
	_dispatch_introspection_thread_add();

	// Non-pthread-root-queue pthreads use a 65 second timeout in case there
	// are any timers that run once a minute <rdar://problem/11744973>
	const int64_t timeout = (pqc ? 5ull : 65ull) * NSEC_PER_SEC;

	do {
		_dispatch_root_queue_drain(dq);
	} while (dispatch_semaphore_wait(qc->dgq_thread_mediator,
			dispatch_time(0, timeout)) == 0);

	(void)dispatch_atomic_inc2o(qc, dgq_thread_pool_size, relaxed);
	_dispatch_queue_wakeup_global(dq);
	_dispatch_release(dq);

	return NULL;
}

int
_dispatch_pthread_sigmask(int how, sigset_t *set, sigset_t *oset)
{
	int r;

	/* Workaround: 6269619 Not all signals can be delivered on any thread */

	r = sigdelset(set, SIGILL);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGTRAP);
	(void)dispatch_assume_zero(r);
#if HAVE_DECL_SIGEMT
	r = sigdelset(set, SIGEMT);
	(void)dispatch_assume_zero(r);
#endif
	r = sigdelset(set, SIGFPE);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGBUS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSEGV);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGSYS);
	(void)dispatch_assume_zero(r);
	r = sigdelset(set, SIGPIPE);
	(void)dispatch_assume_zero(r);

	return pthread_sigmask(how, set, oset);
}
#endif // DISPATCH_USE_PTHREAD_POOL

#pragma mark -
#pragma mark dispatch_runloop_queue

static bool _dispatch_program_is_probably_callback_driven;

#if DISPATCH_COCOA_COMPAT

dispatch_queue_t
_dispatch_runloop_root_queue_create_4CF(const char *label, unsigned long flags)
{
	dispatch_queue_t dq;
	size_t dqs;

	if (slowpath(flags)) {
		return NULL;
	}
	dqs = sizeof(struct dispatch_queue_s) - DISPATCH_QUEUE_CACHELINE_PAD;
	dq = _dispatch_alloc(DISPATCH_VTABLE(queue_runloop), dqs);
	_dispatch_queue_init(dq);
	dq->do_targetq = _dispatch_get_root_queue(0, true);
	dq->dq_label = label ? label : "runloop-queue"; // no-copy contract
	dq->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_LOCK;
	dq->dq_running = 1;
	dq->dq_is_thread_bound = 1;
	_dispatch_runloop_queue_port_init(dq);
	_dispatch_queue_set_bound_thread(dq);
	_dispatch_object_debug(dq, "%s", __func__);
	return _dispatch_introspection_queue_create(dq);
}

void
_dispatch_runloop_queue_xref_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	_dispatch_queue_clear_bound_thread(dq);
	if (suspend_cnt == 0) {
		_dispatch_wakeup(dq);
	}
}

void
_dispatch_runloop_queue_dispose(dispatch_queue_t dq)
{
	_dispatch_object_debug(dq, "%s", __func__);
	_dispatch_introspection_queue_dispose(dq);
	_dispatch_runloop_queue_port_dispose(dq);
	_dispatch_queue_destroy(dq);
}

bool
_dispatch_runloop_root_queue_perform_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	dispatch_retain(dq);
	bool r = _dispatch_runloop_queue_drain_one(dq);
	dispatch_release(dq);
	return r;
}

void
_dispatch_runloop_root_queue_wakeup_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	_dispatch_runloop_queue_probe(dq);
}

mach_port_t
_dispatch_runloop_root_queue_get_port_4CF(dispatch_queue_t dq)
{
	if (slowpath(dq->do_vtable != DISPATCH_VTABLE(queue_runloop))) {
		DISPATCH_CLIENT_CRASH("Not a runloop queue");
	}
	return (mach_port_t)dq->do_ctxt;
}

static void
_dispatch_runloop_queue_port_init(void *ctxt)
{
	dispatch_queue_t dq = (dispatch_queue_t)ctxt;
	mach_port_t mp;
	kern_return_t kr;

	_dispatch_safe_fork = false;
	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE, &mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_insert_right(mach_task_self(), mp, mp,
			MACH_MSG_TYPE_MAKE_SEND);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	if (dq != &_dispatch_main_q) {
		struct mach_port_limits limits = {
			.mpl_qlimit = 1,
		};
		kr = mach_port_set_attributes(mach_task_self(), mp,
				MACH_PORT_LIMITS_INFO, (mach_port_info_t)&limits,
				sizeof(limits));
		DISPATCH_VERIFY_MIG(kr);
		(void)dispatch_assume_zero(kr);
	}
	dq->do_ctxt = (void*)(uintptr_t)mp;

	_dispatch_program_is_probably_callback_driven = true;
}

static void
_dispatch_runloop_queue_port_dispose(dispatch_queue_t dq)
{
	mach_port_t mp = (mach_port_t)dq->do_ctxt;
	if (!mp) {
		return;
	}
	dq->do_ctxt = NULL;
	kern_return_t kr = mach_port_deallocate(mach_task_self(), mp);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
	kr = mach_port_mod_refs(mach_task_self(), mp, MACH_PORT_RIGHT_RECEIVE, -1);
	DISPATCH_VERIFY_MIG(kr);
	(void)dispatch_assume_zero(kr);
}

#pragma mark -
#pragma mark dispatch_main_queue

mach_port_t
_dispatch_get_main_queue_port_4CF(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	return (mach_port_t)dq->do_ctxt;
}

static bool main_q_is_draining;

// 6618342 Contact the team that owns the Instrument DTrace probe before
// renaming this symbol
DISPATCH_NOINLINE
static void
_dispatch_queue_set_mainq_drain_state(bool arg)
{
	main_q_is_draining = arg;
}

void
_dispatch_main_queue_callback_4CF(mach_msg_header_t *msg DISPATCH_UNUSED)
{
	if (main_q_is_draining) {
		return;
	}
	_dispatch_queue_set_mainq_drain_state(true);
	_dispatch_main_queue_drain();
	_dispatch_queue_set_mainq_drain_state(false);
}

#endif

void
dispatch_main(void)
{
#if HAVE_PTHREAD_MAIN_NP
	if (pthread_main_np()) {
#endif
		_dispatch_object_debug(&_dispatch_main_q, "%s", __func__);
		_dispatch_program_is_probably_callback_driven = true;
		pthread_exit(NULL);
		DISPATCH_CRASH("pthread_exit() returned");
#if HAVE_PTHREAD_MAIN_NP
	}
	DISPATCH_CLIENT_CRASH("dispatch_main() must be called on the main thread");
#endif
}
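
/*
 * Illustrative usage sketch (not part of libdispatch; reload() is a
 * hypothetical handler): dispatch_main() never returns -- it parks the
 * main thread via pthread_exit() and lets installed sources drive the
 * process from then on:
 *
 *	int main(void)
 *	{
 *		dispatch_source_t sig = dispatch_source_create(
 *				DISPATCH_SOURCE_TYPE_SIGNAL, SIGHUP, 0,
 *				dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0));
 *		dispatch_source_set_event_handler(sig, ^{ reload(); });
 *		dispatch_resume(sig);
 *		dispatch_main(); // does not return
 *	}
 */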

DISPATCH_NOINLINE DISPATCH_NORETURN
static void
_dispatch_sigsuspend(void)
{
	static const sigset_t mask;

	for (;;) {
		sigsuspend(&mask);
	}
}

DISPATCH_NORETURN
static void
_dispatch_sig_thread(void *ctxt DISPATCH_UNUSED)
{
	// never returns, so burn bridges behind us
	_dispatch_clear_stack(0);
	_dispatch_sigsuspend();
}

DISPATCH_NOINLINE
static void
_dispatch_queue_cleanup2(void)
{
	dispatch_queue_t dq = &_dispatch_main_q;
	(void)dispatch_atomic_dec2o(dq, dq_running, relaxed);
	unsigned int suspend_cnt = dispatch_atomic_sub2o(dq, do_suspend_cnt,
			DISPATCH_OBJECT_SUSPEND_LOCK, release);
	dq->dq_is_thread_bound = 0;
	if (suspend_cnt == 0) {
		_dispatch_wakeup(dq);
	}

	// overload the "probably" variable to mean that dispatch_main() or
	// similar non-POSIX API was called
	// this has to run before the DISPATCH_COCOA_COMPAT below
	if (_dispatch_program_is_probably_callback_driven) {
		dispatch_async_f(_dispatch_get_root_queue(0, true), NULL,
				_dispatch_sig_thread);
		sleep(1); // workaround 6778970
	}

#if DISPATCH_COCOA_COMPAT
	dispatch_once_f(&_dispatch_main_q_port_pred, dq,
			_dispatch_runloop_queue_port_init);
	_dispatch_runloop_queue_port_dispose(dq);
#endif
}

static void
_dispatch_queue_cleanup(void *ctxt)
{
	if (ctxt == &_dispatch_main_q) {
		return _dispatch_queue_cleanup2();
	}
	// POSIX defines that destructors are only called if 'ctxt' is non-null
	DISPATCH_CRASH("Premature thread exit while a dispatch queue is running");
}