/*
 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
 *
 * @APPLE_APACHE_LICENSE_HEADER_START@
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * @APPLE_APACHE_LICENSE_HEADER_END@
 */

#include "internal.h"

typedef void (*dispatch_apply_function_t)(void *, size_t);

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_invoke2(void *ctxt, bool redirect)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	size_t const iter = da->da_iterations;
	size_t idx, done = 0;

	idx = dispatch_atomic_inc_orig2o(da, da_index, acquire);
	if (!fastpath(idx < iter)) goto out;

	// da_dc is only safe to access once the 'index lock' has been acquired
	dispatch_apply_function_t const func = (void *)da->da_dc->dc_func;
	void *const da_ctxt = da->da_dc->dc_ctxt;
	dispatch_queue_t dq = da->da_dc->dc_data;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items

	// Handle nested dispatch_apply rdar://problem/9294578
	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)da->da_nested);

	dispatch_queue_t old_dq;
	pthread_priority_t old_dp;
	if (redirect) {
		old_dq = _dispatch_thread_getspecific(dispatch_queue_key);
		_dispatch_thread_setspecific(dispatch_queue_key, dq);
		old_dp = _dispatch_set_defaultpriority(dq->dq_priority);
	}

	// Striding is the responsibility of the caller.
	do {
		_dispatch_client_callout2(da_ctxt, idx, func);
		_dispatch_perfmon_workitem_inc();
		done++;
		idx = dispatch_atomic_inc_orig2o(da, da_index, relaxed);
	} while (fastpath(idx < iter));

	if (redirect) {
		_dispatch_reset_defaultpriority(old_dp);
		_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
	}
	_dispatch_thread_setspecific(dispatch_apply_key, (void*)nested);

	// The thread that finished the last workitem wakes up the possibly waiting
	// thread that called dispatch_apply. They could be one and the same.
	if (!dispatch_atomic_sub2o(da, da_todo, done, release)) {
		_dispatch_thread_semaphore_signal(da->da_sema);
	}
out:
	if (dispatch_atomic_dec2o(da, da_thr_cnt, release) == 0) {
		_dispatch_continuation_free((dispatch_continuation_t)da);
	}
}
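
/*
 * Illustrative sketch of the claiming protocol above (not compiled): each
 * worker atomically fetch-and-increments da_index to claim the next unclaimed
 * iteration, then subtracts the number of iterations it completed from
 * da_todo when its loop ends. With iterations == 4 and two workers A and B,
 * the increments might return A:0 B:1 A:2 A:3, then B:4 and A:5 end their
 * loops; A subtracts 3 and B subtracts 1 from da_todo (== 4), and whichever
 * subtraction reaches 0 signals da_sema to wake the waiting caller. Only the
 * invariant matters: every index in [0, iterations) runs exactly once; the
 * per-worker claim order is unspecified.
 */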

DISPATCH_NOINLINE
void
_dispatch_apply_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, false);
}

DISPATCH_NOINLINE
void
_dispatch_apply_redirect_invoke(void *ctxt)
{
	_dispatch_apply_invoke2(ctxt, true);
}

static void
_dispatch_apply_serial(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	dispatch_continuation_t dc = da->da_dc;
	size_t const iter = da->da_iterations;
	size_t idx = 0;

	_dispatch_perfmon_workitem_dec(); // this unit executes many items
	do {
		_dispatch_client_callout2(dc->dc_ctxt, idx, (void*)dc->dc_func);
		_dispatch_perfmon_workitem_inc();
	} while (++idx < iter);

	_dispatch_continuation_free((dispatch_continuation_t)da);
}

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_apply_f2(dispatch_queue_t dq, dispatch_apply_t da,
		dispatch_function_t func)
{
	uint32_t i = 0;
	dispatch_continuation_t head = NULL, tail = NULL;

	// The current thread does not need a continuation
	uint32_t continuation_cnt = da->da_thr_cnt - 1;

	dispatch_assert(continuation_cnt);

	for (i = 0; i < continuation_cnt; i++) {
		dispatch_continuation_t next = _dispatch_continuation_alloc();
		next->do_vtable = (void *)DISPATCH_OBJ_ASYNC_BIT;
		next->dc_func = func;
		next->dc_ctxt = da;
		_dispatch_continuation_voucher_set(next, 0);
		_dispatch_continuation_priority_set(next, 0, 0);

		next->do_next = head;
		head = next;

		if (!tail) {
			tail = next;
		}
	}

	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
	da->da_sema = sema;

	_dispatch_queue_push_list(dq, head, tail, head->dc_priority,
			continuation_cnt);
	// Call the first element directly
	_dispatch_apply_invoke(da);
	_dispatch_perfmon_workitem_inc();

	_dispatch_thread_semaphore_wait(sema);
	_dispatch_put_thread_semaphore(sema);
}

static void
_dispatch_apply_redirect(void *ctxt)
{
	dispatch_apply_t da = (dispatch_apply_t)ctxt;
	uint32_t da_width = 2 * (da->da_thr_cnt - 1);
	dispatch_queue_t dq = da->da_dc->dc_data, rq = dq, tq;

	do {
		uint32_t running, width = rq->dq_width;
		running = dispatch_atomic_add2o(rq, dq_running, da_width, relaxed);
		if (slowpath(running > width)) {
			uint32_t excess = width > 1 ? running - width : da_width;
			for (tq = dq; 1; tq = tq->do_targetq) {
				(void)dispatch_atomic_sub2o(tq, dq_running, excess, relaxed);
				if (tq == rq) {
					break;
				}
			}
			da_width -= excess;
			if (slowpath(!da_width)) {
				return _dispatch_apply_serial(da);
			}
			da->da_thr_cnt -= excess / 2;
		}
		rq = rq->do_targetq;
	} while (slowpath(rq->do_targetq));
	_dispatch_apply_f2(rq, da, _dispatch_apply_redirect_invoke);
	do {
		(void)dispatch_atomic_sub2o(dq, dq_running, da_width, relaxed);
		dq = dq->do_targetq;
	} while (slowpath(dq->do_targetq));
}
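
/*
 * Worked example for the width accounting above (illustrative): with
 * da_thr_cnt == 4, da_width starts at 2 * (4 - 1) == 6. If a queue of
 * width 4 along the target chain already has dq_running == 2, the add
 * yields running == 8 > width, so excess == running - width == 4; the
 * over-count is backed out of every queue visited so far, da_width drops
 * to 2 and da_thr_cnt to 4 - 4/2 == 2, i.e. the apply proceeds with one
 * extra worker instead of three. If da_width reaches 0, the apply falls
 * back to _dispatch_apply_serial.
 */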

#define DISPATCH_APPLY_MAX UINT16_MAX // must be < sqrt(SIZE_MAX)

DISPATCH_NOINLINE
void
dispatch_apply_f(size_t iterations, dispatch_queue_t dq, void *ctxt,
		void (*func)(void *, size_t))
{
	if (slowpath(iterations == 0)) {
		return;
	}
	uint32_t thr_cnt = dispatch_hw_config(active_cpus);
	size_t nested = (size_t)_dispatch_thread_getspecific(dispatch_apply_key);
	if (!slowpath(nested)) {
		nested = iterations;
	} else {
		// Nested dispatch_apply: divide the available threads among the
		// enclosing iterations and cap the cumulative iteration product
		thr_cnt = nested < thr_cnt ? thr_cnt / nested : 1;
		nested = nested < DISPATCH_APPLY_MAX && iterations < DISPATCH_APPLY_MAX
				? nested * iterations : DISPATCH_APPLY_MAX;
	}
	if (iterations < thr_cnt) {
		thr_cnt = (uint32_t)iterations;
	}
	struct dispatch_continuation_s dc = {
		.dc_func = (void*)func,
		.dc_ctxt = ctxt,
	};
	dispatch_apply_t da = (typeof(da))_dispatch_continuation_alloc();
	da->da_index = 0;
	da->da_todo = iterations;
	da->da_iterations = iterations;
	da->da_nested = nested;
	da->da_thr_cnt = thr_cnt;
	da->da_dc = &dc;

	dispatch_queue_t old_dq;
	old_dq = (dispatch_queue_t)_dispatch_thread_getspecific(dispatch_queue_key);
	if (slowpath(dq == DISPATCH_APPLY_CURRENT_ROOT_QUEUE)) {
		dq = old_dq ? old_dq : _dispatch_get_root_queue(
				_DISPATCH_QOS_CLASS_DEFAULT, false);
		while (slowpath(dq->do_targetq)) {
			dq = dq->do_targetq;
		}
	}
	if (slowpath(dq->dq_width <= 2) || slowpath(thr_cnt <= 1)) {
		return dispatch_sync_f(dq, da, _dispatch_apply_serial);
	}
	if (slowpath(dq->do_targetq)) {
		if (slowpath(dq == old_dq)) {
			return dispatch_sync_f(dq, da, _dispatch_apply_serial);
		} else {
			dc.dc_data = dq;
			return dispatch_sync_f(dq, da, _dispatch_apply_redirect);
		}
	}
	_dispatch_thread_setspecific(dispatch_queue_key, dq);
	_dispatch_apply_f2(dq, da, _dispatch_apply_invoke);
	_dispatch_thread_setspecific(dispatch_queue_key, old_dq);
}
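
/*
 * Usage sketch (illustrative only; item_t, process() and 'items' are
 * hypothetical):
 *
 *	static void
 *	process_item(void *ctxt, size_t i)
 *	{
 *		item_t *items = ctxt;
 *		process(&items[i]); // iterations may run concurrently
 *	}
 *
 *	dispatch_apply_f(count, dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), items, process_item);
 */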

#ifdef __BLOCKS__
#if DISPATCH_COCOA_COMPAT
DISPATCH_NOINLINE
static void
_dispatch_apply_slow(size_t iterations, dispatch_queue_t dq,
		void (^work)(size_t))
{
	dispatch_block_t bb = _dispatch_Block_copy((void *)work);
	dispatch_apply_f(iterations, dq, bb,
			(dispatch_apply_function_t)_dispatch_Block_invoke(bb));
	Block_release(bb);
}
#endif

void
dispatch_apply(size_t iterations, dispatch_queue_t dq, void (^work)(size_t))
{
#if DISPATCH_COCOA_COMPAT
	// Under GC, blocks transferred to other threads must be Block_copy()ed
	// rdar://problem/7455071
	if (dispatch_begin_thread_4GC) {
		return _dispatch_apply_slow(iterations, dq, work);
	}
#endif
	dispatch_apply_f(iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif
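
/*
 * Usage sketch for the Blocks variant (illustrative; 'results' and
 * compute() are hypothetical):
 *
 *	dispatch_apply(count, dispatch_get_global_queue(
 *			DISPATCH_QUEUE_PRIORITY_DEFAULT, 0), ^(size_t i) {
 *		results[i] = compute(i);
 *	});
 */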

#if 0
#ifdef __BLOCKS__
void
dispatch_stride(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void (^work)(size_t))
{
	dispatch_stride_f(offset, stride, iterations, dq, work,
			(dispatch_apply_function_t)_dispatch_Block_invoke(work));
}
#endif

DISPATCH_NOINLINE
void
dispatch_stride_f(size_t offset, size_t stride, size_t iterations,
		dispatch_queue_t dq, void *ctxt, void (*func)(void *, size_t))
{
	if (stride == 0) {
		stride = 1;
	}
	dispatch_apply(iterations / stride, dq, ^(size_t idx) {
		size_t i = idx * stride + offset;
		size_t stop = i + stride;
		do {
			func(ctxt, i++);
		} while (i < stop);
	});

	// Handle the remainder (iterations % stride) serially
	dispatch_sync(dq, ^{
		size_t i;
		for (i = iterations - (iterations % stride); i < iterations; i++) {
			func(ctxt, i + offset);
		}
	});
}
#endif
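
/*
 * Since striding is left to callers (see _dispatch_apply_invoke2 above),
 * batching can be done by hand; a minimal sketch, assuming 'count' is a
 * multiple of a hypothetical STRIDE and process() is a per-item function:
 *
 *	dispatch_apply(count / STRIDE, q, ^(size_t chunk) {
 *		for (size_t i = chunk * STRIDE; i < (chunk + 1) * STRIDE; i++) {
 *			process(i);
 *		}
 *	});
 */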