1/* 2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved. 3 * 4 * @APPLE_APACHE_LICENSE_HEADER_START@ 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 * @APPLE_APACHE_LICENSE_HEADER_END@ 19 */ 20 21#include "internal.h" 22 23typedef void (*dispatch_apply_function_t)(void *, size_t); 24 25DISPATCH_ALWAYS_INLINE 26static inline void 27_dispatch_apply_invoke2(void *ctxt, bool redirect) 28{ 29 dispatch_apply_t da = (dispatch_apply_t)ctxt; 30 size_t const iter = da->da_iterations; 31 size_t idx, done = 0; 32 33 idx = dispatch_atomic_inc_orig2o(da, da_index, acquire); 34 if (!fastpath(idx < iter)) goto out; 35 36 // da_dc is only safe to access once the 'index lock' has been acquired 37 dispatch_apply_function_t const func = (void *)da->da_dc->dc_func; 38 void *const da_ctxt = da->da_dc->dc_ctxt; 39 dispatch_queue_t dq = da->da_dc->dc_data; 40 41 _dispatch_perfmon_workitem_dec(); // this unit executes many items 42 43 // Handle nested dispatch_apply rdar://problem/9294578 44 size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key); 45 _dispatch_thread_setspecific(dispatch_apply_key, (void*)da->da_nested); 46 47 dispatch_queue_t old_dq; 48 pthread_priority_t old_dp; 49 if (redirect) { 50 old_dq = _dispatch_thread_getspecific(dispatch_queue_key); 51 _dispatch_thread_setspecific(dispatch_queue_key, dq); 52 old_dp = _dispatch_set_defaultpriority(dq->dq_priority); 53 } 54 55 // Striding is the responsibility of the caller. 56 do { 57 _dispatch_client_callout2(da_ctxt, idx, func); 58 _dispatch_perfmon_workitem_inc(); 59 done++; 60 idx = dispatch_atomic_inc_orig2o(da, da_index, relaxed); 61 } while (fastpath(idx < iter)); 62 63 if (redirect) { 64 _dispatch_reset_defaultpriority(old_dp); 65 _dispatch_thread_setspecific(dispatch_queue_key, old_dq); 66 } 67 _dispatch_thread_setspecific(dispatch_apply_key, (void*)nested); 68 69 // The thread that finished the last workitem wakes up the possibly waiting 70 // thread that called dispatch_apply. They could be one and the same. 71 if (!dispatch_atomic_sub2o(da, da_todo, done, release)) { 72 _dispatch_thread_semaphore_signal(da->da_sema); 73 } 74out: 75 if (dispatch_atomic_dec2o(da, da_thr_cnt, release) == 0) { 76 _dispatch_continuation_free((dispatch_continuation_t)da); 77 } 78} 79 80DISPATCH_NOINLINE 81void 82_dispatch_apply_invoke(void *ctxt) 83{ 84 _dispatch_apply_invoke2(ctxt, false); 85} 86 87DISPATCH_NOINLINE 88void 89_dispatch_apply_redirect_invoke(void *ctxt) 90{ 91 _dispatch_apply_invoke2(ctxt, true); 92} 93 94static void 95_dispatch_apply_serial(void *ctxt) 96{ 97 dispatch_apply_t da = (dispatch_apply_t)ctxt; 98 dispatch_continuation_t dc = da->da_dc; 99 size_t const iter = da->da_iterations; 100 size_t idx = 0; 101 102 _dispatch_perfmon_workitem_dec(); // this unit executes many items 103 do { 104 _dispatch_client_callout2(dc->dc_ctxt, idx, (void*)dc->dc_func); 105 _dispatch_perfmon_workitem_inc(); 106 } while (++idx < iter); 107 108 _dispatch_continuation_free((dispatch_continuation_t)da); 109} 110 111DISPATCH_ALWAYS_INLINE 112static inline void 113_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da, 114 dispatch_function_t func) 115{ 116 uint32_t i = 0; 117 dispatch_continuation_t head = NULL, tail = NULL; 118 119 // The current thread does not need a continuation 120 uint32_t continuation_cnt = da->da_thr_cnt - 1; 121 122 dispatch_assert(continuation_cnt); 123 124 for (i = 0; i < continuation_cnt; i++) { 125 dispatch_continuation_t next = _dispatch_continuation_alloc(); 126 next->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT; 127 next->dc_func = func; 128 next->dc_ctxt = da; 129 _dispatch_continuation_voucher_set(next, 0); 130 _dispatch_continuation_priority_set(next, 0, 0); 131 132 next->do_next = head; 133 head = next; 134 135 if (!tail) { 136 tail = next; 137 } 138 } 139 140 _dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore(); 141 da->da_sema = sema; 142 143 _dispatch_queue_push_list(dq, head, tail, head->dc_priority, 144 continuation_cnt); 145 // Call the first element directly 146 _dispatch_apply_invoke(da); 147 _dispatch_perfmon_workitem_inc(); 148 149 _dispatch_thread_semaphore_wait(sema); 150 _dispatch_put_thread_semaphore(sema); 151 152} 153 154static void 155_dispatch_apply_redirect(void *ctxt) 156{ 157 dispatch_apply_t da = (dispatch_apply_t)ctxt; 158 uint32_t da_width = 2 * (da->da_thr_cnt - 1); 159 dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq; 160 161 do { 162 uint32_t running, width = rq->dq_width; 163 running = dispatch_atomic_add2o(rq, dq_running, da_width, relaxed); 164 if (slowpath(running > width)) { 165 uint32_t excess = width > 1 ? running - width : da_width; 166 for (tq = dq; 1; tq = tq->do_targetq) { 167 (void)dispatch_atomic_sub2o(tq, dq_running, excess, relaxed); 168 if (tq == rq) { 169 break; 170 } 171 } 172 da_width -= excess; 173 if (slowpath(!da_width)) { 174 return _dispatch_apply_serial(da); 175 } 176 da->da_thr_cnt -= excess / 2; 177 } 178 rq = rq->do_targetq; 179 } while (slowpath(rq->do_targetq)); 180 _dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke); 181 do { 182 (void)dispatch_atomic_sub2o(dq, dq_running, da_width, relaxed); 183 dq = dq->do_targetq; 184 } while (slowpath(dq->do_targetq)); 185} 186 187#define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX) 188 189DISPATCH_NOINLINE 190void 191dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt, 192 void (*func)(void *, size_t)) 193{ 194 if (slowpath(iterations == 0)) { 195 return; 196 } 197 uint32_t thr_cnt = dispatch_hw_config(active_cpus); 198 size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key); 199 if (!slowpath(nested)) { 200 nested = iterations; 201 } else { 202 thr_cnt = nested < thr_cnt ? thr_cnt / nested : 1; 203 nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX 204 ? nested * iterations : DISPATCH_APPLY_MAX; 205 } 206 if (iterations < thr_cnt) { 207 thr_cnt = (uint32_t)iterations; 208 } 209 struct dispatch_continuation_s dc = { 210 .dc_func = (void*)func, 211 .dc_ctxt = ctxt, 212 }; 213 dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc(); 214 da->da_index = 0; 215 da->da_todo = iterations; 216 da->da_iterations = iterations; 217 da->da_nested = nested; 218 da->da_thr_cnt = thr_cnt; 219 da->da_dc = &dc; 220 221 dispatch_queue_t old_dq; 222 old_dq = (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key); 223 if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) { 224 dq = old_dq ? old_dq : _dispatch_get_root_queue( 225 _DISPATCH_QOS_CLASS_DEFAULT, false); 226 while (slowpath(dq->do_targetq)) { 227 dq = dq->do_targetq; 228 } 229 } 230 if (slowpath(dq->dq_width <= 2) || slowpath(thr_cnt <= 1)) { 231 return dispatch_sync_f(dq, da, _dispatch_apply_serial); 232 } 233 if (slowpath(dq->do_targetq)) { 234 if (slowpath(dq == old_dq)) { 235 return dispatch_sync_f(dq, da, _dispatch_apply_serial); 236 } else { 237 dc.dc_data = dq; 238 return dispatch_sync_f(dq, da, _dispatch_apply_redirect); 239 } 240 } 241 _dispatch_thread_setspecific(dispatch_queue_key, dq); 242 _dispatch_apply_f2(dq, da, _dispatch_apply_invoke); 243 _dispatch_thread_setspecific(dispatch_queue_key, old_dq); 244} 245 246#ifdef __BLOCKS__ 247#if DISPATCH_COCOA_COMPAT 248DISPATCH_NOINLINE 249static void 250_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq, 251 void (^work)(size_t)) 252{ 253 dispatch_block_t bb = _dispatch_Block_copy((void *)work); 254 dispatch_apply_f(iterations, dq, bb, 255 (dispatch_apply_function_t)_dispatch_Block_invoke(bb)); 256 Block_release(bb); 257} 258#endif 259 260void 261dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t)) 262{ 263#if DISPATCH_COCOA_COMPAT 264 // Under GC, blocks transferred to other threads must be Block_copy()ed 265 // rdar://problem/7455071 266 if (dispatch_begin_thread_4GC) { 267 return _dispatch_apply_slow(iterations, dq, work); 268 } 269#endif 270 dispatch_apply_f(iterations, dq, work, 271 (dispatch_apply_function_t)_dispatch_Block_invoke(work)); 272} 273#endif 274 275#if 0 276#ifdef __BLOCKS__ 277void 278dispatch_stride(size_t offset, size_t stride, size_t iterations, 279 dispatch_queue_t dq, void (^work)(size_t)) 280{ 281 dispatch_stride_f(offset, stride, iterations, dq, work, 282 (dispatch_apply_function_t)_dispatch_Block_invoke(work)); 283} 284#endif 285 286DISPATCH_NOINLINE 287void 288dispatch_stride_f(size_t offset, size_t stride, size_t iterations, 289 dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t)) 290{ 291 if (stride == 0) { 292 stride = 1; 293 } 294 dispatch_apply(iterations / stride, queue, ^(size_t idx) { 295 size_t i = idx * stride + offset; 296 size_t stop = i + stride; 297 do { 298 func(ctxt, i++); 299 } while (i < stop); 300 }); 301 302 dispatch_sync(queue, ^{ 303 size_t i; 304 for (i = iterations - (iterations % stride); i < iterations; i++) { 305 func(ctxt, i + offset); 306 } 307 }); 308} 309#endif 310