1/*
2 * Copyright (c) 2008-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_APACHE_LICENSE_HEADER_START@
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * @APPLE_APACHE_LICENSE_HEADER_END@
19 */
20
21#include "internal.h"
22#if HAVE_MACH
23#include "protocol.h"
24#include "protocolServer.h"
25#endif
26#include <sys/mount.h>
27
28static void _dispatch_source_merge_kevent(dispatch_source_t ds,
29		const struct kevent64_s *ke);
30static bool _dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp);
31static void _dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg);
32static bool _dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
33		uint32_t del_flags);
34static void _dispatch_kevent_drain(struct kevent64_s *ke);
35static void _dispatch_kevent_merge(struct kevent64_s *ke);
36static void _dispatch_timers_kevent(struct kevent64_s *ke);
37static void _dispatch_timers_unregister(dispatch_source_t ds,
38		dispatch_kevent_t dk);
39static void _dispatch_timers_update(dispatch_source_t ds);
40static void _dispatch_timer_aggregates_check(void);
41static void _dispatch_timer_aggregates_register(dispatch_source_t ds);
42static void _dispatch_timer_aggregates_update(dispatch_source_t ds,
43		unsigned int tidx);
44static void _dispatch_timer_aggregates_unregister(dispatch_source_t ds,
45		unsigned int tidx);
46static inline unsigned long _dispatch_source_timer_data(
47		dispatch_source_refs_t dr, unsigned long prev);
48static long _dispatch_kq_update(const struct kevent64_s *);
49static void _dispatch_memorystatus_init(void);
50#if HAVE_MACH
51static void _dispatch_mach_host_calendar_change_register(void);
52static void _dispatch_mach_recv_msg_buf_init(void);
53static kern_return_t _dispatch_kevent_machport_resume(dispatch_kevent_t dk,
54		uint32_t new_flags, uint32_t del_flags);
55static kern_return_t _dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk,
56		uint32_t new_flags, uint32_t del_flags);
57static inline void _dispatch_kevent_mach_portset(struct kevent64_s *ke);
58#else
59static inline void _dispatch_mach_host_calendar_change_register(void) {}
60static inline void _dispatch_mach_recv_msg_buf_init(void) {}
61#endif
62static const char * _evfiltstr(short filt);
63#if DISPATCH_DEBUG
64static void _dispatch_kevent_debug(struct kevent64_s* kev, const char* str);
65static void _dispatch_kevent_debugger(void *context);
66#define DISPATCH_ASSERT_ON_MANAGER_QUEUE() \
67	dispatch_assert(_dispatch_queue_get_current() == &_dispatch_mgr_q)
68#else
69static inline void
70_dispatch_kevent_debug(struct kevent64_s* kev DISPATCH_UNUSED,
71		const char* str DISPATCH_UNUSED) {}
72#define DISPATCH_ASSERT_ON_MANAGER_QUEUE()
73#endif
74
75#pragma mark -
76#pragma mark dispatch_source_t
77
78dispatch_source_t
79dispatch_source_create(dispatch_source_type_t type,
80	uintptr_t handle,
81	unsigned long mask,
82	dispatch_queue_t q)
83{
84	const struct kevent64_s *proto_kev = &type->ke;
85	dispatch_source_t ds;
86	dispatch_kevent_t dk;
87
88	// input validation
89	if (type == NULL || (mask & ~type->mask)) {
90		return NULL;
91	}
92
93	switch (type->ke.filter) {
94	case EVFILT_SIGNAL:
95		if (handle >= NSIG) {
96			return NULL;
97		}
98		break;
99	case EVFILT_FS:
100#if DISPATCH_USE_VM_PRESSURE
101	case EVFILT_VM:
102#endif
103#if DISPATCH_USE_MEMORYSTATUS
104	case EVFILT_MEMORYSTATUS:
105#endif
106	case DISPATCH_EVFILT_CUSTOM_ADD:
107	case DISPATCH_EVFILT_CUSTOM_OR:
108		if (handle) {
109			return NULL;
110		}
111		break;
112	case DISPATCH_EVFILT_TIMER:
113		if (!!handle ^ !!type->ke.ident) {
114			return NULL;
115		}
116		break;
117	default:
118		break;
119	}
120
121	ds = _dispatch_alloc(DISPATCH_VTABLE(source),
122			sizeof(struct dispatch_source_s));
123	// Initialize as a queue first, then override some settings below.
124	_dispatch_queue_init((dispatch_queue_t)ds);
125	ds->dq_label = "source";
126
127	ds->do_ref_cnt++; // the reference the manager queue holds
128	ds->do_ref_cnt++; // since source is created suspended
129	ds->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
130	// The initial target queue is the manager queue, in order to get
131	// the source installed. <rdar://problem/8928171>
132	ds->do_targetq = &_dispatch_mgr_q;
133
134	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
135	dk->dk_kevent = *proto_kev;
136	dk->dk_kevent.ident = handle;
137	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
138	dk->dk_kevent.fflags |= (uint32_t)mask;
139	dk->dk_kevent.udata = (uintptr_t)dk;
140	TAILQ_INIT(&dk->dk_sources);
141
142	ds->ds_dkev = dk;
143	ds->ds_pending_data_mask = dk->dk_kevent.fflags;
144	ds->ds_ident_hack = (uintptr_t)dk->dk_kevent.ident;
145	if ((EV_DISPATCH|EV_ONESHOT) & proto_kev->flags) {
146		ds->ds_is_level = true;
147		ds->ds_needs_rearm = true;
148	} else if (!(EV_CLEAR & proto_kev->flags)) {
149		// we cheat and use EV_CLEAR to mean a "flag thingy"
150		ds->ds_is_adder = true;
151	}
152	// Some sources require special processing
153	if (type->init != NULL) {
154		type->init(ds, type, handle, mask, q);
155	}
156	dispatch_assert(!(ds->ds_is_level && ds->ds_is_adder));
157
158	if (fastpath(!ds->ds_refs)) {
159		ds->ds_refs = _dispatch_calloc(1ul,
160				sizeof(struct dispatch_source_refs_s));
161	}
162	ds->ds_refs->dr_source_wref = _dispatch_ptr2wref(ds);
163
164	// First item on the queue sets the user-specified target queue
165	dispatch_set_target_queue(ds, q);
166	_dispatch_object_debug(ds, "%s", __func__);
167	return ds;
168}
169
170void
171_dispatch_source_dispose(dispatch_source_t ds)
172{
173	_dispatch_object_debug(ds, "%s", __func__);
174	free(ds->ds_refs);
175	_dispatch_queue_destroy(ds);
176}
177
178void
179_dispatch_source_xref_dispose(dispatch_source_t ds)
180{
181	_dispatch_wakeup(ds);
182}
183
184void
185dispatch_source_cancel(dispatch_source_t ds)
186{
187	_dispatch_object_debug(ds, "%s", __func__);
188	// Right after we set the cancel flag, someone else
189	// could potentially invoke the source, do the cancelation,
190	// unregister the source, and deallocate it. We would
191	// need to therefore retain/release before setting the bit
192
193	_dispatch_retain(ds);
194	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_CANCELED, relaxed);
195	_dispatch_wakeup(ds);
196	_dispatch_release(ds);
197}
198
199long
200dispatch_source_testcancel(dispatch_source_t ds)
201{
202	return (bool)(ds->ds_atomic_flags & DSF_CANCELED);
203}
204
205
206unsigned long
207dispatch_source_get_mask(dispatch_source_t ds)
208{
209	return ds->ds_pending_data_mask;
210}
211
212uintptr_t
213dispatch_source_get_handle(dispatch_source_t ds)
214{
215	return (unsigned int)ds->ds_ident_hack;
216}
217
218unsigned long
219dispatch_source_get_data(dispatch_source_t ds)
220{
221	return ds->ds_data;
222}
223
224void
225dispatch_source_merge_data(dispatch_source_t ds, unsigned long val)
226{
227	struct kevent64_s kev = {
228		.fflags = (typeof(kev.fflags))val,
229		.data = (typeof(kev.data))val,
230	};
231
232	dispatch_assert(
233			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_ADD ||
234			ds->ds_dkev->dk_kevent.filter == DISPATCH_EVFILT_CUSTOM_OR);
235
236	_dispatch_source_merge_kevent(ds, &kev);
237}
238
239#pragma mark -
240#pragma mark dispatch_source_handler
241
242#ifdef __BLOCKS__
243// 6618342 Contact the team that owns the Instrument DTrace probe before
244//         renaming this symbol
245static void
246_dispatch_source_set_event_handler2(void *context)
247{
248	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
249	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
250	dispatch_source_refs_t dr = ds->ds_refs;
251
252	if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
253		Block_release(dr->ds_handler_ctxt);
254	}
255	dr->ds_handler_func = context ? _dispatch_Block_invoke(context) : NULL;
256	dr->ds_handler_ctxt = context;
257	ds->ds_handler_is_block = true;
258}
259
260void
261dispatch_source_set_event_handler(dispatch_source_t ds,
262		dispatch_block_t handler)
263{
264	handler = _dispatch_Block_copy(handler);
265	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
266			_dispatch_source_set_event_handler2);
267}
268#endif /* __BLOCKS__ */
269
270static void
271_dispatch_source_set_event_handler_f(void *context)
272{
273	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
274	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
275	dispatch_source_refs_t dr = ds->ds_refs;
276
277#ifdef __BLOCKS__
278	if (ds->ds_handler_is_block && dr->ds_handler_ctxt) {
279		Block_release(dr->ds_handler_ctxt);
280	}
281#endif
282	dr->ds_handler_func = context;
283	dr->ds_handler_ctxt = ds->do_ctxt;
284	ds->ds_handler_is_block = false;
285}
286
287void
288dispatch_source_set_event_handler_f(dispatch_source_t ds,
289	dispatch_function_t handler)
290{
291	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
292			_dispatch_source_set_event_handler_f);
293}
294
295#ifdef __BLOCKS__
296// 6618342 Contact the team that owns the Instrument DTrace probe before
297//         renaming this symbol
298static void
299_dispatch_source_set_cancel_handler2(void *context)
300{
301	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
302	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
303	dispatch_source_refs_t dr = ds->ds_refs;
304
305	if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
306		Block_release(dr->ds_cancel_handler);
307	}
308	dr->ds_cancel_handler = context;
309	ds->ds_cancel_is_block = true;
310}
311
312void
313dispatch_source_set_cancel_handler(dispatch_source_t ds,
314	dispatch_block_t handler)
315{
316	handler = _dispatch_Block_copy(handler);
317	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
318			_dispatch_source_set_cancel_handler2);
319}
320#endif /* __BLOCKS__ */
321
322static void
323_dispatch_source_set_cancel_handler_f(void *context)
324{
325	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
326	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
327	dispatch_source_refs_t dr = ds->ds_refs;
328
329#ifdef __BLOCKS__
330	if (ds->ds_cancel_is_block && dr->ds_cancel_handler) {
331		Block_release(dr->ds_cancel_handler);
332	}
333#endif
334	dr->ds_cancel_handler = context;
335	ds->ds_cancel_is_block = false;
336}
337
338void
339dispatch_source_set_cancel_handler_f(dispatch_source_t ds,
340	dispatch_function_t handler)
341{
342	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
343			_dispatch_source_set_cancel_handler_f);
344}
345
346#ifdef __BLOCKS__
347static void
348_dispatch_source_set_registration_handler2(void *context)
349{
350	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
351	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
352	dispatch_source_refs_t dr = ds->ds_refs;
353
354	if (ds->ds_registration_is_block && dr->ds_registration_handler) {
355		Block_release(dr->ds_registration_handler);
356	}
357	dr->ds_registration_handler = context;
358	ds->ds_registration_is_block = true;
359}
360
361void
362dispatch_source_set_registration_handler(dispatch_source_t ds,
363	dispatch_block_t handler)
364{
365	handler = _dispatch_Block_copy(handler);
366	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
367			_dispatch_source_set_registration_handler2);
368}
369#endif /* __BLOCKS__ */
370
371static void
372_dispatch_source_set_registration_handler_f(void *context)
373{
374	dispatch_source_t ds = (dispatch_source_t)_dispatch_queue_get_current();
375	dispatch_assert(dx_type(ds) == DISPATCH_SOURCE_KEVENT_TYPE);
376	dispatch_source_refs_t dr = ds->ds_refs;
377
378#ifdef __BLOCKS__
379	if (ds->ds_registration_is_block && dr->ds_registration_handler) {
380		Block_release(dr->ds_registration_handler);
381	}
382#endif
383	dr->ds_registration_handler = context;
384	ds->ds_registration_is_block = false;
385}
386
387void
388dispatch_source_set_registration_handler_f(dispatch_source_t ds,
389	dispatch_function_t handler)
390{
391	_dispatch_barrier_trysync_f((dispatch_queue_t)ds, handler,
392			_dispatch_source_set_registration_handler_f);
393}
394
395#pragma mark -
396#pragma mark dispatch_source_invoke
397
398static void
399_dispatch_source_registration_callout(dispatch_source_t ds)
400{
401	dispatch_source_refs_t dr = ds->ds_refs;
402
403	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
404		// no registration callout if source is canceled rdar://problem/8955246
405#ifdef __BLOCKS__
406		if (ds->ds_registration_is_block) {
407			Block_release(dr->ds_registration_handler);
408		}
409	} else if (ds->ds_registration_is_block) {
410		dispatch_block_t b = dr->ds_registration_handler;
411		_dispatch_client_callout_block(b);
412		Block_release(dr->ds_registration_handler);
413#endif
414	} else {
415		dispatch_function_t f = dr->ds_registration_handler;
416		_dispatch_client_callout(ds->do_ctxt, f);
417	}
418	ds->ds_registration_is_block = false;
419	dr->ds_registration_handler = NULL;
420}
421
422static void
423_dispatch_source_cancel_callout(dispatch_source_t ds)
424{
425	dispatch_source_refs_t dr = ds->ds_refs;
426
427	ds->ds_pending_data_mask = 0;
428	ds->ds_pending_data = 0;
429	ds->ds_data = 0;
430
431#ifdef __BLOCKS__
432	if (ds->ds_handler_is_block) {
433		Block_release(dr->ds_handler_ctxt);
434		ds->ds_handler_is_block = false;
435		dr->ds_handler_func = NULL;
436		dr->ds_handler_ctxt = NULL;
437	}
438	if (ds->ds_registration_is_block) {
439		Block_release(dr->ds_registration_handler);
440		ds->ds_registration_is_block = false;
441		dr->ds_registration_handler = NULL;
442	}
443#endif
444
445	if (!dr->ds_cancel_handler) {
446		return;
447	}
448	if (ds->ds_cancel_is_block) {
449#ifdef __BLOCKS__
450		dispatch_block_t b = dr->ds_cancel_handler;
451		if (ds->ds_atomic_flags & DSF_CANCELED) {
452			_dispatch_client_callout_block(b);
453		}
454		Block_release(dr->ds_cancel_handler);
455		ds->ds_cancel_is_block = false;
456#endif
457	} else {
458		dispatch_function_t f = dr->ds_cancel_handler;
459		if (ds->ds_atomic_flags & DSF_CANCELED) {
460			_dispatch_client_callout(ds->do_ctxt, f);
461		}
462	}
463	dr->ds_cancel_handler = NULL;
464}
465
466static void
467_dispatch_source_latch_and_call(dispatch_source_t ds)
468{
469	unsigned long prev;
470
471	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
472		return;
473	}
474	dispatch_source_refs_t dr = ds->ds_refs;
475	prev = dispatch_atomic_xchg2o(ds, ds_pending_data, 0, relaxed);
476	if (ds->ds_is_level) {
477		ds->ds_data = ~prev;
478	} else if (ds->ds_is_timer && ds_timer(dr).target && prev) {
479		ds->ds_data = _dispatch_source_timer_data(dr, prev);
480	} else {
481		ds->ds_data = prev;
482	}
483	if (dispatch_assume(prev) && dr->ds_handler_func) {
484		_dispatch_client_callout(dr->ds_handler_ctxt, dr->ds_handler_func);
485	}
486}
487
488static void
489_dispatch_source_kevent_unregister(dispatch_source_t ds)
490{
491	_dispatch_object_debug(ds, "%s", __func__);
492	dispatch_kevent_t dk = ds->ds_dkev;
493	ds->ds_dkev = NULL;
494	switch (dk->dk_kevent.filter) {
495	case DISPATCH_EVFILT_TIMER:
496		_dispatch_timers_unregister(ds, dk);
497		break;
498	default:
499		TAILQ_REMOVE(&dk->dk_sources, ds->ds_refs, dr_list);
500		_dispatch_kevent_unregister(dk, (uint32_t)ds->ds_pending_data_mask);
501		break;
502	}
503
504	(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
505	ds->ds_needs_rearm = false; // re-arm is pointless and bad now
506	_dispatch_release(ds); // the retain is done at creation time
507}
508
509static void
510_dispatch_source_kevent_resume(dispatch_source_t ds, uint32_t new_flags)
511{
512	switch (ds->ds_dkev->dk_kevent.filter) {
513	case DISPATCH_EVFILT_TIMER:
514		return _dispatch_timers_update(ds);
515	case EVFILT_MACHPORT:
516		if (ds->ds_pending_data_mask & DISPATCH_MACH_RECV_MESSAGE) {
517			new_flags |= DISPATCH_MACH_RECV_MESSAGE; // emulate EV_DISPATCH
518		}
519		break;
520	}
521	if (_dispatch_kevent_resume(ds->ds_dkev, new_flags, 0)) {
522		_dispatch_source_kevent_unregister(ds);
523	}
524}
525
526static void
527_dispatch_source_kevent_register(dispatch_source_t ds)
528{
529	dispatch_assert_zero(ds->ds_is_installed);
530	switch (ds->ds_dkev->dk_kevent.filter) {
531	case DISPATCH_EVFILT_TIMER:
532		return _dispatch_timers_update(ds);
533	}
534	uint32_t flags;
535	bool do_resume = _dispatch_kevent_register(&ds->ds_dkev, &flags);
536	TAILQ_INSERT_TAIL(&ds->ds_dkev->dk_sources, ds->ds_refs, dr_list);
537	if (do_resume || ds->ds_needs_rearm) {
538		_dispatch_source_kevent_resume(ds, flags);
539	}
540	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
541	_dispatch_object_debug(ds, "%s", __func__);
542}
543
544DISPATCH_ALWAYS_INLINE
545static inline dispatch_queue_t
546_dispatch_source_invoke2(dispatch_object_t dou,
547		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
548{
549	dispatch_source_t ds = dou._ds;
550	if (slowpath(_dispatch_queue_drain(ds))) {
551		DISPATCH_CLIENT_CRASH("Sync onto source");
552	}
553
554	// This function performs all source actions. Each action is responsible
555	// for verifying that it takes place on the appropriate queue. If the
556	// current queue is not the correct queue for this action, the correct queue
557	// will be returned and the invoke will be re-driven on that queue.
558
559	// The order of tests here in invoke and in probe should be consistent.
560
561	dispatch_queue_t dq = _dispatch_queue_get_current();
562	dispatch_source_refs_t dr = ds->ds_refs;
563
564	if (!ds->ds_is_installed) {
565		// The source needs to be installed on the manager queue.
566		if (dq != &_dispatch_mgr_q) {
567			return &_dispatch_mgr_q;
568		}
569		_dispatch_source_kevent_register(ds);
570		ds->ds_is_installed = true;
571		if (dr->ds_registration_handler) {
572			return ds->do_targetq;
573		}
574		if (slowpath(ds->do_xref_cnt == -1)) {
575			return &_dispatch_mgr_q; // rdar://problem/9558246
576		}
577	} else if (slowpath(DISPATCH_OBJECT_SUSPENDED(ds))) {
578		// Source suspended by an item drained from the source queue.
579		return NULL;
580	} else if (dr->ds_registration_handler) {
581		// The source has been registered and the registration handler needs
582		// to be delivered on the target queue.
583		if (dq != ds->do_targetq) {
584			return ds->do_targetq;
585		}
586		// clears ds_registration_handler
587		_dispatch_source_registration_callout(ds);
588		if (slowpath(ds->do_xref_cnt == -1)) {
589			return &_dispatch_mgr_q; // rdar://problem/9558246
590		}
591	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
592		// The source has been cancelled and needs to be uninstalled from the
593		// manager queue. After uninstallation, the cancellation handler needs
594		// to be delivered to the target queue.
595		if (ds->ds_dkev) {
596			if (dq != &_dispatch_mgr_q) {
597				return &_dispatch_mgr_q;
598			}
599			_dispatch_source_kevent_unregister(ds);
600		}
601		if (dr->ds_cancel_handler || ds->ds_handler_is_block ||
602				ds->ds_registration_is_block) {
603			if (dq != ds->do_targetq) {
604				return ds->do_targetq;
605			}
606		}
607		_dispatch_source_cancel_callout(ds);
608	} else if (ds->ds_pending_data) {
609		// The source has pending data to deliver via the event handler callback
610		// on the target queue. Some sources need to be rearmed on the manager
611		// queue after event delivery.
612		if (dq != ds->do_targetq) {
613			return ds->do_targetq;
614		}
615		_dispatch_source_latch_and_call(ds);
616		if (ds->ds_needs_rearm) {
617			return &_dispatch_mgr_q;
618		}
619	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
620		// The source needs to be rearmed on the manager queue.
621		if (dq != &_dispatch_mgr_q) {
622			return &_dispatch_mgr_q;
623		}
624		_dispatch_source_kevent_resume(ds, 0);
625		(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
626	}
627
628	return NULL;
629}
630
631DISPATCH_NOINLINE
632void
633_dispatch_source_invoke(dispatch_source_t ds)
634{
635	_dispatch_queue_class_invoke(ds, _dispatch_source_invoke2);
636}
637
638unsigned long
639_dispatch_source_probe(dispatch_source_t ds)
640{
641	// This function determines whether the source needs to be invoked.
642	// The order of tests here in probe and in invoke should be consistent.
643
644	dispatch_source_refs_t dr = ds->ds_refs;
645	if (!ds->ds_is_installed) {
646		// The source needs to be installed on the manager queue.
647		return true;
648	} else if (dr->ds_registration_handler) {
649		// The registration handler needs to be delivered to the target queue.
650		return true;
651	} else if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)){
652		// The source needs to be uninstalled from the manager queue, or the
653		// cancellation handler needs to be delivered to the target queue.
654		// Note: cancellation assumes installation.
655		if (ds->ds_dkev || dr->ds_cancel_handler
656#ifdef __BLOCKS__
657				|| ds->ds_handler_is_block || ds->ds_registration_is_block
658#endif
659		) {
660			return true;
661		}
662	} else if (ds->ds_pending_data) {
663		// The source has pending data to deliver to the target queue.
664		return true;
665	} else if (ds->ds_needs_rearm && !(ds->ds_atomic_flags & DSF_ARMED)) {
666		// The source needs to be rearmed on the manager queue.
667		return true;
668	}
669	return (ds->dq_items_tail != NULL);
670}
671
672static void
673_dispatch_source_merge_kevent(dispatch_source_t ds, const struct kevent64_s *ke)
674{
675	if ((ds->ds_atomic_flags & DSF_CANCELED) || (ds->do_xref_cnt == -1)) {
676		return;
677	}
678	if (ds->ds_is_level) {
679		// ke->data is signed and "negative available data" makes no sense
680		// zero bytes happens when EV_EOF is set
681		// 10A268 does not fail this assert with EVFILT_READ and a 10 GB file
682		dispatch_assert(ke->data >= 0l);
683		dispatch_atomic_store2o(ds, ds_pending_data, ~(unsigned long)ke->data,
684				relaxed);
685	} else if (ds->ds_is_adder) {
686		(void)dispatch_atomic_add2o(ds, ds_pending_data,
687				(unsigned long)ke->data, relaxed);
688	} else if (ke->fflags & ds->ds_pending_data_mask) {
689		(void)dispatch_atomic_or2o(ds, ds_pending_data,
690				ke->fflags & ds->ds_pending_data_mask, relaxed);
691	}
692	// EV_DISPATCH and EV_ONESHOT sources are no longer armed after delivery
693	if (ds->ds_needs_rearm) {
694		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
695	}
696
697	_dispatch_wakeup(ds);
698}
699
700#pragma mark -
701#pragma mark dispatch_kevent_t
702
703#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
704static void _dispatch_kevent_guard(dispatch_kevent_t dk);
705static void _dispatch_kevent_unguard(dispatch_kevent_t dk);
706#else
707static inline void _dispatch_kevent_guard(dispatch_kevent_t dk) { (void)dk; }
708static inline void _dispatch_kevent_unguard(dispatch_kevent_t dk) { (void)dk; }
709#endif
710
711static struct dispatch_kevent_s _dispatch_kevent_data_or = {
712	.dk_kevent = {
713		.filter = DISPATCH_EVFILT_CUSTOM_OR,
714		.flags = EV_CLEAR,
715	},
716	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_or.dk_sources),
717};
718static struct dispatch_kevent_s _dispatch_kevent_data_add = {
719	.dk_kevent = {
720		.filter = DISPATCH_EVFILT_CUSTOM_ADD,
721	},
722	.dk_sources = TAILQ_HEAD_INITIALIZER(_dispatch_kevent_data_add.dk_sources),
723};
724
725#define DSL_HASH(x) ((x) & (DSL_HASH_SIZE - 1))
726
727DISPATCH_CACHELINE_ALIGN
728static TAILQ_HEAD(, dispatch_kevent_s) _dispatch_sources[DSL_HASH_SIZE];
729
730static void
731_dispatch_kevent_init()
732{
733	unsigned int i;
734	for (i = 0; i < DSL_HASH_SIZE; i++) {
735		TAILQ_INIT(&_dispatch_sources[i]);
736	}
737
738	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
739			&_dispatch_kevent_data_or, dk_list);
740	TAILQ_INSERT_TAIL(&_dispatch_sources[0],
741			&_dispatch_kevent_data_add, dk_list);
742	_dispatch_kevent_data_or.dk_kevent.udata =
743			(uintptr_t)&_dispatch_kevent_data_or;
744	_dispatch_kevent_data_add.dk_kevent.udata =
745			(uintptr_t)&_dispatch_kevent_data_add;
746}
747
748static inline uintptr_t
749_dispatch_kevent_hash(uint64_t ident, short filter)
750{
751	uint64_t value;
752#if HAVE_MACH
753	value = (filter == EVFILT_MACHPORT ||
754			filter == DISPATCH_EVFILT_MACH_NOTIFICATION ?
755			MACH_PORT_INDEX(ident) : ident);
756#else
757	value = ident;
758#endif
759	return DSL_HASH((uintptr_t)value);
760}
761
762static dispatch_kevent_t
763_dispatch_kevent_find(uint64_t ident, short filter)
764{
765	uintptr_t hash = _dispatch_kevent_hash(ident, filter);
766	dispatch_kevent_t dki;
767
768	TAILQ_FOREACH(dki, &_dispatch_sources[hash], dk_list) {
769		if (dki->dk_kevent.ident == ident && dki->dk_kevent.filter == filter) {
770			break;
771		}
772	}
773	return dki;
774}
775
776static void
777_dispatch_kevent_insert(dispatch_kevent_t dk)
778{
779	_dispatch_kevent_guard(dk);
780	uintptr_t hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
781			dk->dk_kevent.filter);
782	TAILQ_INSERT_TAIL(&_dispatch_sources[hash], dk, dk_list);
783}
784
785// Find existing kevents, and merge any new flags if necessary
786static bool
787_dispatch_kevent_register(dispatch_kevent_t *dkp, uint32_t *flgp)
788{
789	dispatch_kevent_t dk, ds_dkev = *dkp;
790	uint32_t new_flags;
791	bool do_resume = false;
792
793	dk = _dispatch_kevent_find(ds_dkev->dk_kevent.ident,
794			ds_dkev->dk_kevent.filter);
795	if (dk) {
796		// If an existing dispatch kevent is found, check to see if new flags
797		// need to be added to the existing kevent
798		new_flags = ~dk->dk_kevent.fflags & ds_dkev->dk_kevent.fflags;
799		dk->dk_kevent.fflags |= ds_dkev->dk_kevent.fflags;
800		free(ds_dkev);
801		*dkp = dk;
802		do_resume = new_flags;
803	} else {
804		dk = ds_dkev;
805		_dispatch_kevent_insert(dk);
806		new_flags = dk->dk_kevent.fflags;
807		do_resume = true;
808	}
809	// Re-register the kevent with the kernel if new flags were added
810	// by the dispatch kevent
811	if (do_resume) {
812		dk->dk_kevent.flags |= EV_ADD;
813	}
814	*flgp = new_flags;
815	return do_resume;
816}
817
818static bool
819_dispatch_kevent_resume(dispatch_kevent_t dk, uint32_t new_flags,
820		uint32_t del_flags)
821{
822	long r;
823	switch (dk->dk_kevent.filter) {
824	case DISPATCH_EVFILT_TIMER:
825	case DISPATCH_EVFILT_CUSTOM_ADD:
826	case DISPATCH_EVFILT_CUSTOM_OR:
827		// these types not registered with kevent
828		return 0;
829#if HAVE_MACH
830	case EVFILT_MACHPORT:
831		return _dispatch_kevent_machport_resume(dk, new_flags, del_flags);
832	case DISPATCH_EVFILT_MACH_NOTIFICATION:
833		return _dispatch_kevent_mach_notify_resume(dk, new_flags, del_flags);
834#endif
835	case EVFILT_PROC:
836		if (dk->dk_kevent.flags & EV_ONESHOT) {
837			return 0;
838		}
839		// fall through
840	default:
841		r = _dispatch_kq_update(&dk->dk_kevent);
842		if (dk->dk_kevent.flags & EV_DISPATCH) {
843			dk->dk_kevent.flags &= ~EV_ADD;
844		}
845		return r;
846	}
847}
848
849static void
850_dispatch_kevent_dispose(dispatch_kevent_t dk)
851{
852	uintptr_t hash;
853
854	switch (dk->dk_kevent.filter) {
855	case DISPATCH_EVFILT_TIMER:
856	case DISPATCH_EVFILT_CUSTOM_ADD:
857	case DISPATCH_EVFILT_CUSTOM_OR:
858		// these sources live on statically allocated lists
859		return;
860#if HAVE_MACH
861	case EVFILT_MACHPORT:
862		_dispatch_kevent_machport_resume(dk, 0, dk->dk_kevent.fflags);
863		break;
864	case DISPATCH_EVFILT_MACH_NOTIFICATION:
865		_dispatch_kevent_mach_notify_resume(dk, 0, dk->dk_kevent.fflags);
866		break;
867#endif
868	case EVFILT_PROC:
869		if (dk->dk_kevent.flags & EV_ONESHOT) {
870			break; // implicitly deleted
871		}
872		// fall through
873	default:
874		if (~dk->dk_kevent.flags & EV_DELETE) {
875			dk->dk_kevent.flags |= EV_DELETE;
876			dk->dk_kevent.flags &= ~(EV_ADD|EV_ENABLE);
877			_dispatch_kq_update(&dk->dk_kevent);
878		}
879		break;
880	}
881
882	hash = _dispatch_kevent_hash(dk->dk_kevent.ident,
883			dk->dk_kevent.filter);
884	TAILQ_REMOVE(&_dispatch_sources[hash], dk, dk_list);
885	_dispatch_kevent_unguard(dk);
886	free(dk);
887}
888
889static void
890_dispatch_kevent_unregister(dispatch_kevent_t dk, uint32_t flg)
891{
892	dispatch_source_refs_t dri;
893	uint32_t del_flags, fflags = 0;
894
895	if (TAILQ_EMPTY(&dk->dk_sources)) {
896		_dispatch_kevent_dispose(dk);
897	} else {
898		TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
899			dispatch_source_t dsi = _dispatch_source_from_refs(dri);
900			uint32_t mask = (uint32_t)dsi->ds_pending_data_mask;
901			fflags |= mask;
902		}
903		del_flags = flg & ~fflags;
904		if (del_flags) {
905			dk->dk_kevent.flags |= EV_ADD;
906			dk->dk_kevent.fflags = fflags;
907			_dispatch_kevent_resume(dk, 0, del_flags);
908		}
909	}
910}
911
912DISPATCH_NOINLINE
913static void
914_dispatch_kevent_proc_exit(struct kevent64_s *ke)
915{
916	// EVFILT_PROC may fail with ESRCH when the process exists but is a zombie
917	// <rdar://problem/5067725>. As a workaround, we simulate an exit event for
918	// any EVFILT_PROC with an invalid pid <rdar://problem/6626350>.
919	struct kevent64_s fake;
920	fake = *ke;
921	fake.flags &= ~EV_ERROR;
922	fake.fflags = NOTE_EXIT;
923	fake.data = 0;
924	_dispatch_kevent_drain(&fake);
925}
926
927DISPATCH_NOINLINE
928static void
929_dispatch_kevent_error(struct kevent64_s *ke)
930{
931	_dispatch_kevent_debug(ke, __func__);
932	if (ke->data) {
933		// log the unexpected error
934		_dispatch_bug_kevent_client("kevent", _evfiltstr(ke->filter),
935				ke->flags & EV_DELETE ? "delete" :
936				ke->flags & EV_ADD ? "add" :
937				ke->flags & EV_ENABLE ? "enable" : "monitor",
938				(int)ke->data);
939	}
940}
941
942static void
943_dispatch_kevent_drain(struct kevent64_s *ke)
944{
945#if DISPATCH_DEBUG
946	static dispatch_once_t pred;
947	dispatch_once_f(&pred, NULL, _dispatch_kevent_debugger);
948#endif
949	if (ke->filter == EVFILT_USER) {
950		return;
951	}
952	if (slowpath(ke->flags & EV_ERROR)) {
953		if (ke->filter == EVFILT_PROC) {
954			if (ke->flags & EV_DELETE) {
955				// Process exited while monitored
956				return;
957			} else if (ke->data == ESRCH) {
958				return _dispatch_kevent_proc_exit(ke);
959			}
960#if DISPATCH_USE_VM_PRESSURE
961		} else if (ke->filter == EVFILT_VM && ke->data == ENOTSUP) {
962			// Memory pressure kevent is not supported on all platforms
963			// <rdar://problem/8636227>
964			return;
965#endif
966#if DISPATCH_USE_MEMORYSTATUS
967		} else if (ke->filter == EVFILT_MEMORYSTATUS &&
968				(ke->data == EINVAL || ke->data == ENOTSUP)) {
969			// Memory status kevent is not supported on all platforms
970			return;
971#endif
972		}
973		return _dispatch_kevent_error(ke);
974	}
975	_dispatch_kevent_debug(ke, __func__);
976	if (ke->filter == EVFILT_TIMER) {
977		return _dispatch_timers_kevent(ke);
978	}
979#if HAVE_MACH
980	if (ke->filter == EVFILT_MACHPORT) {
981		return _dispatch_kevent_mach_portset(ke);
982	}
983#endif
984	return _dispatch_kevent_merge(ke);
985}
986
987DISPATCH_NOINLINE
988static void
989_dispatch_kevent_merge(struct kevent64_s *ke)
990{
991	dispatch_kevent_t dk;
992	dispatch_source_refs_t dri;
993
994	dk = (void*)ke->udata;
995	dispatch_assert(dk);
996
997	if (ke->flags & EV_ONESHOT) {
998		dk->dk_kevent.flags |= EV_ONESHOT;
999	}
1000	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
1001		_dispatch_source_merge_kevent(_dispatch_source_from_refs(dri), ke);
1002	}
1003}
1004
1005#if DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1006static void
1007_dispatch_kevent_guard(dispatch_kevent_t dk)
1008{
1009	guardid_t guard;
1010	const unsigned int guard_flags = GUARD_CLOSE;
1011	int r, fd_flags = 0;
1012	switch (dk->dk_kevent.filter) {
1013	case EVFILT_READ:
1014	case EVFILT_WRITE:
1015	case EVFILT_VNODE:
1016		guard = &dk->dk_kevent;
1017		r = change_fdguard_np((int)dk->dk_kevent.ident, NULL, 0,
1018				&guard, guard_flags, &fd_flags);
1019		if (slowpath(r == -1)) {
1020			int err = errno;
1021			if (err != EPERM) {
1022				(void)dispatch_assume_zero(err);
1023			}
1024			return;
1025		}
1026		dk->dk_kevent.ext[0] = guard_flags;
1027		dk->dk_kevent.ext[1] = fd_flags;
1028		break;
1029	}
1030}
1031
1032static void
1033_dispatch_kevent_unguard(dispatch_kevent_t dk)
1034{
1035	guardid_t guard;
1036	unsigned int guard_flags;
1037	int r, fd_flags;
1038	switch (dk->dk_kevent.filter) {
1039	case EVFILT_READ:
1040	case EVFILT_WRITE:
1041	case EVFILT_VNODE:
1042		guard_flags = (unsigned int)dk->dk_kevent.ext[0];
1043		if (!guard_flags) {
1044			return;
1045		}
1046		guard = &dk->dk_kevent;
1047		fd_flags = (int)dk->dk_kevent.ext[1];
1048		r = change_fdguard_np((int)dk->dk_kevent.ident, &guard,
1049				guard_flags, NULL, 0, &fd_flags);
1050		if (slowpath(r == -1)) {
1051			(void)dispatch_assume_zero(errno);
1052			return;
1053		}
1054		dk->dk_kevent.ext[0] = 0;
1055		break;
1056	}
1057}
1058#endif // DISPATCH_USE_GUARDED_FD_CHANGE_FDGUARD
1059
1060#pragma mark -
1061#pragma mark dispatch_source_timer
1062
1063#if DISPATCH_USE_DTRACE && DISPATCH_USE_DTRACE_INTROSPECTION
1064static dispatch_source_refs_t
1065		_dispatch_trace_next_timer[DISPATCH_TIMER_QOS_COUNT];
1066#define _dispatch_trace_next_timer_set(x, q) \
1067		_dispatch_trace_next_timer[(q)] = (x)
1068#define _dispatch_trace_next_timer_program(d, q) \
1069		_dispatch_trace_timer_program(_dispatch_trace_next_timer[(q)], (d))
1070#define _dispatch_trace_next_timer_wake(q) \
1071		_dispatch_trace_timer_wake(_dispatch_trace_next_timer[(q)])
1072#else
1073#define _dispatch_trace_next_timer_set(x, q)
1074#define _dispatch_trace_next_timer_program(d, q)
1075#define _dispatch_trace_next_timer_wake(q)
1076#endif
1077
1078#define _dispatch_source_timer_telemetry_enabled() false
1079
1080DISPATCH_NOINLINE
1081static void
1082_dispatch_source_timer_telemetry_slow(dispatch_source_t ds,
1083		uintptr_t ident, struct dispatch_timer_source_s *values)
1084{
1085	if (_dispatch_trace_timer_configure_enabled()) {
1086		_dispatch_trace_timer_configure(ds, ident, values);
1087	}
1088}
1089
1090DISPATCH_ALWAYS_INLINE
1091static inline void
1092_dispatch_source_timer_telemetry(dispatch_source_t ds, uintptr_t ident,
1093		struct dispatch_timer_source_s *values)
1094{
1095	if (_dispatch_trace_timer_configure_enabled() ||
1096			_dispatch_source_timer_telemetry_enabled()) {
1097		_dispatch_source_timer_telemetry_slow(ds, ident, values);
1098		asm(""); // prevent tailcall
1099	}
1100}
1101
1102// approx 1 year (60s * 60m * 24h * 365d)
1103#define FOREVER_NSEC 31536000000000000ull
1104
1105DISPATCH_ALWAYS_INLINE
1106static inline uint64_t
1107_dispatch_source_timer_now(uint64_t nows[], unsigned int tidx)
1108{
1109	unsigned int tk = DISPATCH_TIMER_KIND(tidx);
1110	if (nows && fastpath(nows[tk])) {
1111		return nows[tk];
1112	}
1113	uint64_t now;
1114	switch (tk) {
1115	case DISPATCH_TIMER_KIND_MACH:
1116		now = _dispatch_absolute_time();
1117		break;
1118	case DISPATCH_TIMER_KIND_WALL:
1119		now = _dispatch_get_nanoseconds();
1120		break;
1121	}
1122	if (nows) {
1123		nows[tk] = now;
1124	}
1125	return now;
1126}
1127
1128static inline unsigned long
1129_dispatch_source_timer_data(dispatch_source_refs_t dr, unsigned long prev)
1130{
1131	// calculate the number of intervals since last fire
1132	unsigned long data, missed;
1133	uint64_t now;
1134	now = _dispatch_source_timer_now(NULL, _dispatch_source_timer_idx(dr));
1135	missed = (unsigned long)((now - ds_timer(dr).last_fire) /
1136			ds_timer(dr).interval);
1137	// correct for missed intervals already delivered last time
1138	data = prev - ds_timer(dr).missed + missed;
1139	ds_timer(dr).missed = missed;
1140	return data;
1141}
1142
1143struct dispatch_set_timer_params {
1144	dispatch_source_t ds;
1145	uintptr_t ident;
1146	struct dispatch_timer_source_s values;
1147};
1148
1149static void
1150_dispatch_source_set_timer3(void *context)
1151{
1152	// Called on the _dispatch_mgr_q
1153	struct dispatch_set_timer_params *params = context;
1154	dispatch_source_t ds = params->ds;
1155	ds->ds_ident_hack = params->ident;
1156	ds_timer(ds->ds_refs) = params->values;
1157	// Clear any pending data that might have accumulated on
1158	// older timer params <rdar://problem/8574886>
1159	ds->ds_pending_data = 0;
1160	// Re-arm in case we got disarmed because of pending set_timer suspension
1161	(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, release);
1162	dispatch_resume(ds);
1163	// Must happen after resume to avoid getting disarmed due to suspension
1164	_dispatch_timers_update(ds);
1165	dispatch_release(ds);
1166	if (params->values.flags & DISPATCH_TIMER_WALL_CLOCK) {
1167		_dispatch_mach_host_calendar_change_register();
1168	}
1169	free(params);
1170}
1171
1172static void
1173_dispatch_source_set_timer2(void *context)
1174{
1175	// Called on the source queue
1176	struct dispatch_set_timer_params *params = context;
1177	dispatch_suspend(params->ds);
1178	dispatch_barrier_async_f(&_dispatch_mgr_q, params,
1179			_dispatch_source_set_timer3);
1180}
1181
1182DISPATCH_NOINLINE
1183static struct dispatch_set_timer_params *
1184_dispatch_source_timer_params(dispatch_source_t ds, dispatch_time_t start,
1185		uint64_t interval, uint64_t leeway)
1186{
1187	struct dispatch_set_timer_params *params;
1188	params = _dispatch_calloc(1ul, sizeof(struct dispatch_set_timer_params));
1189	params->ds = ds;
1190	params->values.flags = ds_timer(ds->ds_refs).flags;
1191
1192	if (interval == 0) {
1193		// we use zero internally to mean disabled
1194		interval = 1;
1195	} else if ((int64_t)interval < 0) {
1196		// 6866347 - make sure nanoseconds won't overflow
1197		interval = INT64_MAX;
1198	}
1199	if ((int64_t)leeway < 0) {
1200		leeway = INT64_MAX;
1201	}
1202	if (start == DISPATCH_TIME_NOW) {
1203		start = _dispatch_absolute_time();
1204	} else if (start == DISPATCH_TIME_FOREVER) {
1205		start = INT64_MAX;
1206	}
1207
1208	if ((int64_t)start < 0) {
1209		// wall clock
1210		start = (dispatch_time_t)-((int64_t)start);
1211		params->values.flags |= DISPATCH_TIMER_WALL_CLOCK;
1212	} else {
1213		// absolute clock
1214		interval = _dispatch_time_nano2mach(interval);
1215		if (interval < 1) {
1216			// rdar://problem/7287561 interval must be at least one in
1217			// in order to avoid later division by zero when calculating
1218			// the missed interval count. (NOTE: the wall clock's
1219			// interval is already "fixed" to be 1 or more)
1220			interval = 1;
1221		}
1222		leeway = _dispatch_time_nano2mach(leeway);
1223		params->values.flags &= ~(unsigned long)DISPATCH_TIMER_WALL_CLOCK;
1224	}
1225	params->ident = DISPATCH_TIMER_IDENT(params->values.flags);
1226	params->values.target = start;
1227	params->values.deadline = (start < UINT64_MAX - leeway) ?
1228			start + leeway : UINT64_MAX;
1229	params->values.interval = interval;
1230	params->values.leeway = (interval == INT64_MAX || leeway < interval / 2) ?
1231			leeway : interval / 2;
1232	return params;
1233}
1234
1235DISPATCH_ALWAYS_INLINE
1236static inline void
1237_dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1238		uint64_t interval, uint64_t leeway, bool source_sync)
1239{
1240	if (slowpath(!ds->ds_is_timer) ||
1241			slowpath(ds_timer(ds->ds_refs).flags & DISPATCH_TIMER_INTERVAL)) {
1242		DISPATCH_CLIENT_CRASH("Attempt to set timer on a non-timer source");
1243	}
1244
1245	struct dispatch_set_timer_params *params;
1246	params = _dispatch_source_timer_params(ds, start, interval, leeway);
1247
1248	_dispatch_source_timer_telemetry(ds, params->ident, &params->values);
1249	// Suspend the source so that it doesn't fire with pending changes
1250	// The use of suspend/resume requires the external retain/release
1251	dispatch_retain(ds);
1252	if (source_sync) {
1253		return _dispatch_barrier_trysync_f((dispatch_queue_t)ds, params,
1254				_dispatch_source_set_timer2);
1255	} else {
1256		return _dispatch_source_set_timer2(params);
1257	}
1258}
1259
1260void
1261dispatch_source_set_timer(dispatch_source_t ds, dispatch_time_t start,
1262		uint64_t interval, uint64_t leeway)
1263{
1264	_dispatch_source_set_timer(ds, start, interval, leeway, true);
1265}
1266
1267void
1268_dispatch_source_set_runloop_timer_4CF(dispatch_source_t ds,
1269		dispatch_time_t start, uint64_t interval, uint64_t leeway)
1270{
1271	// Don't serialize through the source queue for CF timers <rdar://13833190>
1272	_dispatch_source_set_timer(ds, start, interval, leeway, false);
1273}
1274
1275void
1276_dispatch_source_set_interval(dispatch_source_t ds, uint64_t interval)
1277{
1278	dispatch_source_refs_t dr = ds->ds_refs;
1279	#define NSEC_PER_FRAME (NSEC_PER_SEC/60)
1280	const bool animation = ds_timer(dr).flags & DISPATCH_INTERVAL_UI_ANIMATION;
1281	if (fastpath(interval <= (animation ? FOREVER_NSEC/NSEC_PER_FRAME :
1282			FOREVER_NSEC/NSEC_PER_MSEC))) {
1283		interval *= animation ? NSEC_PER_FRAME : NSEC_PER_MSEC;
1284	} else {
1285		interval = FOREVER_NSEC;
1286	}
1287	interval = _dispatch_time_nano2mach(interval);
1288	uint64_t target = _dispatch_absolute_time() + interval;
1289	target = (target / interval) * interval;
1290	const uint64_t leeway = animation ?
1291			_dispatch_time_nano2mach(NSEC_PER_FRAME) : interval / 2;
1292	ds_timer(dr).target = target;
1293	ds_timer(dr).deadline = target + leeway;
1294	ds_timer(dr).interval = interval;
1295	ds_timer(dr).leeway = leeway;
1296	_dispatch_source_timer_telemetry(ds, ds->ds_ident_hack, &ds_timer(dr));
1297}
1298
1299#pragma mark -
1300#pragma mark dispatch_timers
1301
1302#define DISPATCH_TIMER_STRUCT(refs) \
1303	uint64_t target, deadline; \
1304	TAILQ_HEAD(, refs) dt_sources
1305
1306typedef struct dispatch_timer_s {
1307	DISPATCH_TIMER_STRUCT(dispatch_timer_source_refs_s);
1308} *dispatch_timer_t;
1309
1310#define DISPATCH_TIMER_INITIALIZER(tidx) \
1311	[tidx] = { \
1312		.target = UINT64_MAX, \
1313		.deadline = UINT64_MAX, \
1314		.dt_sources = TAILQ_HEAD_INITIALIZER( \
1315				_dispatch_timer[tidx].dt_sources), \
1316	}
1317#define DISPATCH_TIMER_INIT(kind, qos) \
1318		DISPATCH_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1319		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1320
1321struct dispatch_timer_s _dispatch_timer[] =  {
1322	DISPATCH_TIMER_INIT(WALL, NORMAL),
1323	DISPATCH_TIMER_INIT(WALL, CRITICAL),
1324	DISPATCH_TIMER_INIT(WALL, BACKGROUND),
1325	DISPATCH_TIMER_INIT(MACH, NORMAL),
1326	DISPATCH_TIMER_INIT(MACH, CRITICAL),
1327	DISPATCH_TIMER_INIT(MACH, BACKGROUND),
1328};
1329#define DISPATCH_TIMER_COUNT \
1330		((sizeof(_dispatch_timer) / sizeof(_dispatch_timer[0])))
1331
1332#define DISPATCH_KEVENT_TIMER_UDATA(tidx) \
1333		(uintptr_t)&_dispatch_kevent_timer[tidx]
1334#ifdef __LP64__
1335#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1336		.udata = DISPATCH_KEVENT_TIMER_UDATA(tidx)
1337#else // __LP64__
1338// dynamic initialization in _dispatch_timers_init()
1339#define DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx) \
1340		.udata = 0
1341#endif // __LP64__
1342#define DISPATCH_KEVENT_TIMER_INITIALIZER(tidx) \
1343	[tidx] = { \
1344		.dk_kevent = { \
1345			.ident = tidx, \
1346			.filter = DISPATCH_EVFILT_TIMER, \
1347			DISPATCH_KEVENT_TIMER_UDATA_INITIALIZER(tidx), \
1348		}, \
1349		.dk_sources = TAILQ_HEAD_INITIALIZER( \
1350				_dispatch_kevent_timer[tidx].dk_sources), \
1351	}
1352#define DISPATCH_KEVENT_TIMER_INIT(kind, qos) \
1353		DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX( \
1354		DISPATCH_TIMER_KIND_##kind, DISPATCH_TIMER_QOS_##qos))
1355
1356struct dispatch_kevent_s _dispatch_kevent_timer[] = {
1357	DISPATCH_KEVENT_TIMER_INIT(WALL, NORMAL),
1358	DISPATCH_KEVENT_TIMER_INIT(WALL, CRITICAL),
1359	DISPATCH_KEVENT_TIMER_INIT(WALL, BACKGROUND),
1360	DISPATCH_KEVENT_TIMER_INIT(MACH, NORMAL),
1361	DISPATCH_KEVENT_TIMER_INIT(MACH, CRITICAL),
1362	DISPATCH_KEVENT_TIMER_INIT(MACH, BACKGROUND),
1363	DISPATCH_KEVENT_TIMER_INITIALIZER(DISPATCH_TIMER_INDEX_DISARM),
1364};
1365#define DISPATCH_KEVENT_TIMER_COUNT \
1366		((sizeof(_dispatch_kevent_timer) / sizeof(_dispatch_kevent_timer[0])))
1367
1368#define DISPATCH_KEVENT_TIMEOUT_IDENT_MASK (~0ull << 8)
1369#define DISPATCH_KEVENT_TIMEOUT_INITIALIZER(qos, note) \
1370	[qos] = { \
1371		.ident = DISPATCH_KEVENT_TIMEOUT_IDENT_MASK|(qos), \
1372		.filter = EVFILT_TIMER, \
1373		.flags = EV_ONESHOT, \
1374		.fflags = NOTE_ABSOLUTE|NOTE_NSECONDS|NOTE_LEEWAY|(note), \
1375	}
1376#define DISPATCH_KEVENT_TIMEOUT_INIT(qos, note) \
1377		DISPATCH_KEVENT_TIMEOUT_INITIALIZER(DISPATCH_TIMER_QOS_##qos, note)
1378
1379struct kevent64_s _dispatch_kevent_timeout[] = {
1380	DISPATCH_KEVENT_TIMEOUT_INIT(NORMAL, 0),
1381	DISPATCH_KEVENT_TIMEOUT_INIT(CRITICAL, NOTE_CRITICAL),
1382	DISPATCH_KEVENT_TIMEOUT_INIT(BACKGROUND, NOTE_BACKGROUND),
1383};
1384
1385#define DISPATCH_KEVENT_COALESCING_WINDOW_INIT(qos, ms) \
1386		[DISPATCH_TIMER_QOS_##qos] = 2ull * (ms) * NSEC_PER_MSEC
1387
1388static const uint64_t _dispatch_kevent_coalescing_window[] = {
1389	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(NORMAL, 75),
1390	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(CRITICAL, 1),
1391	DISPATCH_KEVENT_COALESCING_WINDOW_INIT(BACKGROUND, 100),
1392};
1393
1394#define _dispatch_timers_insert(tidx, dra, dr, dr_list, dta, dt, dt_list) ({ \
1395	typeof(dr) dri = NULL; typeof(dt) dti; \
1396	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1397		TAILQ_FOREACH(dri, &dra[tidx].dk_sources, dr_list) { \
1398			if (ds_timer(dr).target < ds_timer(dri).target) { \
1399				break; \
1400			} \
1401		} \
1402		TAILQ_FOREACH(dti, &dta[tidx].dt_sources, dt_list) { \
1403			if (ds_timer(dt).deadline < ds_timer(dti).deadline) { \
1404				break; \
1405			} \
1406		} \
1407		if (dti) { \
1408			TAILQ_INSERT_BEFORE(dti, dt, dt_list); \
1409		} else { \
1410			TAILQ_INSERT_TAIL(&dta[tidx].dt_sources, dt, dt_list); \
1411		} \
1412	} \
1413	if (dri) { \
1414		TAILQ_INSERT_BEFORE(dri, dr, dr_list); \
1415	} else { \
1416		TAILQ_INSERT_TAIL(&dra[tidx].dk_sources, dr, dr_list); \
1417	} \
1418	})
1419
1420#define _dispatch_timers_remove(tidx, dk, dra, dr, dr_list, dta, dt, dt_list) \
1421	({ \
1422	if (tidx != DISPATCH_TIMER_INDEX_DISARM) { \
1423		TAILQ_REMOVE(&dta[tidx].dt_sources, dt, dt_list); \
1424	} \
1425	TAILQ_REMOVE(dk ? &(*(dk)).dk_sources : &dra[tidx].dk_sources, dr, \
1426			dr_list); })
1427
1428#define _dispatch_timers_check(dra, dta) ({ \
1429	unsigned int qosm = _dispatch_timers_qos_mask; \
1430	bool update = false; \
1431	unsigned int tidx; \
1432	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) { \
1433		if (!(qosm & 1 << DISPATCH_TIMER_QOS(tidx))){ \
1434			continue; \
1435		} \
1436		dispatch_timer_source_refs_t dr = (dispatch_timer_source_refs_t) \
1437				TAILQ_FIRST(&dra[tidx].dk_sources); \
1438		dispatch_timer_source_refs_t dt = (dispatch_timer_source_refs_t) \
1439				TAILQ_FIRST(&dta[tidx].dt_sources); \
1440		uint64_t target = dr ? ds_timer(dr).target : UINT64_MAX; \
1441		uint64_t deadline = dr ? ds_timer(dt).deadline : UINT64_MAX; \
1442		if (target != dta[tidx].target) { \
1443			dta[tidx].target = target; \
1444			update = true; \
1445		} \
1446		if (deadline != dta[tidx].deadline) { \
1447			dta[tidx].deadline = deadline; \
1448			update = true; \
1449		} \
1450	} \
1451	update; })
1452
1453static bool _dispatch_timers_reconfigure, _dispatch_timer_expired;
1454static unsigned int _dispatch_timers_qos_mask;
1455static bool _dispatch_timers_force_max_leeway;
1456
1457static void
1458_dispatch_timers_init(void)
1459{
1460#ifndef __LP64__
1461	unsigned int tidx;
1462	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1463		_dispatch_kevent_timer[tidx].dk_kevent.udata = \
1464				DISPATCH_KEVENT_TIMER_UDATA(tidx);
1465	}
1466#endif // __LP64__
1467	_dispatch_timers_force_max_leeway =
1468			getenv("LIBDISPATCH_TIMERS_FORCE_MAX_LEEWAY");
1469}
1470
1471static inline void
1472_dispatch_timers_unregister(dispatch_source_t ds, dispatch_kevent_t dk)
1473{
1474	dispatch_source_refs_t dr = ds->ds_refs;
1475	unsigned int tidx = (unsigned int)dk->dk_kevent.ident;
1476
1477	if (slowpath(ds_timer_aggregate(ds))) {
1478		_dispatch_timer_aggregates_unregister(ds, tidx);
1479	}
1480	_dispatch_timers_remove(tidx, dk, _dispatch_kevent_timer, dr, dr_list,
1481			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1482	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1483		_dispatch_timers_reconfigure = true;
1484		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1485	}
1486}
1487
1488// Updates the ordered list of timers based on next fire date for changes to ds.
1489// Should only be called from the context of _dispatch_mgr_q.
1490static void
1491_dispatch_timers_update(dispatch_source_t ds)
1492{
1493	dispatch_kevent_t dk = ds->ds_dkev;
1494	dispatch_source_refs_t dr = ds->ds_refs;
1495	unsigned int tidx;
1496
1497	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1498
1499	// Do not reschedule timers unregistered with _dispatch_kevent_unregister()
1500	if (slowpath(!dk)) {
1501		return;
1502	}
1503	// Move timers that are disabled, suspended or have missed intervals to the
1504	// disarmed list, rearm after resume resp. source invoke will reenable them
1505	if (!ds_timer(dr).target || DISPATCH_OBJECT_SUSPENDED(ds) ||
1506			ds->ds_pending_data) {
1507		tidx = DISPATCH_TIMER_INDEX_DISARM;
1508		(void)dispatch_atomic_and2o(ds, ds_atomic_flags, ~DSF_ARMED, relaxed);
1509	} else {
1510		tidx = _dispatch_source_timer_idx(dr);
1511	}
1512	if (slowpath(ds_timer_aggregate(ds))) {
1513		_dispatch_timer_aggregates_register(ds);
1514	}
1515	if (slowpath(!ds->ds_is_installed)) {
1516		ds->ds_is_installed = true;
1517		if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1518			(void)dispatch_atomic_or2o(ds, ds_atomic_flags, DSF_ARMED, relaxed);
1519		}
1520		free(dk);
1521		_dispatch_object_debug(ds, "%s", __func__);
1522	} else {
1523		_dispatch_timers_unregister(ds, dk);
1524	}
1525	if (tidx != DISPATCH_TIMER_INDEX_DISARM) {
1526		_dispatch_timers_reconfigure = true;
1527		_dispatch_timers_qos_mask |= 1 << DISPATCH_TIMER_QOS(tidx);
1528	}
1529	if (dk != &_dispatch_kevent_timer[tidx]){
1530		ds->ds_dkev = &_dispatch_kevent_timer[tidx];
1531	}
1532	_dispatch_timers_insert(tidx, _dispatch_kevent_timer, dr, dr_list,
1533			_dispatch_timer, (dispatch_timer_source_refs_t)dr, dt_list);
1534	if (slowpath(ds_timer_aggregate(ds))) {
1535		_dispatch_timer_aggregates_update(ds, tidx);
1536	}
1537}
1538
1539static inline void
1540_dispatch_timers_run2(uint64_t nows[], unsigned int tidx)
1541{
1542	dispatch_source_refs_t dr;
1543	dispatch_source_t ds;
1544	uint64_t now, missed;
1545
1546	now = _dispatch_source_timer_now(nows, tidx);
1547	while ((dr = TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources))) {
1548		ds = _dispatch_source_from_refs(dr);
1549		// We may find timers on the wrong list due to a pending update from
1550		// dispatch_source_set_timer. Force an update of the list in that case.
1551		if (tidx != ds->ds_ident_hack) {
1552			_dispatch_timers_update(ds);
1553			continue;
1554		}
1555		if (!ds_timer(dr).target) {
1556			// No configured timers on the list
1557			break;
1558		}
1559		if (ds_timer(dr).target > now) {
1560			// Done running timers for now.
1561			break;
1562		}
1563		// Remove timers that are suspended or have missed intervals from the
1564		// list, rearm after resume resp. source invoke will reenable them
1565		if (DISPATCH_OBJECT_SUSPENDED(ds) || ds->ds_pending_data) {
1566			_dispatch_timers_update(ds);
1567			continue;
1568		}
1569		// Calculate number of missed intervals.
1570		missed = (now - ds_timer(dr).target) / ds_timer(dr).interval;
1571		if (++missed > INT_MAX) {
1572			missed = INT_MAX;
1573		}
1574		if (ds_timer(dr).interval < INT64_MAX) {
1575			ds_timer(dr).target += missed * ds_timer(dr).interval;
1576			ds_timer(dr).deadline = ds_timer(dr).target + ds_timer(dr).leeway;
1577		} else {
1578			ds_timer(dr).target = UINT64_MAX;
1579			ds_timer(dr).deadline = UINT64_MAX;
1580		}
1581		_dispatch_timers_update(ds);
1582		ds_timer(dr).last_fire = now;
1583
1584		unsigned long data;
1585		data = dispatch_atomic_add2o(ds, ds_pending_data,
1586				(unsigned long)missed, relaxed);
1587		_dispatch_trace_timer_fire(dr, data, (unsigned long)missed);
1588		_dispatch_wakeup(ds);
1589	}
1590}
1591
1592DISPATCH_NOINLINE
1593static void
1594_dispatch_timers_run(uint64_t nows[])
1595{
1596	unsigned int tidx;
1597	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1598		if (!TAILQ_EMPTY(&_dispatch_kevent_timer[tidx].dk_sources)) {
1599			_dispatch_timers_run2(nows, tidx);
1600		}
1601	}
1602}
1603
1604static inline unsigned int
1605_dispatch_timers_get_delay(uint64_t nows[], struct dispatch_timer_s timer[],
1606		uint64_t *delay, uint64_t *leeway, int qos)
1607{
1608	unsigned int tidx, ridx = DISPATCH_TIMER_COUNT;
1609	uint64_t tmp, delta = UINT64_MAX, dldelta = UINT64_MAX;
1610
1611	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1612		if (qos >= 0 && qos != DISPATCH_TIMER_QOS(tidx)){
1613			continue;
1614		}
1615		uint64_t target = timer[tidx].target;
1616		if (target == UINT64_MAX) {
1617			continue;
1618		}
1619		uint64_t deadline = timer[tidx].deadline;
1620		if (qos >= 0) {
1621			// Timer pre-coalescing <rdar://problem/13222034>
1622			uint64_t window = _dispatch_kevent_coalescing_window[qos];
1623			uint64_t latest = deadline > window ? deadline - window : 0;
1624			dispatch_source_refs_t dri;
1625			TAILQ_FOREACH(dri, &_dispatch_kevent_timer[tidx].dk_sources,
1626					dr_list) {
1627				tmp = ds_timer(dri).target;
1628				if (tmp > latest) break;
1629				target = tmp;
1630			}
1631		}
1632		uint64_t now = _dispatch_source_timer_now(nows, tidx);
1633		if (target <= now) {
1634			delta = 0;
1635			break;
1636		}
1637		tmp = target - now;
1638		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1639			tmp = _dispatch_time_mach2nano(tmp);
1640		}
1641		if (tmp < INT64_MAX && tmp < delta) {
1642			ridx = tidx;
1643			delta = tmp;
1644		}
1645		dispatch_assert(target <= deadline);
1646		tmp = deadline - now;
1647		if (DISPATCH_TIMER_KIND(tidx) != DISPATCH_TIMER_KIND_WALL) {
1648			tmp = _dispatch_time_mach2nano(tmp);
1649		}
1650		if (tmp < INT64_MAX && tmp < dldelta) {
1651			dldelta = tmp;
1652		}
1653	}
1654	*delay = delta;
1655	*leeway = delta && delta < UINT64_MAX ? dldelta - delta : UINT64_MAX;
1656	return ridx;
1657}
1658
1659static bool
1660_dispatch_timers_program2(uint64_t nows[], struct kevent64_s *ke,
1661		unsigned int qos)
1662{
1663	unsigned int tidx;
1664	bool poll;
1665	uint64_t delay, leeway;
1666
1667	tidx = _dispatch_timers_get_delay(nows, _dispatch_timer, &delay, &leeway,
1668			(int)qos);
1669	poll = (delay == 0);
1670	if (poll || delay == UINT64_MAX) {
1671		_dispatch_trace_next_timer_set(NULL, qos);
1672		if (!ke->data) {
1673			return poll;
1674		}
1675		ke->data = 0;
1676		ke->flags |= EV_DELETE;
1677		ke->flags &= ~(EV_ADD|EV_ENABLE);
1678	} else {
1679		_dispatch_trace_next_timer_set(
1680				TAILQ_FIRST(&_dispatch_kevent_timer[tidx].dk_sources), qos);
1681		_dispatch_trace_next_timer_program(delay, qos);
1682		delay += _dispatch_source_timer_now(nows, DISPATCH_TIMER_KIND_WALL);
1683		if (slowpath(_dispatch_timers_force_max_leeway)) {
1684			ke->data = (int64_t)(delay + leeway);
1685			ke->ext[1] = 0;
1686		} else {
1687			ke->data = (int64_t)delay;
1688			ke->ext[1] = leeway;
1689		}
1690		ke->flags |= EV_ADD|EV_ENABLE;
1691		ke->flags &= ~EV_DELETE;
1692	}
1693	_dispatch_kq_update(ke);
1694	return poll;
1695}
1696
1697DISPATCH_NOINLINE
1698static bool
1699_dispatch_timers_program(uint64_t nows[])
1700{
1701	bool poll = false;
1702	unsigned int qos, qosm = _dispatch_timers_qos_mask;
1703	for (qos = 0; qos < DISPATCH_TIMER_QOS_COUNT; qos++) {
1704		if (!(qosm & 1 << qos)){
1705			continue;
1706		}
1707		poll |= _dispatch_timers_program2(nows, &_dispatch_kevent_timeout[qos],
1708				qos);
1709	}
1710	return poll;
1711}
1712
1713DISPATCH_NOINLINE
1714static bool
1715_dispatch_timers_configure(void)
1716{
1717	_dispatch_timer_aggregates_check();
1718	// Find out if there is a new target/deadline on the timer lists
1719	return _dispatch_timers_check(_dispatch_kevent_timer, _dispatch_timer);
1720}
1721
1722static void
1723_dispatch_timers_calendar_change(void)
1724{
1725	// calendar change may have gone past the wallclock deadline
1726	_dispatch_timer_expired = true;
1727	_dispatch_timers_qos_mask = ~0u;
1728}
1729
1730static void
1731_dispatch_timers_kevent(struct kevent64_s *ke)
1732{
1733	dispatch_assert(ke->data > 0);
1734	dispatch_assert((ke->ident & DISPATCH_KEVENT_TIMEOUT_IDENT_MASK) ==
1735			DISPATCH_KEVENT_TIMEOUT_IDENT_MASK);
1736	unsigned int qos = ke->ident & ~DISPATCH_KEVENT_TIMEOUT_IDENT_MASK;
1737	dispatch_assert(qos < DISPATCH_TIMER_QOS_COUNT);
1738	dispatch_assert(_dispatch_kevent_timeout[qos].data);
1739	_dispatch_kevent_timeout[qos].data = 0; // kevent deleted via EV_ONESHOT
1740	_dispatch_timer_expired = true;
1741	_dispatch_timers_qos_mask |= 1 << qos;
1742	_dispatch_trace_next_timer_wake(qos);
1743}
1744
1745static inline bool
1746_dispatch_mgr_timers(void)
1747{
1748	uint64_t nows[DISPATCH_TIMER_KIND_COUNT] = {};
1749	bool expired = slowpath(_dispatch_timer_expired);
1750	if (expired) {
1751		_dispatch_timers_run(nows);
1752	}
1753	bool reconfigure = slowpath(_dispatch_timers_reconfigure);
1754	if (reconfigure || expired) {
1755		if (reconfigure) {
1756			reconfigure = _dispatch_timers_configure();
1757			_dispatch_timers_reconfigure = false;
1758		}
1759		if (reconfigure || expired) {
1760			expired = _dispatch_timer_expired = _dispatch_timers_program(nows);
1761			expired = expired || _dispatch_mgr_q.dq_items_tail;
1762		}
1763		_dispatch_timers_qos_mask = 0;
1764	}
1765	return expired;
1766}
1767
1768#pragma mark -
1769#pragma mark dispatch_timer_aggregate
1770
1771typedef struct {
1772	TAILQ_HEAD(, dispatch_timer_source_aggregate_refs_s) dk_sources;
1773} dispatch_timer_aggregate_refs_s;
1774
1775typedef struct dispatch_timer_aggregate_s {
1776	DISPATCH_STRUCT_HEADER(queue);
1777	DISPATCH_QUEUE_HEADER;
1778	TAILQ_ENTRY(dispatch_timer_aggregate_s) dta_list;
1779	dispatch_timer_aggregate_refs_s
1780			dta_kevent_timer[DISPATCH_KEVENT_TIMER_COUNT];
1781	struct {
1782		DISPATCH_TIMER_STRUCT(dispatch_timer_source_aggregate_refs_s);
1783	} dta_timer[DISPATCH_TIMER_COUNT];
1784	struct dispatch_timer_s dta_timer_data[DISPATCH_TIMER_COUNT];
1785	unsigned int dta_refcount;
1786} dispatch_timer_aggregate_s;
1787
1788typedef TAILQ_HEAD(, dispatch_timer_aggregate_s) dispatch_timer_aggregates_s;
1789static dispatch_timer_aggregates_s _dispatch_timer_aggregates =
1790		TAILQ_HEAD_INITIALIZER(_dispatch_timer_aggregates);
1791
1792dispatch_timer_aggregate_t
1793dispatch_timer_aggregate_create(void)
1794{
1795	unsigned int tidx;
1796	dispatch_timer_aggregate_t dta = _dispatch_alloc(DISPATCH_VTABLE(queue),
1797			sizeof(struct dispatch_timer_aggregate_s));
1798	_dispatch_queue_init((dispatch_queue_t)dta);
1799	dta->do_targetq = _dispatch_get_root_queue(DISPATCH_QUEUE_PRIORITY_HIGH,
1800			true);
1801	dta->dq_width = UINT32_MAX;
1802	//FIXME: aggregates need custom vtable
1803	//dta->dq_label = "timer-aggregate";
1804	for (tidx = 0; tidx < DISPATCH_KEVENT_TIMER_COUNT; tidx++) {
1805		TAILQ_INIT(&dta->dta_kevent_timer[tidx].dk_sources);
1806	}
1807	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1808		TAILQ_INIT(&dta->dta_timer[tidx].dt_sources);
1809		dta->dta_timer[tidx].target = UINT64_MAX;
1810		dta->dta_timer[tidx].deadline = UINT64_MAX;
1811		dta->dta_timer_data[tidx].target = UINT64_MAX;
1812		dta->dta_timer_data[tidx].deadline = UINT64_MAX;
1813	}
1814	return (dispatch_timer_aggregate_t)_dispatch_introspection_queue_create(
1815			(dispatch_queue_t)dta);
1816}
1817
1818typedef struct dispatch_timer_delay_s {
1819	dispatch_timer_t timer;
1820	uint64_t delay, leeway;
1821} *dispatch_timer_delay_t;
1822
1823static void
1824_dispatch_timer_aggregate_get_delay(void *ctxt)
1825{
1826	dispatch_timer_delay_t dtd = ctxt;
1827	struct { uint64_t nows[DISPATCH_TIMER_KIND_COUNT]; } dtn = {};
1828	_dispatch_timers_get_delay(dtn.nows, dtd->timer, &dtd->delay, &dtd->leeway,
1829			-1);
1830}
1831
1832uint64_t
1833dispatch_timer_aggregate_get_delay(dispatch_timer_aggregate_t dta,
1834		uint64_t *leeway_ptr)
1835{
1836	struct dispatch_timer_delay_s dtd = {
1837		.timer = dta->dta_timer_data,
1838	};
1839	dispatch_sync_f((dispatch_queue_t)dta, &dtd,
1840			_dispatch_timer_aggregate_get_delay);
1841	if (leeway_ptr) {
1842		*leeway_ptr = dtd.leeway;
1843	}
1844	return dtd.delay;
1845}
1846
1847static void
1848_dispatch_timer_aggregate_update(void *ctxt)
1849{
1850	dispatch_timer_aggregate_t dta = (void*)_dispatch_queue_get_current();
1851	dispatch_timer_t dtau = ctxt;
1852	unsigned int tidx;
1853	for (tidx = 0; tidx < DISPATCH_TIMER_COUNT; tidx++) {
1854		dta->dta_timer_data[tidx].target = dtau[tidx].target;
1855		dta->dta_timer_data[tidx].deadline = dtau[tidx].deadline;
1856	}
1857	free(dtau);
1858}
1859
1860DISPATCH_NOINLINE
1861static void
1862_dispatch_timer_aggregates_configure(void)
1863{
1864	dispatch_timer_aggregate_t dta;
1865	dispatch_timer_t dtau;
1866	TAILQ_FOREACH(dta, &_dispatch_timer_aggregates, dta_list) {
1867		if (!_dispatch_timers_check(dta->dta_kevent_timer, dta->dta_timer)) {
1868			continue;
1869		}
1870		dtau = _dispatch_calloc(DISPATCH_TIMER_COUNT, sizeof(*dtau));
1871		memcpy(dtau, dta->dta_timer, sizeof(dta->dta_timer));
1872		dispatch_barrier_async_f((dispatch_queue_t)dta, dtau,
1873				_dispatch_timer_aggregate_update);
1874	}
1875}
1876
1877static inline void
1878_dispatch_timer_aggregates_check(void)
1879{
1880	if (fastpath(TAILQ_EMPTY(&_dispatch_timer_aggregates))) {
1881		return;
1882	}
1883	_dispatch_timer_aggregates_configure();
1884}
1885
1886static void
1887_dispatch_timer_aggregates_register(dispatch_source_t ds)
1888{
1889	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1890	if (!dta->dta_refcount++) {
1891		TAILQ_INSERT_TAIL(&_dispatch_timer_aggregates, dta, dta_list);
1892	}
1893}
1894
1895DISPATCH_NOINLINE
1896static void
1897_dispatch_timer_aggregates_update(dispatch_source_t ds, unsigned int tidx)
1898{
1899	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1900	dispatch_timer_source_aggregate_refs_t dr;
1901	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1902	_dispatch_timers_insert(tidx, dta->dta_kevent_timer, dr, dra_list,
1903			dta->dta_timer, dr, dta_list);
1904}
1905
1906DISPATCH_NOINLINE
1907static void
1908_dispatch_timer_aggregates_unregister(dispatch_source_t ds, unsigned int tidx)
1909{
1910	dispatch_timer_aggregate_t dta = ds_timer_aggregate(ds);
1911	dispatch_timer_source_aggregate_refs_t dr;
1912	dr = (dispatch_timer_source_aggregate_refs_t)ds->ds_refs;
1913	_dispatch_timers_remove(tidx, (dispatch_timer_aggregate_refs_s*)NULL,
1914			dta->dta_kevent_timer, dr, dra_list, dta->dta_timer, dr, dta_list);
1915	if (!--dta->dta_refcount) {
1916		TAILQ_REMOVE(&_dispatch_timer_aggregates, dta, dta_list);
1917	}
1918}
1919
1920#pragma mark -
1921#pragma mark dispatch_select
1922
1923static int _dispatch_kq;
1924
1925static unsigned int _dispatch_select_workaround;
1926static fd_set _dispatch_rfds;
1927static fd_set _dispatch_wfds;
1928static uint64_t*_dispatch_rfd_ptrs;
1929static uint64_t*_dispatch_wfd_ptrs;
1930
1931DISPATCH_NOINLINE
1932static bool
1933_dispatch_select_register(struct kevent64_s *kev)
1934{
1935
1936	// Must execute on manager queue
1937	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1938
1939	// If an EINVAL or ENOENT error occurred while adding/enabling a read or
1940	// write kevent, assume it was due to a type of filedescriptor not
1941	// supported by kqueue and fall back to select
1942	switch (kev->filter) {
1943	case EVFILT_READ:
1944		if ((kev->data == EINVAL || kev->data == ENOENT) &&
1945				dispatch_assume(kev->ident < FD_SETSIZE)) {
1946			FD_SET((int)kev->ident, &_dispatch_rfds);
1947			if (slowpath(!_dispatch_rfd_ptrs)) {
1948				_dispatch_rfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1949						sizeof(*_dispatch_rfd_ptrs));
1950			}
1951			if (!_dispatch_rfd_ptrs[kev->ident]) {
1952				_dispatch_rfd_ptrs[kev->ident] = kev->udata;
1953				_dispatch_select_workaround++;
1954				_dispatch_debug("select workaround used to read fd %d: 0x%lx",
1955						(int)kev->ident, (long)kev->data);
1956			}
1957		}
1958		return true;
1959	case EVFILT_WRITE:
1960		if ((kev->data == EINVAL || kev->data == ENOENT) &&
1961				dispatch_assume(kev->ident < FD_SETSIZE)) {
1962			FD_SET((int)kev->ident, &_dispatch_wfds);
1963			if (slowpath(!_dispatch_wfd_ptrs)) {
1964				_dispatch_wfd_ptrs = _dispatch_calloc(FD_SETSIZE,
1965						sizeof(*_dispatch_wfd_ptrs));
1966			}
1967			if (!_dispatch_wfd_ptrs[kev->ident]) {
1968				_dispatch_wfd_ptrs[kev->ident] = kev->udata;
1969				_dispatch_select_workaround++;
1970				_dispatch_debug("select workaround used to write fd %d: 0x%lx",
1971						(int)kev->ident, (long)kev->data);
1972			}
1973		}
1974		return true;
1975	}
1976	return false;
1977}
1978
1979DISPATCH_NOINLINE
1980static bool
1981_dispatch_select_unregister(const struct kevent64_s *kev)
1982{
1983	// Must execute on manager queue
1984	DISPATCH_ASSERT_ON_MANAGER_QUEUE();
1985
1986	switch (kev->filter) {
1987	case EVFILT_READ:
1988		if (_dispatch_rfd_ptrs && kev->ident < FD_SETSIZE &&
1989				_dispatch_rfd_ptrs[kev->ident]) {
1990			FD_CLR((int)kev->ident, &_dispatch_rfds);
1991			_dispatch_rfd_ptrs[kev->ident] = 0;
1992			_dispatch_select_workaround--;
1993			return true;
1994		}
1995		break;
1996	case EVFILT_WRITE:
1997		if (_dispatch_wfd_ptrs && kev->ident < FD_SETSIZE &&
1998				_dispatch_wfd_ptrs[kev->ident]) {
1999			FD_CLR((int)kev->ident, &_dispatch_wfds);
2000			_dispatch_wfd_ptrs[kev->ident] = 0;
2001			_dispatch_select_workaround--;
2002			return true;
2003		}
2004		break;
2005	}
2006	return false;
2007}
2008
2009DISPATCH_NOINLINE
2010static bool
2011_dispatch_mgr_select(bool poll)
2012{
2013	static const struct timeval timeout_immediately = { 0, 0 };
2014	fd_set tmp_rfds, tmp_wfds;
2015	struct kevent64_s kev;
2016	int err, i, r;
2017	bool kevent_avail = false;
2018
2019	FD_COPY(&_dispatch_rfds, &tmp_rfds);
2020	FD_COPY(&_dispatch_wfds, &tmp_wfds);
2021
2022	r = select(FD_SETSIZE, &tmp_rfds, &tmp_wfds, NULL,
2023			poll ? (struct timeval*)&timeout_immediately : NULL);
2024	if (slowpath(r == -1)) {
2025		err = errno;
2026		if (err != EBADF) {
2027			if (err != EINTR) {
2028				(void)dispatch_assume_zero(err);
2029			}
2030			return false;
2031		}
2032		for (i = 0; i < FD_SETSIZE; i++) {
2033			if (i == _dispatch_kq) {
2034				continue;
2035			}
2036			if (!FD_ISSET(i, &_dispatch_rfds) && !FD_ISSET(i, &_dispatch_wfds)){
2037				continue;
2038			}
2039			r = dup(i);
2040			if (dispatch_assume(r != -1)) {
2041				close(r);
2042			} else {
2043				if (_dispatch_rfd_ptrs && _dispatch_rfd_ptrs[i]) {
2044					FD_CLR(i, &_dispatch_rfds);
2045					_dispatch_rfd_ptrs[i] = 0;
2046					_dispatch_select_workaround--;
2047				}
2048				if (_dispatch_wfd_ptrs && _dispatch_wfd_ptrs[i]) {
2049					FD_CLR(i, &_dispatch_wfds);
2050					_dispatch_wfd_ptrs[i] = 0;
2051					_dispatch_select_workaround--;
2052				}
2053			}
2054		}
2055		return false;
2056	}
2057	if (r > 0) {
2058		for (i = 0; i < FD_SETSIZE; i++) {
2059			if (FD_ISSET(i, &tmp_rfds)) {
2060				if (i == _dispatch_kq) {
2061					kevent_avail = true;
2062					continue;
2063				}
2064				FD_CLR(i, &_dispatch_rfds); // emulate EV_DISPATCH
2065				EV_SET64(&kev, i, EVFILT_READ,
2066						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2067						_dispatch_rfd_ptrs[i], 0, 0);
2068				_dispatch_kevent_drain(&kev);
2069			}
2070			if (FD_ISSET(i, &tmp_wfds)) {
2071				FD_CLR(i, &_dispatch_wfds); // emulate EV_DISPATCH
2072				EV_SET64(&kev, i, EVFILT_WRITE,
2073						EV_ADD|EV_ENABLE|EV_DISPATCH, 0, 1,
2074						_dispatch_wfd_ptrs[i], 0, 0);
2075				_dispatch_kevent_drain(&kev);
2076			}
2077		}
2078	}
2079	return kevent_avail;
2080}
2081
2082#pragma mark -
2083#pragma mark dispatch_kqueue
2084
2085static void
2086_dispatch_kq_init(void *context DISPATCH_UNUSED)
2087{
2088	static const struct kevent64_s kev = {
2089		.ident = 1,
2090		.filter = EVFILT_USER,
2091		.flags = EV_ADD|EV_CLEAR,
2092	};
2093
2094	_dispatch_safe_fork = false;
2095#if DISPATCH_USE_GUARDED_FD
2096	guardid_t guard = (uintptr_t)&kev;
2097	_dispatch_kq = guarded_kqueue_np(&guard, GUARD_CLOSE | GUARD_DUP);
2098#else
2099	_dispatch_kq = kqueue();
2100#endif
2101	if (_dispatch_kq == -1) {
2102		DISPATCH_CLIENT_CRASH("kqueue() create failed: "
2103				"probably out of file descriptors");
2104	} else if (dispatch_assume(_dispatch_kq < FD_SETSIZE)) {
2105		// in case we fall back to select()
2106		FD_SET(_dispatch_kq, &_dispatch_rfds);
2107	}
2108
2109	(void)dispatch_assume_zero(kevent64(_dispatch_kq, &kev, 1, NULL, 0, 0,
2110			NULL));
2111	_dispatch_queue_push(_dispatch_mgr_q.do_targetq, &_dispatch_mgr_q);
2112}
2113
2114static int
2115_dispatch_get_kq(void)
2116{
2117	static dispatch_once_t pred;
2118
2119	dispatch_once_f(&pred, NULL, _dispatch_kq_init);
2120
2121	return _dispatch_kq;
2122}
2123
2124DISPATCH_NOINLINE
2125static long
2126_dispatch_kq_update(const struct kevent64_s *kev)
2127{
2128	int r;
2129	struct kevent64_s kev_copy;
2130
2131	if (slowpath(_dispatch_select_workaround) && (kev->flags & EV_DELETE)) {
2132		if (_dispatch_select_unregister(kev)) {
2133			return 0;
2134		}
2135	}
2136	kev_copy = *kev;
2137	// This ensures we don't get a pending kevent back while registering
2138	// a new kevent
2139	kev_copy.flags |= EV_RECEIPT;
2140retry:
2141	r = dispatch_assume(kevent64(_dispatch_get_kq(), &kev_copy, 1,
2142			&kev_copy, 1, 0, NULL));
2143	if (slowpath(r == -1)) {
2144		int err = errno;
2145		switch (err) {
2146		case EINTR:
2147			goto retry;
2148		case EBADF:
2149			DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2150			break;
2151		default:
2152			(void)dispatch_assume_zero(err);
2153			break;
2154		}
2155		return err;
2156	}
2157	switch (kev_copy.data) {
2158	case 0:
2159		return 0;
2160	case EBADF:
2161	case EPERM:
2162	case EINVAL:
2163	case ENOENT:
2164		if ((kev->flags & (EV_ADD|EV_ENABLE)) && !(kev->flags & EV_DELETE)) {
2165			if (_dispatch_select_register(&kev_copy)) {
2166				return 0;
2167			}
2168		}
2169		// fall through
2170	default:
2171		kev_copy.flags |= kev->flags;
2172		_dispatch_kevent_drain(&kev_copy);
2173		break;
2174	}
2175	return (long)kev_copy.data;
2176}
2177
2178#pragma mark -
2179#pragma mark dispatch_mgr
2180
2181static struct kevent64_s *_dispatch_kevent_enable;
2182
2183static void inline
2184_dispatch_mgr_kevent_reenable(struct kevent64_s *ke)
2185{
2186	dispatch_assert(!_dispatch_kevent_enable || _dispatch_kevent_enable == ke);
2187	_dispatch_kevent_enable = ke;
2188}
2189
2190unsigned long
2191_dispatch_mgr_wakeup(dispatch_queue_t dq DISPATCH_UNUSED)
2192{
2193	if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
2194		return false;
2195	}
2196
2197	static const struct kevent64_s kev = {
2198		.ident = 1,
2199		.filter = EVFILT_USER,
2200		.fflags = NOTE_TRIGGER,
2201	};
2202
2203#if DISPATCH_DEBUG && DISPATCH_MGR_QUEUE_DEBUG
2204	_dispatch_debug("waking up the dispatch manager queue: %p", dq);
2205#endif
2206
2207	_dispatch_kq_update(&kev);
2208
2209	return false;
2210}
2211
2212DISPATCH_NOINLINE
2213static void
2214_dispatch_mgr_init(void)
2215{
2216	(void)dispatch_atomic_inc2o(&_dispatch_mgr_q, dq_running, relaxed);
2217	_dispatch_thread_setspecific(dispatch_queue_key, &_dispatch_mgr_q);
2218	_dispatch_queue_set_bound_thread(&_dispatch_mgr_q);
2219	_dispatch_mgr_priority_init();
2220	_dispatch_kevent_init();
2221	_dispatch_timers_init();
2222	_dispatch_mach_recv_msg_buf_init();
2223	_dispatch_memorystatus_init();
2224}
2225
2226DISPATCH_NOINLINE DISPATCH_NORETURN
2227static void
2228_dispatch_mgr_invoke(void)
2229{
2230	static const struct timespec timeout_immediately = { 0, 0 };
2231	struct kevent64_s kev;
2232	bool poll;
2233	int r;
2234
2235	for (;;) {
2236		_dispatch_mgr_queue_drain();
2237		poll = _dispatch_mgr_timers();
2238		if (slowpath(_dispatch_select_workaround)) {
2239			poll = _dispatch_mgr_select(poll);
2240			if (!poll) continue;
2241		}
2242		r = kevent64(_dispatch_kq, _dispatch_kevent_enable,
2243				_dispatch_kevent_enable ? 1 : 0, &kev, 1, 0,
2244				poll ? &timeout_immediately : NULL);
2245		_dispatch_kevent_enable = NULL;
2246		if (slowpath(r == -1)) {
2247			int err = errno;
2248			switch (err) {
2249			case EINTR:
2250				break;
2251			case EBADF:
2252				DISPATCH_CLIENT_CRASH("Do not close random Unix descriptors");
2253				break;
2254			default:
2255				(void)dispatch_assume_zero(err);
2256				break;
2257			}
2258		} else if (r) {
2259			_dispatch_kevent_drain(&kev);
2260		}
2261	}
2262}
2263
2264DISPATCH_NORETURN
2265void
2266_dispatch_mgr_thread(dispatch_queue_t dq DISPATCH_UNUSED)
2267{
2268	_dispatch_mgr_init();
2269	// never returns, so burn bridges behind us & clear stack 2k ahead
2270	_dispatch_clear_stack(2048);
2271	_dispatch_mgr_invoke();
2272}
2273
2274#pragma mark -
2275#pragma mark dispatch_memorystatus
2276
2277#if DISPATCH_USE_MEMORYSTATUS_SOURCE
2278#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_MEMORYSTATUS
2279#define DISPATCH_MEMORYSTATUS_SOURCE_MASK ( \
2280		DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL | \
2281		DISPATCH_MEMORYSTATUS_PRESSURE_WARN)
2282#elif DISPATCH_USE_VM_PRESSURE_SOURCE
2283#define DISPATCH_MEMORYSTATUS_SOURCE_TYPE DISPATCH_SOURCE_TYPE_VM
2284#define DISPATCH_MEMORYSTATUS_SOURCE_MASK DISPATCH_VM_PRESSURE
2285#endif
2286
2287#if DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2288static dispatch_source_t _dispatch_memorystatus_source;
2289
2290static void
2291_dispatch_memorystatus_handler(void *context DISPATCH_UNUSED)
2292{
2293#if DISPATCH_USE_MEMORYSTATUS_SOURCE
2294	unsigned long memorystatus;
2295	memorystatus = dispatch_source_get_data(_dispatch_memorystatus_source);
2296	if (memorystatus & DISPATCH_MEMORYSTATUS_PRESSURE_NORMAL) {
2297		_dispatch_continuation_cache_limit = DISPATCH_CONTINUATION_CACHE_LIMIT;
2298		return;
2299	}
2300	_dispatch_continuation_cache_limit =
2301			DISPATCH_CONTINUATION_CACHE_LIMIT_MEMORYSTATUS_PRESSURE_WARN;
2302#endif
2303	malloc_zone_pressure_relief(0,0);
2304}
2305
2306static void
2307_dispatch_memorystatus_init(void)
2308{
2309	_dispatch_memorystatus_source = dispatch_source_create(
2310			DISPATCH_MEMORYSTATUS_SOURCE_TYPE, 0,
2311			DISPATCH_MEMORYSTATUS_SOURCE_MASK,
2312			_dispatch_get_root_queue(0, true));
2313	dispatch_source_set_event_handler_f(_dispatch_memorystatus_source,
2314			_dispatch_memorystatus_handler);
2315	dispatch_resume(_dispatch_memorystatus_source);
2316}
2317#else
2318static inline void _dispatch_memorystatus_init(void) {}
2319#endif // DISPATCH_USE_MEMORYSTATUS_SOURCE || DISPATCH_USE_VM_PRESSURE_SOURCE
2320
2321#pragma mark -
2322#pragma mark dispatch_mach
2323
2324#if HAVE_MACH
2325
2326#if DISPATCH_DEBUG && DISPATCH_MACHPORT_DEBUG
2327#define _dispatch_debug_machport(name) \
2328		dispatch_debug_machport((name), __func__)
2329#else
2330#define _dispatch_debug_machport(name) ((void)(name))
2331#endif
2332
2333// Flags for all notifications that are registered/unregistered when a
2334// send-possible notification is requested/delivered
2335#define _DISPATCH_MACH_SP_FLAGS (DISPATCH_MACH_SEND_POSSIBLE| \
2336		DISPATCH_MACH_SEND_DEAD|DISPATCH_MACH_SEND_DELETED)
2337#define _DISPATCH_MACH_RECV_FLAGS (DISPATCH_MACH_RECV_MESSAGE| \
2338		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2339		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2340#define _DISPATCH_MACH_RECV_DIRECT_FLAGS ( \
2341		DISPATCH_MACH_RECV_MESSAGE_DIRECT| \
2342		DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE)
2343
2344#define _DISPATCH_IS_POWER_OF_TWO(v) (!(v & (v - 1)) && v)
2345#define _DISPATCH_HASH(x, y) (_DISPATCH_IS_POWER_OF_TWO(y) ? \
2346		(MACH_PORT_INDEX(x) & ((y) - 1)) : (MACH_PORT_INDEX(x) % (y)))
2347
2348#define _DISPATCH_MACHPORT_HASH_SIZE 32
2349#define _DISPATCH_MACHPORT_HASH(x) \
2350		_DISPATCH_HASH((x), _DISPATCH_MACHPORT_HASH_SIZE)
2351
2352#ifndef MACH_RCV_LARGE_IDENTITY
2353#define MACH_RCV_LARGE_IDENTITY 0x00000008
2354#endif
2355#define DISPATCH_MACH_RCV_TRAILER MACH_RCV_TRAILER_CTX
2356#define DISPATCH_MACH_RCV_OPTIONS ( \
2357		MACH_RCV_MSG | MACH_RCV_LARGE | MACH_RCV_LARGE_IDENTITY | \
2358		MACH_RCV_TRAILER_ELEMENTS(DISPATCH_MACH_RCV_TRAILER) | \
2359		MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0))
2360
2361#define DISPATCH_MACH_KEVENT_ARMED(dk) ((dk)->dk_kevent.ext[0])
2362
2363static void _dispatch_kevent_machport_drain(struct kevent64_s *ke);
2364static void _dispatch_kevent_mach_msg_drain(struct kevent64_s *ke);
2365static void _dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr);
2366static void _dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr);
2367static void _dispatch_source_merge_mach_msg(dispatch_source_t ds,
2368		dispatch_source_refs_t dr, dispatch_kevent_t dk,
2369		mach_msg_header_t *hdr, mach_msg_size_t siz);
2370static kern_return_t _dispatch_mach_notify_update(dispatch_kevent_t dk,
2371		uint32_t new_flags, uint32_t del_flags, uint32_t mask,
2372		mach_msg_id_t notify_msgid, mach_port_mscount_t notify_sync);
2373static void _dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr);
2374static void _dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
2375		dispatch_mach_reply_refs_t dmr, bool disconnected);
2376static void _dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr,
2377		mach_msg_size_t siz);
2378static void _dispatch_mach_merge_kevent(dispatch_mach_t dm,
2379		const struct kevent64_s *ke);
2380static void _dispatch_mach_kevent_unregister(dispatch_mach_t dm);
2381
2382static const size_t _dispatch_mach_recv_msg_size =
2383		DISPATCH_MACH_RECEIVE_MAX_INLINE_MESSAGE_SIZE;
2384static const size_t dispatch_mach_trailer_size =
2385		sizeof(dispatch_mach_trailer_t);
2386static const size_t _dispatch_mach_recv_msg_buf_size = mach_vm_round_page(
2387		_dispatch_mach_recv_msg_size + dispatch_mach_trailer_size);
2388static mach_port_t _dispatch_mach_portset, _dispatch_mach_recv_portset;
2389static mach_port_t _dispatch_mach_notify_port;
2390static struct kevent64_s _dispatch_mach_recv_kevent = {
2391	.filter = EVFILT_MACHPORT,
2392	.flags = EV_ADD|EV_ENABLE|EV_DISPATCH,
2393	.fflags = DISPATCH_MACH_RCV_OPTIONS,
2394};
2395static dispatch_source_t _dispatch_mach_notify_source;
2396static const
2397struct dispatch_source_type_s _dispatch_source_type_mach_recv_direct = {
2398	.ke = {
2399		.filter = EVFILT_MACHPORT,
2400		.flags = EV_CLEAR,
2401		.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT,
2402	},
2403};
2404
2405static void
2406_dispatch_mach_recv_msg_buf_init(void)
2407{
2408	mach_vm_size_t vm_size = _dispatch_mach_recv_msg_buf_size;
2409	mach_vm_address_t vm_addr = vm_page_size;
2410	kern_return_t kr;
2411
2412	while (slowpath(kr = mach_vm_allocate(mach_task_self(), &vm_addr, vm_size,
2413			VM_FLAGS_ANYWHERE))) {
2414		if (kr != KERN_NO_SPACE) {
2415			(void)dispatch_assume_zero(kr);
2416			DISPATCH_CLIENT_CRASH("Could not allocate mach msg receive buffer");
2417		}
2418		_dispatch_temporary_resource_shortage();
2419		vm_addr = vm_page_size;
2420	}
2421	_dispatch_mach_recv_kevent.ext[0] = (uintptr_t)vm_addr;
2422	_dispatch_mach_recv_kevent.ext[1] = _dispatch_mach_recv_msg_buf_size;
2423}
2424
2425static inline void*
2426_dispatch_get_mach_recv_msg_buf(void)
2427{
2428	return (void*)_dispatch_mach_recv_kevent.ext[0];
2429}
2430
2431static void
2432_dispatch_mach_recv_portset_init(void *context DISPATCH_UNUSED)
2433{
2434	kern_return_t kr;
2435
2436	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2437			&_dispatch_mach_recv_portset);
2438	DISPATCH_VERIFY_MIG(kr);
2439	if (dispatch_assume_zero(kr)) {
2440		DISPATCH_CLIENT_CRASH(
2441				"mach_port_allocate() failed: cannot create port set");
2442	}
2443	dispatch_assert(_dispatch_get_mach_recv_msg_buf());
2444	dispatch_assert(dispatch_mach_trailer_size ==
2445			REQUESTED_TRAILER_SIZE_NATIVE(MACH_RCV_TRAILER_ELEMENTS(
2446			DISPATCH_MACH_RCV_TRAILER)));
2447	_dispatch_mach_recv_kevent.ident = _dispatch_mach_recv_portset;
2448	_dispatch_kq_update(&_dispatch_mach_recv_kevent);
2449
2450	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_RECEIVE,
2451			&_dispatch_mach_notify_port);
2452	DISPATCH_VERIFY_MIG(kr);
2453	if (dispatch_assume_zero(kr)) {
2454		DISPATCH_CLIENT_CRASH(
2455				"mach_port_allocate() failed: cannot create receive right");
2456	}
2457	_dispatch_mach_notify_source = dispatch_source_create(
2458			&_dispatch_source_type_mach_recv_direct,
2459			_dispatch_mach_notify_port, 0, &_dispatch_mgr_q);
2460	_dispatch_mach_notify_source->ds_refs->ds_handler_func =
2461			(void*)_dispatch_mach_notify_source_invoke;
2462	dispatch_assert(_dispatch_mach_notify_source);
2463	dispatch_resume(_dispatch_mach_notify_source);
2464}
2465
2466static mach_port_t
2467_dispatch_get_mach_recv_portset(void)
2468{
2469	static dispatch_once_t pred;
2470	dispatch_once_f(&pred, NULL, _dispatch_mach_recv_portset_init);
2471	return _dispatch_mach_recv_portset;
2472}
2473
2474static void
2475_dispatch_mach_portset_init(void *context DISPATCH_UNUSED)
2476{
2477	struct kevent64_s kev = {
2478		.filter = EVFILT_MACHPORT,
2479		.flags = EV_ADD,
2480	};
2481	kern_return_t kr;
2482
2483	kr = mach_port_allocate(mach_task_self(), MACH_PORT_RIGHT_PORT_SET,
2484			&_dispatch_mach_portset);
2485	DISPATCH_VERIFY_MIG(kr);
2486	if (dispatch_assume_zero(kr)) {
2487		DISPATCH_CLIENT_CRASH(
2488				"mach_port_allocate() failed: cannot create port set");
2489	}
2490	kev.ident = _dispatch_mach_portset;
2491	_dispatch_kq_update(&kev);
2492}
2493
2494static mach_port_t
2495_dispatch_get_mach_portset(void)
2496{
2497	static dispatch_once_t pred;
2498	dispatch_once_f(&pred, NULL, _dispatch_mach_portset_init);
2499	return _dispatch_mach_portset;
2500}
2501
2502static kern_return_t
2503_dispatch_mach_portset_update(dispatch_kevent_t dk, mach_port_t mps)
2504{
2505	mach_port_t mp = (mach_port_t)dk->dk_kevent.ident;
2506	kern_return_t kr;
2507
2508	_dispatch_debug_machport(mp);
2509	kr = mach_port_move_member(mach_task_self(), mp, mps);
2510	if (slowpath(kr)) {
2511		DISPATCH_VERIFY_MIG(kr);
2512		switch (kr) {
2513		case KERN_INVALID_RIGHT:
2514			if (mps) {
2515				_dispatch_bug_mach_client("_dispatch_kevent_machport_enable: "
2516						"mach_port_move_member() failed ", kr);
2517				break;
2518			}
2519			//fall through
2520		case KERN_INVALID_NAME:
2521#if DISPATCH_DEBUG
2522			_dispatch_log("Corruption: Mach receive right 0x%x destroyed "
2523					"prematurely", mp);
2524#endif
2525			break;
2526		default:
2527			(void)dispatch_assume_zero(kr);
2528			break;
2529		}
2530	}
2531	return mps ? kr : 0;
2532}
2533
2534static void
2535_dispatch_kevent_mach_recv_reenable(struct kevent64_s *ke DISPATCH_UNUSED)
2536{
2537#if (TARGET_IPHONE_SIMULATOR && \
2538		IPHONE_SIMULATOR_HOST_MIN_VERSION_REQUIRED < 1090) || \
2539		(!TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED < 1090)
2540	// delete and re-add kevent to workaround <rdar://problem/13924256>
2541	if (ke->ext[1] != _dispatch_mach_recv_kevent.ext[1]) {
2542		struct kevent64_s kev = _dispatch_mach_recv_kevent;
2543		kev.flags = EV_DELETE;
2544		_dispatch_kq_update(&kev);
2545	}
2546#endif
2547	_dispatch_mgr_kevent_reenable(&_dispatch_mach_recv_kevent);
2548}
2549
2550static kern_return_t
2551_dispatch_kevent_machport_resume(dispatch_kevent_t dk, uint32_t new_flags,
2552		uint32_t del_flags)
2553{
2554	kern_return_t kr = 0;
2555	dispatch_assert_zero(new_flags & del_flags);
2556	if ((new_flags & _DISPATCH_MACH_RECV_FLAGS) ||
2557			(del_flags & _DISPATCH_MACH_RECV_FLAGS)) {
2558		mach_port_t mps;
2559		if (new_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2560			mps = _dispatch_get_mach_recv_portset();
2561		} else if ((new_flags & DISPATCH_MACH_RECV_MESSAGE) ||
2562				((del_flags & _DISPATCH_MACH_RECV_DIRECT_FLAGS) &&
2563				(dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE))) {
2564			mps = _dispatch_get_mach_portset();
2565		} else {
2566			mps = MACH_PORT_NULL;
2567		}
2568		kr = _dispatch_mach_portset_update(dk, mps);
2569	}
2570	return kr;
2571}
2572
2573static kern_return_t
2574_dispatch_kevent_mach_notify_resume(dispatch_kevent_t dk, uint32_t new_flags,
2575		uint32_t del_flags)
2576{
2577	kern_return_t kr = 0;
2578	dispatch_assert_zero(new_flags & del_flags);
2579	if ((new_flags & _DISPATCH_MACH_SP_FLAGS) ||
2580			(del_flags & _DISPATCH_MACH_SP_FLAGS)) {
2581		// Requesting a (delayed) non-sync send-possible notification
2582		// registers for both immediate dead-name notification and delayed-arm
2583		// send-possible notification for the port.
2584		// The send-possible notification is armed when a mach_msg() with the
2585		// the MACH_SEND_NOTIFY to the port times out.
2586		// If send-possible is unavailable, fall back to immediate dead-name
2587		// registration rdar://problem/2527840&9008724
2588		kr = _dispatch_mach_notify_update(dk, new_flags, del_flags,
2589				_DISPATCH_MACH_SP_FLAGS, MACH_NOTIFY_SEND_POSSIBLE,
2590				MACH_NOTIFY_SEND_POSSIBLE == MACH_NOTIFY_DEAD_NAME ? 1 : 0);
2591	}
2592	return kr;
2593}
2594
2595static inline void
2596_dispatch_kevent_mach_portset(struct kevent64_s *ke)
2597{
2598	if (ke->ident == _dispatch_mach_recv_portset) {
2599		return _dispatch_kevent_mach_msg_drain(ke);
2600	} else if (ke->ident == _dispatch_mach_portset) {
2601		return _dispatch_kevent_machport_drain(ke);
2602	} else {
2603		return _dispatch_kevent_error(ke);
2604	}
2605}
2606
2607DISPATCH_NOINLINE
2608static void
2609_dispatch_kevent_machport_drain(struct kevent64_s *ke)
2610{
2611	mach_port_t name = (mach_port_name_t)ke->data;
2612	dispatch_kevent_t dk;
2613	struct kevent64_s kev;
2614
2615	_dispatch_debug_machport(name);
2616	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2617	if (!dispatch_assume(dk)) {
2618		return;
2619	}
2620	_dispatch_mach_portset_update(dk, MACH_PORT_NULL); // emulate EV_DISPATCH
2621
2622	EV_SET64(&kev, name, EVFILT_MACHPORT, EV_ADD|EV_ENABLE|EV_DISPATCH,
2623			DISPATCH_MACH_RECV_MESSAGE, 0, (uintptr_t)dk, 0, 0);
2624	_dispatch_kevent_debug(&kev, __func__);
2625	_dispatch_kevent_merge(&kev);
2626}
2627
2628DISPATCH_NOINLINE
2629static void
2630_dispatch_kevent_mach_msg_drain(struct kevent64_s *ke)
2631{
2632	mach_msg_header_t *hdr = (mach_msg_header_t*)ke->ext[0];
2633	mach_msg_size_t siz, msgsiz;
2634	mach_msg_return_t kr = (mach_msg_return_t)ke->fflags;
2635
2636	_dispatch_kevent_mach_recv_reenable(ke);
2637	if (!dispatch_assume(hdr)) {
2638		DISPATCH_CRASH("EVFILT_MACHPORT with no message");
2639	}
2640	if (fastpath(!kr)) {
2641		return _dispatch_kevent_mach_msg_recv(hdr);
2642	} else if (kr != MACH_RCV_TOO_LARGE) {
2643		goto out;
2644	}
2645	if (!dispatch_assume(ke->ext[1] <= UINT_MAX -
2646			dispatch_mach_trailer_size)) {
2647		DISPATCH_CRASH("EVFILT_MACHPORT with overlarge message");
2648	}
2649	siz = (mach_msg_size_t)ke->ext[1] + dispatch_mach_trailer_size;
2650	hdr = malloc(siz);
2651	if (ke->data) {
2652		if (!dispatch_assume(hdr)) {
2653			// Kernel will discard message too large to fit
2654			hdr = _dispatch_get_mach_recv_msg_buf();
2655			siz = _dispatch_mach_recv_msg_buf_size;
2656		}
2657		mach_port_t name = (mach_port_name_t)ke->data;
2658		const mach_msg_option_t options = ((DISPATCH_MACH_RCV_OPTIONS |
2659				MACH_RCV_TIMEOUT) & ~MACH_RCV_LARGE);
2660		kr = mach_msg(hdr, options, 0, siz, name, MACH_MSG_TIMEOUT_NONE,
2661				MACH_PORT_NULL);
2662		if (fastpath(!kr)) {
2663			return _dispatch_kevent_mach_msg_recv(hdr);
2664		} else if (kr == MACH_RCV_TOO_LARGE) {
2665			_dispatch_log("BUG in libdispatch client: "
2666					"_dispatch_kevent_mach_msg_drain: dropped message too "
2667					"large to fit in memory: id = 0x%x, size = %lld",
2668					hdr->msgh_id, ke->ext[1]);
2669			kr = MACH_MSG_SUCCESS;
2670		}
2671	} else {
2672		// We don't know which port in the portset contains the large message,
2673		// so need to receive all messages pending on the portset to ensure the
2674		// large message is drained. <rdar://problem/13950432>
2675		bool received = false;
2676		for (;;) {
2677			if (!dispatch_assume(hdr)) {
2678				DISPATCH_CLIENT_CRASH("Message too large to fit in memory");
2679			}
2680			const mach_msg_option_t options = (DISPATCH_MACH_RCV_OPTIONS |
2681					MACH_RCV_TIMEOUT);
2682			kr = mach_msg(hdr, options, 0, siz, _dispatch_mach_recv_portset,
2683					MACH_MSG_TIMEOUT_NONE, MACH_PORT_NULL);
2684			if ((!kr || kr == MACH_RCV_TOO_LARGE) && !dispatch_assume(
2685					hdr->msgh_size <= UINT_MAX - dispatch_mach_trailer_size)) {
2686				DISPATCH_CRASH("Overlarge message");
2687			}
2688			if (fastpath(!kr)) {
2689				msgsiz = hdr->msgh_size + dispatch_mach_trailer_size;
2690				if (msgsiz < siz) {
2691					void *shrink = realloc(hdr, msgsiz);
2692					if (shrink) hdr = shrink;
2693				}
2694				_dispatch_kevent_mach_msg_recv(hdr);
2695				hdr = NULL;
2696				received = true;
2697			} else if (kr == MACH_RCV_TOO_LARGE) {
2698				siz = hdr->msgh_size + dispatch_mach_trailer_size;
2699			} else {
2700				if (kr == MACH_RCV_TIMED_OUT && received) {
2701					kr = MACH_MSG_SUCCESS;
2702				}
2703				break;
2704			}
2705			hdr = reallocf(hdr, siz);
2706		}
2707	}
2708	if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2709		free(hdr);
2710	}
2711out:
2712	if (slowpath(kr)) {
2713		_dispatch_bug_mach_client("_dispatch_kevent_mach_msg_drain: "
2714				"message reception failed", kr);
2715	}
2716}
2717
2718static void
2719_dispatch_kevent_mach_msg_recv(mach_msg_header_t *hdr)
2720{
2721	dispatch_source_refs_t dri;
2722	dispatch_kevent_t dk;
2723	mach_port_t name = hdr->msgh_local_port;
2724	mach_msg_size_t siz = hdr->msgh_size + dispatch_mach_trailer_size;
2725
2726	if (!dispatch_assume(hdr->msgh_size <= UINT_MAX -
2727			dispatch_mach_trailer_size)) {
2728		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2729				"received overlarge message");
2730		return _dispatch_kevent_mach_msg_destroy(hdr);
2731	}
2732	if (!dispatch_assume(name)) {
2733		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2734				"received message with MACH_PORT_NULL port");
2735		return _dispatch_kevent_mach_msg_destroy(hdr);
2736	}
2737	_dispatch_debug_machport(name);
2738	dk = _dispatch_kevent_find(name, EVFILT_MACHPORT);
2739	if (!dispatch_assume(dk)) {
2740		_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2741				"received message with unknown kevent");
2742		return _dispatch_kevent_mach_msg_destroy(hdr);
2743	}
2744	_dispatch_kevent_debug(&dk->dk_kevent, __func__);
2745	TAILQ_FOREACH(dri, &dk->dk_sources, dr_list) {
2746		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2747		if (dsi->ds_pending_data_mask & _DISPATCH_MACH_RECV_DIRECT_FLAGS) {
2748			return _dispatch_source_merge_mach_msg(dsi, dri, dk, hdr, siz);
2749		}
2750	}
2751	_dispatch_bug_client("_dispatch_kevent_mach_msg_recv: "
2752			"received message with no listeners");
2753	return _dispatch_kevent_mach_msg_destroy(hdr);
2754}
2755
2756static void
2757_dispatch_kevent_mach_msg_destroy(mach_msg_header_t *hdr)
2758{
2759	if (hdr) {
2760		mach_msg_destroy(hdr);
2761		if (hdr != _dispatch_get_mach_recv_msg_buf()) {
2762			free(hdr);
2763		}
2764	}
2765}
2766
2767static void
2768_dispatch_source_merge_mach_msg(dispatch_source_t ds, dispatch_source_refs_t dr,
2769		dispatch_kevent_t dk, mach_msg_header_t *hdr, mach_msg_size_t siz)
2770{
2771	if (ds == _dispatch_mach_notify_source) {
2772		_dispatch_mach_notify_source_invoke(hdr);
2773		return _dispatch_kevent_mach_msg_destroy(hdr);
2774	}
2775	if (dk->dk_kevent.fflags & DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE) {
2776		_dispatch_mach_reply_kevent_unregister((dispatch_mach_t)ds,
2777				(dispatch_mach_reply_refs_t)dr, false);
2778	}
2779	return _dispatch_mach_msg_recv((dispatch_mach_t)ds, hdr, siz);
2780}
2781
2782DISPATCH_ALWAYS_INLINE
2783static inline void
2784_dispatch_mach_notify_merge(mach_port_t name, uint32_t flag, bool final)
2785{
2786	dispatch_source_refs_t dri, dr_next;
2787	dispatch_kevent_t dk;
2788	struct kevent64_s kev;
2789	bool unreg;
2790
2791	dk = _dispatch_kevent_find(name, DISPATCH_EVFILT_MACH_NOTIFICATION);
2792	if (!dk) {
2793		return;
2794	}
2795
2796	// Update notification registration state.
2797	dk->dk_kevent.data &= ~_DISPATCH_MACH_SP_FLAGS;
2798	EV_SET64(&kev, name, DISPATCH_EVFILT_MACH_NOTIFICATION, EV_ADD|EV_ENABLE,
2799			flag, 0, (uintptr_t)dk, 0, 0);
2800	if (final) {
2801		// This can never happen again
2802		unreg = true;
2803	} else {
2804		// Re-register for notification before delivery
2805		unreg = _dispatch_kevent_resume(dk, flag, 0);
2806	}
2807	DISPATCH_MACH_KEVENT_ARMED(dk) = 0;
2808	TAILQ_FOREACH_SAFE(dri, &dk->dk_sources, dr_list, dr_next) {
2809		dispatch_source_t dsi = _dispatch_source_from_refs(dri);
2810		if (dx_type(dsi) == DISPATCH_MACH_CHANNEL_TYPE) {
2811			dispatch_mach_t dm = (dispatch_mach_t)dsi;
2812			_dispatch_mach_merge_kevent(dm, &kev);
2813			if (unreg && dm->dm_dkev) {
2814				_dispatch_mach_kevent_unregister(dm);
2815			}
2816		} else {
2817			_dispatch_source_merge_kevent(dsi, &kev);
2818			if (unreg) {
2819				_dispatch_source_kevent_unregister(dsi);
2820			}
2821		}
2822		if (!dr_next || DISPATCH_MACH_KEVENT_ARMED(dk)) {
2823			// current merge is last in list (dk might have been freed)
2824			// or it re-armed the notification
2825			return;
2826		}
2827	}
2828}
2829
2830static kern_return_t
2831_dispatch_mach_notify_update(dispatch_kevent_t dk, uint32_t new_flags,
2832		uint32_t del_flags, uint32_t mask, mach_msg_id_t notify_msgid,
2833		mach_port_mscount_t notify_sync)
2834{
2835	mach_port_t previous, port = (mach_port_t)dk->dk_kevent.ident;
2836	typeof(dk->dk_kevent.data) prev = dk->dk_kevent.data;
2837	kern_return_t kr, krr = 0;
2838
2839	// Update notification registration state.
2840	dk->dk_kevent.data |= (new_flags | dk->dk_kevent.fflags) & mask;
2841	dk->dk_kevent.data &= ~(del_flags & mask);
2842
2843	_dispatch_debug_machport(port);
2844	if ((dk->dk_kevent.data & mask) && !(prev & mask)) {
2845		// initialize _dispatch_mach_notify_port:
2846		(void)_dispatch_get_mach_recv_portset();
2847		_dispatch_debug("machport[0x%08x]: registering for send-possible "
2848				"notification", port);
2849		previous = MACH_PORT_NULL;
2850		krr = mach_port_request_notification(mach_task_self(), port,
2851				notify_msgid, notify_sync, _dispatch_mach_notify_port,
2852				MACH_MSG_TYPE_MAKE_SEND_ONCE, &previous);
2853		DISPATCH_VERIFY_MIG(krr);
2854
2855		switch(krr) {
2856		case KERN_INVALID_NAME:
2857		case KERN_INVALID_RIGHT:
2858			// Supress errors & clear registration state
2859			dk->dk_kevent.data &= ~mask;
2860			break;
2861		default:
2862			// Else, we dont expect any errors from mach. Log any errors
2863			if (dispatch_assume_zero(krr)) {
2864				// log the error & clear registration state
2865				dk->dk_kevent.data &= ~mask;
2866			} else if (dispatch_assume_zero(previous)) {
2867				// Another subsystem has beat libdispatch to requesting the
2868				// specified Mach notification on this port. We should
2869				// technically cache the previous port and message it when the
2870				// kernel messages our port. Or we can just say screw those
2871				// subsystems and deallocate the previous port.
2872				// They should adopt libdispatch :-P
2873				kr = mach_port_deallocate(mach_task_self(), previous);
2874				DISPATCH_VERIFY_MIG(kr);
2875				(void)dispatch_assume_zero(kr);
2876				previous = MACH_PORT_NULL;
2877			}
2878		}
2879	} else if (!(dk->dk_kevent.data & mask) && (prev & mask)) {
2880		_dispatch_debug("machport[0x%08x]: unregistering for send-possible "
2881				"notification", port);
2882		previous = MACH_PORT_NULL;
2883		kr = mach_port_request_notification(mach_task_self(), port,
2884				notify_msgid, notify_sync, MACH_PORT_NULL,
2885				MACH_MSG_TYPE_MOVE_SEND_ONCE, &previous);
2886		DISPATCH_VERIFY_MIG(kr);
2887
2888		switch (kr) {
2889		case KERN_INVALID_NAME:
2890		case KERN_INVALID_RIGHT:
2891		case KERN_INVALID_ARGUMENT:
2892			break;
2893		default:
2894			if (dispatch_assume_zero(kr)) {
2895				// log the error
2896			}
2897		}
2898	} else {
2899		return 0;
2900	}
2901	if (slowpath(previous)) {
2902		// the kernel has not consumed the send-once right yet
2903		(void)dispatch_assume_zero(
2904				_dispatch_send_consume_send_once_right(previous));
2905	}
2906	return krr;
2907}
2908
2909static void
2910_dispatch_mach_host_notify_update(void *context DISPATCH_UNUSED)
2911{
2912	(void)_dispatch_get_mach_recv_portset();
2913	_dispatch_debug("registering for calendar-change notification");
2914	kern_return_t kr = host_request_notification(mach_host_self(),
2915			HOST_NOTIFY_CALENDAR_CHANGE, _dispatch_mach_notify_port);
2916	DISPATCH_VERIFY_MIG(kr);
2917	(void)dispatch_assume_zero(kr);
2918}
2919
2920static void
2921_dispatch_mach_host_calendar_change_register(void)
2922{
2923	static dispatch_once_t pred;
2924	dispatch_once_f(&pred, NULL, _dispatch_mach_host_notify_update);
2925}
2926
2927static void
2928_dispatch_mach_notify_source_invoke(mach_msg_header_t *hdr)
2929{
2930	mig_reply_error_t reply;
2931	dispatch_assert(sizeof(mig_reply_error_t) == sizeof(union
2932		__ReplyUnion___dispatch_libdispatch_internal_protocol_subsystem));
2933	dispatch_assert(sizeof(mig_reply_error_t) < _dispatch_mach_recv_msg_size);
2934	boolean_t success = libdispatch_internal_protocol_server(hdr, &reply.Head);
2935	if (!success && reply.RetCode == MIG_BAD_ID && hdr->msgh_id == 950) {
2936		// host_notify_reply.defs: host_calendar_changed
2937		_dispatch_debug("calendar-change notification");
2938		_dispatch_timers_calendar_change();
2939		_dispatch_mach_host_notify_update(NULL);
2940		success = TRUE;
2941		reply.RetCode = KERN_SUCCESS;
2942	}
2943	if (dispatch_assume(success) && reply.RetCode != MIG_NO_REPLY) {
2944		(void)dispatch_assume_zero(reply.RetCode);
2945	}
2946}
2947
2948kern_return_t
2949_dispatch_mach_notify_port_deleted(mach_port_t notify DISPATCH_UNUSED,
2950		mach_port_name_t name)
2951{
2952#if DISPATCH_DEBUG
2953	_dispatch_log("Corruption: Mach send/send-once/dead-name right 0x%x "
2954			"deleted prematurely", name);
2955#endif
2956
2957	_dispatch_debug_machport(name);
2958	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DELETED, true);
2959
2960	return KERN_SUCCESS;
2961}
2962
2963kern_return_t
2964_dispatch_mach_notify_dead_name(mach_port_t notify DISPATCH_UNUSED,
2965		mach_port_name_t name)
2966{
2967	kern_return_t kr;
2968
2969	_dispatch_debug("machport[0x%08x]: dead-name notification", name);
2970	_dispatch_debug_machport(name);
2971	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_DEAD, true);
2972
2973	// the act of receiving a dead name notification allocates a dead-name
2974	// right that must be deallocated
2975	kr = mach_port_deallocate(mach_task_self(), name);
2976	DISPATCH_VERIFY_MIG(kr);
2977	//(void)dispatch_assume_zero(kr);
2978
2979	return KERN_SUCCESS;
2980}
2981
2982kern_return_t
2983_dispatch_mach_notify_send_possible(mach_port_t notify DISPATCH_UNUSED,
2984		mach_port_name_t name)
2985{
2986	_dispatch_debug("machport[0x%08x]: send-possible notification", name);
2987	_dispatch_debug_machport(name);
2988	_dispatch_mach_notify_merge(name, DISPATCH_MACH_SEND_POSSIBLE, false);
2989
2990	return KERN_SUCCESS;
2991}
2992
2993#pragma mark -
2994#pragma mark dispatch_mach_t
2995
2996#define DISPATCH_MACH_NEVER_CONNECTED (UINT32_MAX/2)
2997#define DISPATCH_MACH_PSEUDO_RECEIVED 0x1
2998#define DISPATCH_MACH_REGISTER_FOR_REPLY 0x2
2999#define DISPATCH_MACH_OPTIONS_MASK 0xffff
3000
3001static mach_port_t _dispatch_mach_msg_get_remote_port(dispatch_object_t dou);
3002static void _dispatch_mach_msg_disconnected(dispatch_mach_t dm,
3003		mach_port_t local_port, mach_port_t remote_port);
3004static bool _dispatch_mach_reconnect_invoke(dispatch_mach_t dm,
3005		dispatch_object_t dou);
3006static inline mach_msg_header_t* _dispatch_mach_msg_get_msg(
3007		dispatch_mach_msg_t dmsg);
3008
3009static dispatch_mach_t
3010_dispatch_mach_create(const char *label, dispatch_queue_t q, void *context,
3011		dispatch_mach_handler_function_t handler, bool handler_is_block)
3012{
3013	dispatch_mach_t dm;
3014	dispatch_mach_refs_t dr;
3015
3016	dm = _dispatch_alloc(DISPATCH_VTABLE(mach),
3017			sizeof(struct dispatch_mach_s));
3018	_dispatch_queue_init((dispatch_queue_t)dm);
3019	dm->dq_label = label;
3020
3021	dm->do_ref_cnt++; // the reference _dispatch_mach_cancel_invoke holds
3022	dm->do_ref_cnt++; // since channel is created suspended
3023	dm->do_suspend_cnt = DISPATCH_OBJECT_SUSPEND_INTERVAL;
3024	dm->do_targetq = &_dispatch_mgr_q;
3025
3026	dr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_refs_s));
3027	dr->dr_source_wref = _dispatch_ptr2wref(dm);
3028	dr->dm_handler_func = handler;
3029	dr->dm_handler_ctxt = context;
3030	dm->ds_refs = dr;
3031	dm->ds_handler_is_block = handler_is_block;
3032
3033	dm->dm_refs = _dispatch_calloc(1ul,
3034			sizeof(struct dispatch_mach_send_refs_s));
3035	dm->dm_refs->dr_source_wref = _dispatch_ptr2wref(dm);
3036	dm->dm_refs->dm_disconnect_cnt = DISPATCH_MACH_NEVER_CONNECTED;
3037	TAILQ_INIT(&dm->dm_refs->dm_replies);
3038
3039	// First item on the channel sets the user-specified target queue
3040	dispatch_set_target_queue(dm, q);
3041	_dispatch_object_debug(dm, "%s", __func__);
3042	return dm;
3043}
3044
3045dispatch_mach_t
3046dispatch_mach_create(const char *label, dispatch_queue_t q,
3047		dispatch_mach_handler_t handler)
3048{
3049	dispatch_block_t bb = _dispatch_Block_copy((void*)handler);
3050	return _dispatch_mach_create(label, q, bb,
3051			(dispatch_mach_handler_function_t)_dispatch_Block_invoke(bb), true);
3052}
3053
3054dispatch_mach_t
3055dispatch_mach_create_f(const char *label, dispatch_queue_t q, void *context,
3056		dispatch_mach_handler_function_t handler)
3057{
3058	return _dispatch_mach_create(label, q, context, handler, false);
3059}
3060
3061void
3062_dispatch_mach_dispose(dispatch_mach_t dm)
3063{
3064	_dispatch_object_debug(dm, "%s", __func__);
3065	dispatch_mach_refs_t dr = dm->ds_refs;
3066	if (dm->ds_handler_is_block && dr->dm_handler_ctxt) {
3067		Block_release(dr->dm_handler_ctxt);
3068	}
3069	free(dr);
3070	free(dm->dm_refs);
3071	_dispatch_queue_destroy(dm);
3072}
3073
3074void
3075dispatch_mach_connect(dispatch_mach_t dm, mach_port_t receive,
3076		mach_port_t send, dispatch_mach_msg_t checkin)
3077{
3078	dispatch_mach_send_refs_t dr = dm->dm_refs;
3079	dispatch_kevent_t dk;
3080
3081	if (MACH_PORT_VALID(receive)) {
3082		dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3083		dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3084		dk->dk_kevent.ident = receive;
3085		dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3086		dk->dk_kevent.udata = (uintptr_t)dk;
3087		TAILQ_INIT(&dk->dk_sources);
3088		dm->ds_dkev = dk;
3089		dm->ds_pending_data_mask = dk->dk_kevent.fflags;
3090		_dispatch_retain(dm); // the reference the manager queue holds
3091	}
3092	dr->dm_send = send;
3093	if (MACH_PORT_VALID(send)) {
3094		if (checkin) {
3095			dispatch_retain(checkin);
3096			dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3097		}
3098		dr->dm_checkin = checkin;
3099	}
3100	// monitor message reply ports
3101	dm->ds_pending_data_mask |= DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3102	if (slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_disconnect_cnt,
3103			DISPATCH_MACH_NEVER_CONNECTED, 0, release))) {
3104		DISPATCH_CLIENT_CRASH("Channel already connected");
3105	}
3106	_dispatch_object_debug(dm, "%s", __func__);
3107	return dispatch_resume(dm);
3108}
3109
3110DISPATCH_NOINLINE
3111static void
3112_dispatch_mach_reply_kevent_unregister(dispatch_mach_t dm,
3113		dispatch_mach_reply_refs_t dmr, bool disconnected)
3114{
3115	dispatch_kevent_t dk = dmr->dm_dkev;
3116	mach_port_t local_port = (mach_port_t)dk->dk_kevent.ident;
3117	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dmr, dr_list);
3118	_dispatch_kevent_unregister(dk, DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE);
3119	TAILQ_REMOVE(&dm->dm_refs->dm_replies, dmr, dm_list);
3120	free(dmr);
3121	if (disconnected) {
3122		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3123	}
3124}
3125
3126DISPATCH_NOINLINE
3127static void
3128_dispatch_mach_reply_kevent_register(dispatch_mach_t dm, mach_port_t reply,
3129		void *ctxt)
3130{
3131	dispatch_kevent_t dk;
3132	dispatch_mach_reply_refs_t dmr;
3133
3134	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3135	dk->dk_kevent = _dispatch_source_type_mach_recv_direct.ke;
3136	dk->dk_kevent.ident = reply;
3137	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3138	dk->dk_kevent.fflags = DISPATCH_MACH_RECV_MESSAGE_DIRECT_ONCE;
3139	dk->dk_kevent.udata = (uintptr_t)dk;
3140	// make reply context visible to leaks rdar://11777199
3141	dk->dk_kevent.ext[1] = (uintptr_t)ctxt;
3142	TAILQ_INIT(&dk->dk_sources);
3143
3144	dmr = _dispatch_calloc(1ul, sizeof(struct dispatch_mach_reply_refs_s));
3145	dmr->dr_source_wref = _dispatch_ptr2wref(dm);
3146	dmr->dm_dkev = dk;
3147
3148	_dispatch_debug("machport[0x%08x]: registering for reply, ctxt %p", reply,
3149			ctxt);
3150	uint32_t flags;
3151	bool do_resume = _dispatch_kevent_register(&dmr->dm_dkev, &flags);
3152	TAILQ_INSERT_TAIL(&dmr->dm_dkev->dk_sources, (dispatch_source_refs_t)dmr,
3153			dr_list);
3154	TAILQ_INSERT_TAIL(&dm->dm_refs->dm_replies, dmr, dm_list);
3155	if (do_resume && _dispatch_kevent_resume(dmr->dm_dkev, flags, 0)) {
3156		_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3157	}
3158}
3159
3160DISPATCH_NOINLINE
3161static void
3162_dispatch_mach_kevent_unregister(dispatch_mach_t dm)
3163{
3164	dispatch_kevent_t dk = dm->dm_dkev;
3165	dm->dm_dkev = NULL;
3166	TAILQ_REMOVE(&dk->dk_sources, (dispatch_source_refs_t)dm->dm_refs,
3167			dr_list);
3168	dm->ds_pending_data_mask &= ~(unsigned long)
3169			(DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3170	_dispatch_kevent_unregister(dk,
3171			DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD);
3172}
3173
3174DISPATCH_NOINLINE
3175static void
3176_dispatch_mach_kevent_register(dispatch_mach_t dm, mach_port_t send)
3177{
3178	dispatch_kevent_t dk;
3179
3180	dk = _dispatch_calloc(1ul, sizeof(struct dispatch_kevent_s));
3181	dk->dk_kevent = _dispatch_source_type_mach_send.ke;
3182	dk->dk_kevent.ident = send;
3183	dk->dk_kevent.flags |= EV_ADD|EV_ENABLE;
3184	dk->dk_kevent.fflags = DISPATCH_MACH_SEND_POSSIBLE|DISPATCH_MACH_SEND_DEAD;
3185	dk->dk_kevent.udata = (uintptr_t)dk;
3186	TAILQ_INIT(&dk->dk_sources);
3187
3188	dm->ds_pending_data_mask |= dk->dk_kevent.fflags;
3189
3190	uint32_t flags;
3191	bool do_resume = _dispatch_kevent_register(&dk, &flags);
3192	TAILQ_INSERT_TAIL(&dk->dk_sources,
3193			(dispatch_source_refs_t)dm->dm_refs, dr_list);
3194	dm->dm_dkev = dk;
3195	if (do_resume && _dispatch_kevent_resume(dm->dm_dkev, flags, 0)) {
3196		_dispatch_mach_kevent_unregister(dm);
3197	}
3198}
3199
3200static inline void
3201_dispatch_mach_push(dispatch_object_t dm, dispatch_object_t dou)
3202{
3203	return _dispatch_queue_push(dm._dq, dou);
3204}
3205
3206static inline void
3207_dispatch_mach_msg_set_options(dispatch_object_t dou, mach_msg_option_t options)
3208{
3209	dou._do->do_suspend_cnt = (unsigned int)options;
3210}
3211
3212static inline mach_msg_option_t
3213_dispatch_mach_msg_get_options(dispatch_object_t dou)
3214{
3215	mach_msg_option_t options = (mach_msg_option_t)dou._do->do_suspend_cnt;
3216	return options;
3217}
3218
3219static inline void
3220_dispatch_mach_msg_set_reason(dispatch_object_t dou, mach_error_t err,
3221		unsigned long reason)
3222{
3223	dispatch_assert_zero(reason & ~(unsigned long)code_emask);
3224	dou._do->do_suspend_cnt =  (unsigned int)((err || !reason) ? err :
3225			 err_local|err_sub(0x3e0)|(mach_error_t)reason);
3226}
3227
3228static inline unsigned long
3229_dispatch_mach_msg_get_reason(dispatch_object_t dou, mach_error_t *err_ptr)
3230{
3231	mach_error_t err = (mach_error_t)dou._do->do_suspend_cnt;
3232	dou._do->do_suspend_cnt = 0;
3233	if ((err & system_emask) == err_local && err_get_sub(err) == 0x3e0) {
3234		*err_ptr = 0;
3235		return err_get_code(err);
3236	}
3237	*err_ptr = err;
3238	return err ? DISPATCH_MACH_MESSAGE_SEND_FAILED : DISPATCH_MACH_MESSAGE_SENT;
3239}
3240
3241static void
3242_dispatch_mach_msg_recv(dispatch_mach_t dm, mach_msg_header_t *hdr,
3243		mach_msg_size_t siz)
3244{
3245	_dispatch_debug_machport(hdr->msgh_remote_port);
3246	_dispatch_debug("machport[0x%08x]: received msg id 0x%x, reply on 0x%08x",
3247			hdr->msgh_local_port, hdr->msgh_id, hdr->msgh_remote_port);
3248	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3249		return _dispatch_kevent_mach_msg_destroy(hdr);
3250	}
3251	dispatch_mach_msg_t dmsg;
3252	dispatch_mach_msg_destructor_t destructor;
3253	destructor = (hdr == _dispatch_get_mach_recv_msg_buf()) ?
3254			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT :
3255			DISPATCH_MACH_MSG_DESTRUCTOR_FREE;
3256	dmsg = dispatch_mach_msg_create(hdr, siz, destructor, NULL);
3257	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_MESSAGE_RECEIVED);
3258	return _dispatch_mach_push(dm, dmsg);
3259}
3260
3261static inline mach_port_t
3262_dispatch_mach_msg_get_remote_port(dispatch_object_t dou)
3263{
3264	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3265	mach_port_t remote = hdr->msgh_remote_port;
3266	return remote;
3267}
3268
3269static inline mach_port_t
3270_dispatch_mach_msg_get_reply_port(dispatch_mach_t dm, dispatch_object_t dou)
3271{
3272	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dou._dmsg);
3273	mach_port_t reply = MACH_PORT_NULL;
3274	mach_msg_option_t msg_opts = _dispatch_mach_msg_get_options(dou);
3275	if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
3276		reply = hdr->msgh_reserved;
3277		hdr->msgh_reserved = 0;
3278	} else if (MACH_MSGH_BITS_LOCAL(hdr->msgh_bits) ==
3279			MACH_MSG_TYPE_MAKE_SEND_ONCE &&
3280			MACH_PORT_VALID(hdr->msgh_local_port) && (!dm->ds_dkev ||
3281			dm->ds_dkev->dk_kevent.ident != hdr->msgh_local_port)) {
3282		reply = hdr->msgh_local_port;
3283	}
3284	return reply;
3285}
3286
3287static inline void
3288_dispatch_mach_msg_disconnected(dispatch_mach_t dm, mach_port_t local_port,
3289		mach_port_t remote_port)
3290{
3291	mach_msg_header_t *hdr;
3292	dispatch_mach_msg_t dmsg;
3293	dmsg = dispatch_mach_msg_create(NULL, sizeof(mach_msg_header_t),
3294			DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT, &hdr);
3295	if (local_port) hdr->msgh_local_port = local_port;
3296	if (remote_port) hdr->msgh_remote_port = remote_port;
3297	_dispatch_mach_msg_set_reason(dmsg, 0, DISPATCH_MACH_DISCONNECTED);
3298	return _dispatch_mach_push(dm, dmsg);
3299}
3300
3301DISPATCH_NOINLINE
3302static void
3303_dispatch_mach_msg_not_sent(dispatch_mach_t dm, dispatch_object_t dou)
3304{
3305	mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dou);
3306	_dispatch_mach_msg_set_reason(dou, 0, DISPATCH_MACH_MESSAGE_NOT_SENT);
3307	_dispatch_mach_push(dm, dou);
3308	if (reply) {
3309		_dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
3310	}
3311}
3312
3313DISPATCH_NOINLINE
3314static dispatch_object_t
3315_dispatch_mach_msg_send(dispatch_mach_t dm, dispatch_object_t dou)
3316{
3317	dispatch_mach_send_refs_t dr = dm->dm_refs;
3318	dispatch_mach_msg_t dmsg = dou._dmsg;
3319	dr->dm_needs_mgr = 0;
3320	if (slowpath(dr->dm_checkin) && dmsg != dr->dm_checkin) {
3321		// send initial checkin message
3322		if (dm->dm_dkev && slowpath(_dispatch_queue_get_current() !=
3323				&_dispatch_mgr_q)) {
3324			// send kevent must be uninstalled on the manager queue
3325			dr->dm_needs_mgr = 1;
3326			goto out;
3327		}
3328		dr->dm_checkin = _dispatch_mach_msg_send(dm, dr->dm_checkin)._dmsg;
3329		if (slowpath(dr->dm_checkin)) {
3330			goto out;
3331		}
3332	}
3333	mach_msg_header_t *msg = _dispatch_mach_msg_get_msg(dmsg);
3334	mach_msg_return_t kr = 0;
3335	mach_port_t reply = _dispatch_mach_msg_get_reply_port(dm, dmsg);
3336	mach_msg_option_t opts = 0, msg_opts = _dispatch_mach_msg_get_options(dmsg);
3337	if (!slowpath(msg_opts & DISPATCH_MACH_REGISTER_FOR_REPLY)) {
3338		opts = MACH_SEND_MSG | (msg_opts & DISPATCH_MACH_OPTIONS_MASK);
3339		if (MACH_MSGH_BITS_REMOTE(msg->msgh_bits) !=
3340				MACH_MSG_TYPE_MOVE_SEND_ONCE) {
3341			if (dmsg != dr->dm_checkin) {
3342				msg->msgh_remote_port = dr->dm_send;
3343			}
3344			if (_dispatch_queue_get_current() == &_dispatch_mgr_q) {
3345				if (slowpath(!dm->dm_dkev)) {
3346					_dispatch_mach_kevent_register(dm, msg->msgh_remote_port);
3347				}
3348				if (fastpath(dm->dm_dkev)) {
3349					if (DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) {
3350						goto out;
3351					}
3352					opts |= MACH_SEND_NOTIFY;
3353				}
3354			}
3355			opts |= MACH_SEND_TIMEOUT;
3356		}
3357		_dispatch_debug_machport(msg->msgh_remote_port);
3358		if (reply) _dispatch_debug_machport(reply);
3359		kr = mach_msg(msg, opts, msg->msgh_size, 0, MACH_PORT_NULL, 0,
3360				MACH_PORT_NULL);
3361	}
3362	_dispatch_debug("machport[0x%08x]: sent msg id 0x%x, ctxt %p, opts 0x%x, "
3363			"msg_opts 0x%x, reply on 0x%08x: %s - 0x%x", msg->msgh_remote_port,
3364			msg->msgh_id, dmsg->do_ctxt, opts, msg_opts, reply,
3365			mach_error_string(kr), kr);
3366	if (kr == MACH_SEND_TIMED_OUT && (opts & MACH_SEND_TIMEOUT)) {
3367		if (opts & MACH_SEND_NOTIFY) {
3368			_dispatch_debug("machport[0x%08x]: send-possible notification "
3369					"armed", (mach_port_t)dm->dm_dkev->dk_kevent.ident);
3370			DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) = 1;
3371		} else {
3372			// send kevent must be installed on the manager queue
3373			dr->dm_needs_mgr = 1;
3374		}
3375		if (reply) {
3376			_dispatch_mach_msg_set_options(dmsg, msg_opts |
3377					DISPATCH_MACH_PSEUDO_RECEIVED);
3378			msg->msgh_reserved = reply; // Remember the original reply port
3379		}
3380		goto out;
3381	}
3382	if (fastpath(!kr) && reply) {
3383		if (_dispatch_queue_get_current() != &_dispatch_mgr_q) {
3384			// reply receive kevent must be installed on the manager queue
3385			dr->dm_needs_mgr = 1;
3386			_dispatch_mach_msg_set_options(dmsg, msg_opts |
3387					DISPATCH_MACH_REGISTER_FOR_REPLY);
3388			if (msg_opts & DISPATCH_MACH_PSEUDO_RECEIVED) {
3389				msg->msgh_reserved = reply; // Remember the original reply port
3390			}
3391			goto out;
3392		}
3393		_dispatch_mach_reply_kevent_register(dm, reply, dmsg->do_ctxt);
3394	}
3395	if (slowpath(dmsg == dr->dm_checkin) && dm->dm_dkev) {
3396		_dispatch_mach_kevent_unregister(dm);
3397	}
3398	_dispatch_mach_msg_set_reason(dmsg, kr, 0);
3399	_dispatch_mach_push(dm, dmsg);
3400	dmsg = NULL;
3401	if (slowpath(kr) && reply) {
3402		// Send failed, so reply was never connected <rdar://problem/14309159>
3403		_dispatch_mach_msg_disconnected(dm, reply, MACH_PORT_NULL);
3404	}
3405out:
3406	return (dispatch_object_t)dmsg;
3407}
3408
3409static void
3410_dispatch_mach_send_push(dispatch_mach_t dm, dispatch_object_t dou)
3411{
3412	dispatch_mach_send_refs_t dr = dm->dm_refs;
3413	struct dispatch_object_s *prev, *dc = dou._do;
3414	dc->do_next = NULL;
3415
3416	prev = dispatch_atomic_xchg2o(dr, dm_tail, dc, release);
3417	if (fastpath(prev)) {
3418		prev->do_next = dc;
3419		return;
3420	}
3421	dr->dm_head = dc;
3422	_dispatch_wakeup(dm);
3423}
3424
3425DISPATCH_NOINLINE
3426static void
3427_dispatch_mach_send_drain(dispatch_mach_t dm)
3428{
3429	dispatch_mach_send_refs_t dr = dm->dm_refs;
3430	struct dispatch_object_s *dc = NULL, *next_dc = NULL;
3431	while (dr->dm_tail) {
3432		while (!(dc = fastpath(dr->dm_head))) {
3433			dispatch_hardware_pause();
3434		}
3435		do {
3436			next_dc = fastpath(dc->do_next);
3437			dr->dm_head = next_dc;
3438			if (!next_dc && !dispatch_atomic_cmpxchg2o(dr, dm_tail, dc, NULL,
3439					relaxed)) {
3440				// Enqueue is TIGHTLY controlled, we won't wait long.
3441				while (!(next_dc = fastpath(dc->do_next))) {
3442					dispatch_hardware_pause();
3443				}
3444				dr->dm_head = next_dc;
3445			}
3446			if (!DISPATCH_OBJ_IS_VTABLE(dc)) {
3447				if ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT) {
3448					// send barrier
3449					// leave send queue locked until barrier has completed
3450					return _dispatch_mach_push(dm, dc);
3451				}
3452#if DISPATCH_MACH_SEND_SYNC
3453				if (slowpath((long)dc->do_vtable & DISPATCH_OBJ_SYNC_SLOW_BIT)){
3454					_dispatch_thread_semaphore_signal(
3455							(_dispatch_thread_semaphore_t)dc->do_ctxt);
3456					continue;
3457				}
3458#endif // DISPATCH_MACH_SEND_SYNC
3459				if (slowpath(!_dispatch_mach_reconnect_invoke(dm, dc))) {
3460					goto out;
3461				}
3462				continue;
3463			}
3464			if (slowpath(dr->dm_disconnect_cnt) ||
3465					slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3466				_dispatch_mach_msg_not_sent(dm, dc);
3467				continue;
3468			}
3469			if (slowpath(dc = _dispatch_mach_msg_send(dm, dc)._do)) {
3470				goto out;
3471			}
3472		} while ((dc = next_dc));
3473	}
3474out:
3475	// if this is not a complete drain, we must undo some things
3476	if (slowpath(dc)) {
3477		if (!next_dc &&
3478				!dispatch_atomic_cmpxchg2o(dr, dm_tail, NULL, dc, relaxed)) {
3479			// wait for enqueue slow path to finish
3480			while (!(next_dc = fastpath(dr->dm_head))) {
3481				dispatch_hardware_pause();
3482			}
3483			dc->do_next = next_dc;
3484		}
3485		dr->dm_head = dc;
3486	}
3487	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3488	_dispatch_wakeup(dm);
3489}
3490
3491static inline void
3492_dispatch_mach_send(dispatch_mach_t dm)
3493{
3494	dispatch_mach_send_refs_t dr = dm->dm_refs;
3495	if (!fastpath(dr->dm_tail) || !fastpath(dispatch_atomic_cmpxchg2o(dr,
3496			dm_sending, 0, 1, acquire))) {
3497		return;
3498	}
3499	_dispatch_object_debug(dm, "%s", __func__);
3500	_dispatch_mach_send_drain(dm);
3501}
3502
3503DISPATCH_NOINLINE
3504static void
3505_dispatch_mach_merge_kevent(dispatch_mach_t dm, const struct kevent64_s *ke)
3506{
3507	if (!(ke->fflags & dm->ds_pending_data_mask)) {
3508		return;
3509	}
3510	_dispatch_mach_send(dm);
3511}
3512
3513DISPATCH_NOINLINE
3514void
3515dispatch_mach_send(dispatch_mach_t dm, dispatch_mach_msg_t dmsg,
3516		mach_msg_option_t options)
3517{
3518	dispatch_mach_send_refs_t dr = dm->dm_refs;
3519	if (slowpath(dmsg->do_next != DISPATCH_OBJECT_LISTLESS)) {
3520		DISPATCH_CLIENT_CRASH("Message already enqueued");
3521	}
3522	dispatch_retain(dmsg);
3523	dispatch_assert_zero(options & DISPATCH_MACH_OPTIONS_MASK);
3524	_dispatch_mach_msg_set_options(dmsg, options & ~DISPATCH_MACH_OPTIONS_MASK);
3525	if (slowpath(dr->dm_tail) || slowpath(dr->dm_disconnect_cnt) ||
3526			slowpath(dm->ds_atomic_flags & DSF_CANCELED) ||
3527			slowpath(!dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1,
3528					acquire))) {
3529		return _dispatch_mach_send_push(dm, dmsg);
3530	}
3531	if (slowpath(dmsg = _dispatch_mach_msg_send(dm, dmsg)._dmsg)) {
3532		(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3533		return _dispatch_mach_send_push(dm, dmsg);
3534	}
3535	if (slowpath(dr->dm_tail)) {
3536		return _dispatch_mach_send_drain(dm);
3537	}
3538	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3539	_dispatch_wakeup(dm);
3540}
3541
3542static void
3543_dispatch_mach_disconnect(dispatch_mach_t dm)
3544{
3545	dispatch_mach_send_refs_t dr = dm->dm_refs;
3546	if (dm->dm_dkev) {
3547		_dispatch_mach_kevent_unregister(dm);
3548	}
3549	if (MACH_PORT_VALID(dr->dm_send)) {
3550		_dispatch_mach_msg_disconnected(dm, MACH_PORT_NULL, dr->dm_send);
3551	}
3552	dr->dm_send = MACH_PORT_NULL;
3553	if (dr->dm_checkin) {
3554		_dispatch_mach_msg_not_sent(dm, dr->dm_checkin);
3555		dr->dm_checkin = NULL;
3556	}
3557	if (!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3558		dispatch_mach_reply_refs_t dmr, tmp;
3559		TAILQ_FOREACH_SAFE(dmr, &dm->dm_refs->dm_replies, dm_list, tmp){
3560			_dispatch_mach_reply_kevent_unregister(dm, dmr, true);
3561		}
3562	}
3563}
3564
3565DISPATCH_NOINLINE
3566static bool
3567_dispatch_mach_cancel(dispatch_mach_t dm)
3568{
3569	dispatch_mach_send_refs_t dr = dm->dm_refs;
3570	if (!fastpath(dispatch_atomic_cmpxchg2o(dr, dm_sending, 0, 1, acquire))) {
3571		return false;
3572	}
3573	_dispatch_object_debug(dm, "%s", __func__);
3574	_dispatch_mach_disconnect(dm);
3575	if (dm->ds_dkev) {
3576		mach_port_t local_port = (mach_port_t)dm->ds_dkev->dk_kevent.ident;
3577		_dispatch_source_kevent_unregister((dispatch_source_t)dm);
3578		_dispatch_mach_msg_disconnected(dm, local_port, MACH_PORT_NULL);
3579	}
3580	(void)dispatch_atomic_dec2o(dr, dm_sending, release);
3581	return true;
3582}
3583
3584DISPATCH_NOINLINE
3585static bool
3586_dispatch_mach_reconnect_invoke(dispatch_mach_t dm, dispatch_object_t dou)
3587{
3588	if (dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3589		if (slowpath(_dispatch_queue_get_current() != &_dispatch_mgr_q)) {
3590			// send/reply kevents must be uninstalled on the manager queue
3591			return false;
3592		}
3593	}
3594	_dispatch_mach_disconnect(dm);
3595	dispatch_mach_send_refs_t dr = dm->dm_refs;
3596	dr->dm_checkin = dou._dc->dc_data;
3597	dr->dm_send = (mach_port_t)dou._dc->dc_other;
3598	_dispatch_continuation_free(dou._dc);
3599	(void)dispatch_atomic_dec2o(dr, dm_disconnect_cnt, relaxed);
3600	_dispatch_object_debug(dm, "%s", __func__);
3601	return true;
3602}
3603
3604DISPATCH_NOINLINE
3605void
3606dispatch_mach_reconnect(dispatch_mach_t dm, mach_port_t send,
3607		dispatch_mach_msg_t checkin)
3608{
3609	dispatch_mach_send_refs_t dr = dm->dm_refs;
3610	(void)dispatch_atomic_inc2o(dr, dm_disconnect_cnt, relaxed);
3611	if (MACH_PORT_VALID(send) && checkin) {
3612		dispatch_retain(checkin);
3613		dr->dm_checkin_port = _dispatch_mach_msg_get_remote_port(checkin);
3614	} else {
3615		checkin = NULL;
3616		dr->dm_checkin_port = MACH_PORT_NULL;
3617	}
3618	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3619	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3620	dc->dc_func = (void*)_dispatch_mach_reconnect_invoke;
3621	dc->dc_ctxt = dc;
3622	dc->dc_data = checkin;
3623	dc->dc_other = (void*)(uintptr_t)send;
3624	return _dispatch_mach_send_push(dm, dc);
3625}
3626
3627#if DISPATCH_MACH_SEND_SYNC
3628DISPATCH_NOINLINE
3629static void
3630_dispatch_mach_send_sync_slow(dispatch_mach_t dm)
3631{
3632	_dispatch_thread_semaphore_t sema = _dispatch_get_thread_semaphore();
3633	struct dispatch_object_s dc = {
3634		.do_vtable = (void *)(DISPATCH_OBJ_SYNC_SLOW_BIT),
3635		.do_ctxt = (void*)sema,
3636	};
3637	_dispatch_mach_send_push(dm, &dc);
3638	_dispatch_thread_semaphore_wait(sema);
3639	_dispatch_put_thread_semaphore(sema);
3640}
3641#endif // DISPATCH_MACH_SEND_SYNC
3642
3643DISPATCH_NOINLINE
3644mach_port_t
3645dispatch_mach_get_checkin_port(dispatch_mach_t dm)
3646{
3647	dispatch_mach_send_refs_t dr = dm->dm_refs;
3648	if (slowpath(dm->ds_atomic_flags & DSF_CANCELED)) {
3649		return MACH_PORT_DEAD;
3650	}
3651	return dr->dm_checkin_port;
3652}
3653
3654DISPATCH_NOINLINE
3655static void
3656_dispatch_mach_connect_invoke(dispatch_mach_t dm)
3657{
3658	dispatch_mach_refs_t dr = dm->ds_refs;
3659	_dispatch_client_callout4(dr->dm_handler_ctxt,
3660			DISPATCH_MACH_CONNECTED, NULL, 0, dr->dm_handler_func);
3661	dm->dm_connect_handler_called = 1;
3662}
3663
3664DISPATCH_NOINLINE
3665void
3666_dispatch_mach_msg_invoke(dispatch_mach_msg_t dmsg)
3667{
3668	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3669	dispatch_mach_refs_t dr = dm->ds_refs;
3670	mach_error_t err;
3671	unsigned long reason = _dispatch_mach_msg_get_reason(dmsg, &err);
3672
3673	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
3674	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3675	if (slowpath(!dm->dm_connect_handler_called)) {
3676		_dispatch_mach_connect_invoke(dm);
3677	}
3678	_dispatch_client_callout4(dr->dm_handler_ctxt, reason, dmsg, err,
3679			dr->dm_handler_func);
3680	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3681	_dispatch_introspection_queue_item_complete(dmsg);
3682	dispatch_release(dmsg);
3683}
3684
3685DISPATCH_NOINLINE
3686void
3687_dispatch_mach_barrier_invoke(void *ctxt)
3688{
3689	dispatch_mach_t dm = (dispatch_mach_t)_dispatch_queue_get_current();
3690	dispatch_mach_refs_t dr = dm->ds_refs;
3691	struct dispatch_continuation_s *dc = ctxt;
3692	void *context = dc->dc_data;
3693	dispatch_function_t barrier = dc->dc_other;
3694	bool send_barrier = ((long)dc->do_vtable & DISPATCH_OBJ_BARRIER_BIT);
3695
3696	_dispatch_thread_setspecific(dispatch_queue_key, dm->do_targetq);
3697	if (slowpath(!dm->dm_connect_handler_called)) {
3698		_dispatch_mach_connect_invoke(dm);
3699	}
3700	_dispatch_client_callout(context, barrier);
3701	_dispatch_client_callout4(dr->dm_handler_ctxt,
3702			DISPATCH_MACH_BARRIER_COMPLETED, NULL, 0, dr->dm_handler_func);
3703	_dispatch_thread_setspecific(dispatch_queue_key, (dispatch_queue_t)dm);
3704	if (send_barrier) {
3705		(void)dispatch_atomic_dec2o(dm->dm_refs, dm_sending, release);
3706	}
3707}
3708
3709DISPATCH_NOINLINE
3710void
3711dispatch_mach_send_barrier_f(dispatch_mach_t dm, void *context,
3712		dispatch_function_t barrier)
3713{
3714	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3715	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT | DISPATCH_OBJ_BARRIER_BIT);
3716	dc->dc_func = _dispatch_mach_barrier_invoke;
3717	dc->dc_ctxt = dc;
3718	dc->dc_data = context;
3719	dc->dc_other = barrier;
3720
3721	dispatch_mach_send_refs_t dr = dm->dm_refs;
3722	if (slowpath(dr->dm_tail) || slowpath(!dispatch_atomic_cmpxchg2o(dr,
3723			dm_sending, 0, 1, acquire))) {
3724		return _dispatch_mach_send_push(dm, dc);
3725	}
3726	// leave send queue locked until barrier has completed
3727	return _dispatch_mach_push(dm, dc);
3728}
3729
3730DISPATCH_NOINLINE
3731void
3732dispatch_mach_receive_barrier_f(dispatch_mach_t dm, void *context,
3733		dispatch_function_t barrier)
3734{
3735	dispatch_continuation_t dc = _dispatch_continuation_alloc();
3736	dc->do_vtable = (void *)(DISPATCH_OBJ_ASYNC_BIT);
3737	dc->dc_func = _dispatch_mach_barrier_invoke;
3738	dc->dc_ctxt = dc;
3739	dc->dc_data = context;
3740	dc->dc_other = barrier;
3741	return _dispatch_mach_push(dm, dc);
3742}
3743
3744DISPATCH_NOINLINE
3745void
3746dispatch_mach_send_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3747{
3748	dispatch_mach_send_barrier_f(dm, _dispatch_Block_copy(barrier),
3749			_dispatch_call_block_and_release);
3750}
3751
3752DISPATCH_NOINLINE
3753void
3754dispatch_mach_receive_barrier(dispatch_mach_t dm, dispatch_block_t barrier)
3755{
3756	dispatch_mach_receive_barrier_f(dm, _dispatch_Block_copy(barrier),
3757			_dispatch_call_block_and_release);
3758}
3759
3760DISPATCH_NOINLINE
3761static void
3762_dispatch_mach_cancel_invoke(dispatch_mach_t dm)
3763{
3764	dispatch_mach_refs_t dr = dm->ds_refs;
3765	if (slowpath(!dm->dm_connect_handler_called)) {
3766		_dispatch_mach_connect_invoke(dm);
3767	}
3768	_dispatch_client_callout4(dr->dm_handler_ctxt,
3769			DISPATCH_MACH_CANCELED, NULL, 0, dr->dm_handler_func);
3770	dm->dm_cancel_handler_called = 1;
3771	_dispatch_release(dm); // the retain is done at creation time
3772}
3773
3774DISPATCH_NOINLINE
3775void
3776dispatch_mach_cancel(dispatch_mach_t dm)
3777{
3778	dispatch_source_cancel((dispatch_source_t)dm);
3779}
3780
3781DISPATCH_ALWAYS_INLINE
3782static inline dispatch_queue_t
3783_dispatch_mach_invoke2(dispatch_object_t dou,
3784		_dispatch_thread_semaphore_t *sema_ptr DISPATCH_UNUSED)
3785{
3786	dispatch_mach_t dm = dou._dm;
3787
3788	// This function performs all mach channel actions. Each action is
3789	// responsible for verifying that it takes place on the appropriate queue.
3790	// If the current queue is not the correct queue for this action, the
3791	// correct queue will be returned and the invoke will be re-driven on that
3792	// queue.
3793
3794	// The order of tests here in invoke and in probe should be consistent.
3795
3796	dispatch_queue_t dq = _dispatch_queue_get_current();
3797	dispatch_mach_send_refs_t dr = dm->dm_refs;
3798
3799	if (slowpath(!dm->ds_is_installed)) {
3800		// The channel needs to be installed on the manager queue.
3801		if (dq != &_dispatch_mgr_q) {
3802			return &_dispatch_mgr_q;
3803		}
3804		if (dm->ds_dkev) {
3805			_dispatch_source_kevent_register((dispatch_source_t)dm);
3806		}
3807		dm->ds_is_installed = true;
3808		_dispatch_mach_send(dm);
3809		// Apply initial target queue change
3810		_dispatch_queue_drain(dou);
3811		if (dm->dq_items_tail) {
3812			return dm->do_targetq;
3813		}
3814	} else if (dm->dq_items_tail) {
3815		// The channel has pending messages to deliver to the target queue.
3816		if (dq != dm->do_targetq) {
3817			return dm->do_targetq;
3818		}
3819		dispatch_queue_t tq = dm->do_targetq;
3820		if (slowpath(_dispatch_queue_drain(dou))) {
3821			DISPATCH_CLIENT_CRASH("Sync onto mach channel");
3822		}
3823		if (slowpath(tq != dm->do_targetq)) {
3824			// An item on the channel changed the target queue
3825			return dm->do_targetq;
3826		}
3827	} else if (dr->dm_tail) {
3828		if (slowpath(dr->dm_needs_mgr) || (slowpath(dr->dm_disconnect_cnt) &&
3829				(dm->dm_dkev || !TAILQ_EMPTY(&dm->dm_refs->dm_replies)))) {
3830			// Send/reply kevents need to be installed or uninstalled
3831			if (dq != &_dispatch_mgr_q) {
3832				return &_dispatch_mgr_q;
3833			}
3834		}
3835		if (!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
3836				(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt) {
3837			// The channel has pending messages to send.
3838			_dispatch_mach_send(dm);
3839		}
3840	} else if (dm->ds_atomic_flags & DSF_CANCELED){
3841		// The channel has been cancelled and needs to be uninstalled from the
3842		// manager queue. After uninstallation, the cancellation handler needs
3843		// to be delivered to the target queue.
3844		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
3845				!TAILQ_EMPTY(&dm->dm_refs->dm_replies)) {
3846			if (dq != &_dispatch_mgr_q) {
3847				return &_dispatch_mgr_q;
3848			}
3849			if (!_dispatch_mach_cancel(dm)) {
3850				return NULL;
3851			}
3852		}
3853		if (!dm->dm_cancel_handler_called) {
3854			if (dq != dm->do_targetq) {
3855				return dm->do_targetq;
3856			}
3857			_dispatch_mach_cancel_invoke(dm);
3858		}
3859	}
3860	return NULL;
3861}
3862
3863DISPATCH_NOINLINE
3864void
3865_dispatch_mach_invoke(dispatch_mach_t dm)
3866{
3867	_dispatch_queue_class_invoke(dm, _dispatch_mach_invoke2);
3868}
3869
3870unsigned long
3871_dispatch_mach_probe(dispatch_mach_t dm)
3872{
3873	// This function determines whether the mach channel needs to be invoked.
3874	// The order of tests here in probe and in invoke should be consistent.
3875
3876	dispatch_mach_send_refs_t dr = dm->dm_refs;
3877
3878	if (slowpath(!dm->ds_is_installed)) {
3879		// The channel needs to be installed on the manager queue.
3880		return true;
3881	} else if (dm->dq_items_tail) {
3882		// The source has pending messages to deliver to the target queue.
3883		return true;
3884	} else if (dr->dm_tail &&
3885			(!(dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev)) ||
3886			(dm->ds_atomic_flags & DSF_CANCELED) || dr->dm_disconnect_cnt)) {
3887		// The channel has pending messages to send.
3888		return true;
3889	} else if (dm->ds_atomic_flags & DSF_CANCELED) {
3890		if (dm->ds_dkev || dm->dm_dkev || dr->dm_send ||
3891				!TAILQ_EMPTY(&dm->dm_refs->dm_replies) ||
3892				!dm->dm_cancel_handler_called) {
3893			// The channel needs to be uninstalled from the manager queue, or
3894			// the cancellation handler needs to be delivered to the target
3895			// queue.
3896			return true;
3897		}
3898	}
3899	// Nothing to do.
3900	return false;
3901}
3902
3903#pragma mark -
3904#pragma mark dispatch_mach_msg_t
3905
3906dispatch_mach_msg_t
3907dispatch_mach_msg_create(mach_msg_header_t *msg, size_t size,
3908		dispatch_mach_msg_destructor_t destructor, mach_msg_header_t **msg_ptr)
3909{
3910	if (slowpath(size < sizeof(mach_msg_header_t)) ||
3911			slowpath(destructor && !msg)) {
3912		DISPATCH_CLIENT_CRASH("Empty message");
3913	}
3914	dispatch_mach_msg_t dmsg = _dispatch_alloc(DISPATCH_VTABLE(mach_msg),
3915			sizeof(struct dispatch_mach_msg_s) +
3916			(destructor ? 0 : size - sizeof(dmsg->msg)));
3917	if (destructor) {
3918		dmsg->msg = msg;
3919	} else if (msg) {
3920		memcpy(dmsg->buf, msg, size);
3921	}
3922	dmsg->do_next = DISPATCH_OBJECT_LISTLESS;
3923	dmsg->do_targetq = _dispatch_get_root_queue(0, false);
3924	dmsg->destructor = destructor;
3925	dmsg->size = size;
3926	if (msg_ptr) {
3927		*msg_ptr = _dispatch_mach_msg_get_msg(dmsg);
3928	}
3929	return dmsg;
3930}
3931
3932void
3933_dispatch_mach_msg_dispose(dispatch_mach_msg_t dmsg)
3934{
3935	switch (dmsg->destructor) {
3936	case DISPATCH_MACH_MSG_DESTRUCTOR_DEFAULT:
3937		break;
3938	case DISPATCH_MACH_MSG_DESTRUCTOR_FREE:
3939		free(dmsg->msg);
3940		break;
3941	case DISPATCH_MACH_MSG_DESTRUCTOR_VM_DEALLOCATE: {
3942		mach_vm_size_t vm_size = dmsg->size;
3943		mach_vm_address_t vm_addr = (uintptr_t)dmsg->msg;
3944		(void)dispatch_assume_zero(mach_vm_deallocate(mach_task_self(),
3945				vm_addr, vm_size));
3946		break;
3947	}}
3948}
3949
3950static inline mach_msg_header_t*
3951_dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg)
3952{
3953	return dmsg->destructor ? dmsg->msg : (mach_msg_header_t*)dmsg->buf;
3954}
3955
3956mach_msg_header_t*
3957dispatch_mach_msg_get_msg(dispatch_mach_msg_t dmsg, size_t *size_ptr)
3958{
3959	if (size_ptr) {
3960		*size_ptr = dmsg->size;
3961	}
3962	return _dispatch_mach_msg_get_msg(dmsg);
3963}
3964
3965size_t
3966_dispatch_mach_msg_debug(dispatch_mach_msg_t dmsg, char* buf, size_t bufsiz)
3967{
3968	size_t offset = 0;
3969	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
3970			dx_kind(dmsg), dmsg);
3971	offset += dsnprintf(&buf[offset], bufsiz - offset, "xrefcnt = 0x%x, "
3972			"refcnt = 0x%x, ", dmsg->do_xref_cnt + 1, dmsg->do_ref_cnt + 1);
3973	offset += dsnprintf(&buf[offset], bufsiz - offset, "opts/err = 0x%x, "
3974			"msgh[%p] = { ", dmsg->do_suspend_cnt, dmsg->buf);
3975	mach_msg_header_t *hdr = _dispatch_mach_msg_get_msg(dmsg);
3976	if (hdr->msgh_id) {
3977		offset += dsnprintf(&buf[offset], bufsiz - offset, "id 0x%x, ",
3978				hdr->msgh_id);
3979	}
3980	if (hdr->msgh_size) {
3981		offset += dsnprintf(&buf[offset], bufsiz - offset, "size %u, ",
3982				hdr->msgh_size);
3983	}
3984	if (hdr->msgh_bits) {
3985		offset += dsnprintf(&buf[offset], bufsiz - offset, "bits <l %u, r %u",
3986				MACH_MSGH_BITS_LOCAL(hdr->msgh_bits),
3987				MACH_MSGH_BITS_REMOTE(hdr->msgh_bits));
3988		if (MACH_MSGH_BITS_OTHER(hdr->msgh_bits)) {
3989			offset += dsnprintf(&buf[offset], bufsiz - offset, ", o 0x%x",
3990					MACH_MSGH_BITS_OTHER(hdr->msgh_bits));
3991		}
3992		offset += dsnprintf(&buf[offset], bufsiz - offset, ">, ");
3993	}
3994	if (hdr->msgh_local_port && hdr->msgh_remote_port) {
3995		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x, "
3996				"remote 0x%x", hdr->msgh_local_port, hdr->msgh_remote_port);
3997	} else if (hdr->msgh_local_port) {
3998		offset += dsnprintf(&buf[offset], bufsiz - offset, "local 0x%x",
3999				hdr->msgh_local_port);
4000	} else if (hdr->msgh_remote_port) {
4001		offset += dsnprintf(&buf[offset], bufsiz - offset, "remote 0x%x",
4002				hdr->msgh_remote_port);
4003	} else {
4004		offset += dsnprintf(&buf[offset], bufsiz - offset, "no ports");
4005	}
4006	offset += dsnprintf(&buf[offset], bufsiz - offset, " } }");
4007	return offset;
4008}
4009
4010#pragma mark -
4011#pragma mark dispatch_mig_server
4012
4013mach_msg_return_t
4014dispatch_mig_server(dispatch_source_t ds, size_t maxmsgsz,
4015		dispatch_mig_callback_t callback)
4016{
4017	mach_msg_options_t options = MACH_RCV_MSG | MACH_RCV_TIMEOUT
4018		| MACH_RCV_TRAILER_ELEMENTS(MACH_RCV_TRAILER_CTX)
4019		| MACH_RCV_TRAILER_TYPE(MACH_MSG_TRAILER_FORMAT_0);
4020	mach_msg_options_t tmp_options;
4021	mig_reply_error_t *bufTemp, *bufRequest, *bufReply;
4022	mach_msg_return_t kr = 0;
4023	uint64_t assertion_token = 0;
4024	unsigned int cnt = 1000; // do not stall out serial queues
4025	boolean_t demux_success;
4026	bool received = false;
4027	size_t rcv_size = maxmsgsz + MAX_TRAILER_SIZE;
4028
4029	// XXX FIXME -- allocate these elsewhere
4030	bufRequest = alloca(rcv_size);
4031	bufReply = alloca(rcv_size);
4032	bufReply->Head.msgh_size = 0;
4033	bufRequest->RetCode = 0;
4034
4035#if DISPATCH_DEBUG
4036	options |= MACH_RCV_LARGE; // rdar://problem/8422992
4037#endif
4038	tmp_options = options;
4039	// XXX FIXME -- change this to not starve out the target queue
4040	for (;;) {
4041		if (DISPATCH_OBJECT_SUSPENDED(ds) || (--cnt == 0)) {
4042			options &= ~MACH_RCV_MSG;
4043			tmp_options &= ~MACH_RCV_MSG;
4044
4045			if (!(tmp_options & MACH_SEND_MSG)) {
4046				goto out;
4047			}
4048		}
4049		kr = mach_msg(&bufReply->Head, tmp_options, bufReply->Head.msgh_size,
4050				(mach_msg_size_t)rcv_size, (mach_port_t)ds->ds_ident_hack, 0,0);
4051
4052		tmp_options = options;
4053
4054		if (slowpath(kr)) {
4055			switch (kr) {
4056			case MACH_SEND_INVALID_DEST:
4057			case MACH_SEND_TIMED_OUT:
4058				if (bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX) {
4059					mach_msg_destroy(&bufReply->Head);
4060				}
4061				break;
4062			case MACH_RCV_TIMED_OUT:
4063				// Don't return an error if a message was sent this time or
4064				// a message was successfully received previously
4065				// rdar://problems/7363620&7791738
4066				if(bufReply->Head.msgh_remote_port || received) {
4067					kr = MACH_MSG_SUCCESS;
4068				}
4069				break;
4070			case MACH_RCV_INVALID_NAME:
4071				break;
4072#if DISPATCH_DEBUG
4073			case MACH_RCV_TOO_LARGE:
4074				// receive messages that are too large and log their id and size
4075				// rdar://problem/8422992
4076				tmp_options &= ~MACH_RCV_LARGE;
4077				size_t large_size = bufReply->Head.msgh_size + MAX_TRAILER_SIZE;
4078				void *large_buf = malloc(large_size);
4079				if (large_buf) {
4080					rcv_size = large_size;
4081					bufReply = large_buf;
4082				}
4083				if (!mach_msg(&bufReply->Head, tmp_options, 0,
4084						(mach_msg_size_t)rcv_size,
4085						(mach_port_t)ds->ds_ident_hack, 0, 0)) {
4086					_dispatch_log("BUG in libdispatch client: "
4087							"dispatch_mig_server received message larger than "
4088							"requested size %zd: id = 0x%x, size = %d",
4089							maxmsgsz, bufReply->Head.msgh_id,
4090							bufReply->Head.msgh_size);
4091				}
4092				if (large_buf) {
4093					free(large_buf);
4094				}
4095				// fall through
4096#endif
4097			default:
4098				_dispatch_bug_mach_client(
4099						"dispatch_mig_server: mach_msg() failed", kr);
4100				break;
4101			}
4102			goto out;
4103		}
4104
4105		if (!(tmp_options & MACH_RCV_MSG)) {
4106			goto out;
4107		}
4108
4109		if (assertion_token) {
4110#if DISPATCH_USE_IMPORTANCE_ASSERTION
4111			int r = proc_importance_assertion_complete(assertion_token);
4112			(void)dispatch_assume_zero(r);
4113#endif
4114			assertion_token = 0;
4115		}
4116		received = true;
4117
4118		bufTemp = bufRequest;
4119		bufRequest = bufReply;
4120		bufReply = bufTemp;
4121
4122#if DISPATCH_USE_IMPORTANCE_ASSERTION
4123		int r = proc_importance_assertion_begin_with_msg(&bufRequest->Head,
4124				NULL, &assertion_token);
4125		if (r && slowpath(r != EIO)) {
4126			(void)dispatch_assume_zero(r);
4127		}
4128#endif
4129
4130		demux_success = callback(&bufRequest->Head, &bufReply->Head);
4131
4132		if (!demux_success) {
4133			// destroy the request - but not the reply port
4134			bufRequest->Head.msgh_remote_port = 0;
4135			mach_msg_destroy(&bufRequest->Head);
4136		} else if (!(bufReply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)) {
4137			// if MACH_MSGH_BITS_COMPLEX is _not_ set, then bufReply->RetCode
4138			// is present
4139			if (slowpath(bufReply->RetCode)) {
4140				if (bufReply->RetCode == MIG_NO_REPLY) {
4141					continue;
4142				}
4143
4144				// destroy the request - but not the reply port
4145				bufRequest->Head.msgh_remote_port = 0;
4146				mach_msg_destroy(&bufRequest->Head);
4147			}
4148		}
4149
4150		if (bufReply->Head.msgh_remote_port) {
4151			tmp_options |= MACH_SEND_MSG;
4152			if (MACH_MSGH_BITS_REMOTE(bufReply->Head.msgh_bits) !=
4153					MACH_MSG_TYPE_MOVE_SEND_ONCE) {
4154				tmp_options |= MACH_SEND_TIMEOUT;
4155			}
4156		}
4157	}
4158
4159out:
4160	if (assertion_token) {
4161#if DISPATCH_USE_IMPORTANCE_ASSERTION
4162		int r = proc_importance_assertion_complete(assertion_token);
4163		(void)dispatch_assume_zero(r);
4164#endif
4165	}
4166
4167	return kr;
4168}
4169
4170#endif /* HAVE_MACH */
4171
4172#pragma mark -
4173#pragma mark dispatch_source_debug
4174
4175DISPATCH_NOINLINE
4176static const char *
4177_evfiltstr(short filt)
4178{
4179	switch (filt) {
4180#define _evfilt2(f) case (f): return #f
4181	_evfilt2(EVFILT_READ);
4182	_evfilt2(EVFILT_WRITE);
4183	_evfilt2(EVFILT_AIO);
4184	_evfilt2(EVFILT_VNODE);
4185	_evfilt2(EVFILT_PROC);
4186	_evfilt2(EVFILT_SIGNAL);
4187	_evfilt2(EVFILT_TIMER);
4188#ifdef EVFILT_VM
4189	_evfilt2(EVFILT_VM);
4190#endif
4191#ifdef EVFILT_MEMORYSTATUS
4192	_evfilt2(EVFILT_MEMORYSTATUS);
4193#endif
4194#if HAVE_MACH
4195	_evfilt2(EVFILT_MACHPORT);
4196	_evfilt2(DISPATCH_EVFILT_MACH_NOTIFICATION);
4197#endif
4198	_evfilt2(EVFILT_FS);
4199	_evfilt2(EVFILT_USER);
4200
4201	_evfilt2(DISPATCH_EVFILT_TIMER);
4202	_evfilt2(DISPATCH_EVFILT_CUSTOM_ADD);
4203	_evfilt2(DISPATCH_EVFILT_CUSTOM_OR);
4204	default:
4205		return "EVFILT_missing";
4206	}
4207}
4208
4209static size_t
4210_dispatch_source_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4211{
4212	dispatch_queue_t target = ds->do_targetq;
4213	return dsnprintf(buf, bufsiz, "target = %s[%p], ident = 0x%lx, "
4214			"pending_data = 0x%lx, pending_data_mask = 0x%lx, ",
4215			target && target->dq_label ? target->dq_label : "", target,
4216			ds->ds_ident_hack, ds->ds_pending_data, ds->ds_pending_data_mask);
4217}
4218
4219static size_t
4220_dispatch_timer_debug_attr(dispatch_source_t ds, char* buf, size_t bufsiz)
4221{
4222	dispatch_source_refs_t dr = ds->ds_refs;
4223	return dsnprintf(buf, bufsiz, "timer = { target = 0x%llx, deadline = 0x%llx,"
4224			" last_fire = 0x%llx, interval = 0x%llx, flags = 0x%lx }, ",
4225			ds_timer(dr).target, ds_timer(dr).deadline, ds_timer(dr).last_fire,
4226			ds_timer(dr).interval, ds_timer(dr).flags);
4227}
4228
4229size_t
4230_dispatch_source_debug(dispatch_source_t ds, char* buf, size_t bufsiz)
4231{
4232	size_t offset = 0;
4233	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4234			dx_kind(ds), ds);
4235	offset += _dispatch_object_debug_attr(ds, &buf[offset], bufsiz - offset);
4236	offset += _dispatch_source_debug_attr(ds, &buf[offset], bufsiz - offset);
4237	if (ds->ds_is_timer) {
4238		offset += _dispatch_timer_debug_attr(ds, &buf[offset], bufsiz - offset);
4239	}
4240	offset += dsnprintf(&buf[offset], bufsiz - offset, "filter = %s }",
4241			ds->ds_dkev ? _evfiltstr(ds->ds_dkev->dk_kevent.filter) : "????");
4242	return offset;
4243}
4244
4245static size_t
4246_dispatch_mach_debug_attr(dispatch_mach_t dm, char* buf, size_t bufsiz)
4247{
4248	dispatch_queue_t target = dm->do_targetq;
4249	return dsnprintf(buf, bufsiz, "target = %s[%p], receive = 0x%x, "
4250			"send = 0x%x, send-possible = 0x%x%s, checkin = 0x%x%s, "
4251			"sending = %d, disconnected = %d, canceled = %d ",
4252			target && target->dq_label ? target->dq_label : "", target,
4253			dm->ds_dkev ?(mach_port_t)dm->ds_dkev->dk_kevent.ident:0,
4254			dm->dm_refs->dm_send,
4255			dm->dm_dkev ?(mach_port_t)dm->dm_dkev->dk_kevent.ident:0,
4256			dm->dm_dkev && DISPATCH_MACH_KEVENT_ARMED(dm->dm_dkev) ?
4257			" (armed)" : "", dm->dm_refs->dm_checkin_port,
4258			dm->dm_refs->dm_checkin ? " (pending)" : "",
4259			dm->dm_refs->dm_sending, dm->dm_refs->dm_disconnect_cnt,
4260			(bool)(dm->ds_atomic_flags & DSF_CANCELED));
4261}
4262size_t
4263_dispatch_mach_debug(dispatch_mach_t dm, char* buf, size_t bufsiz)
4264{
4265	size_t offset = 0;
4266	offset += dsnprintf(&buf[offset], bufsiz - offset, "%s[%p] = { ",
4267			dm->dq_label ? dm->dq_label : dx_kind(dm), dm);
4268	offset += _dispatch_object_debug_attr(dm, &buf[offset], bufsiz - offset);
4269	offset += _dispatch_mach_debug_attr(dm, &buf[offset], bufsiz - offset);
4270	offset += dsnprintf(&buf[offset], bufsiz - offset, "}");
4271	return offset;
4272}
4273
4274#if DISPATCH_DEBUG
4275static void
4276_dispatch_kevent_debug(struct kevent64_s* kev, const char* str)
4277{
4278	_dispatch_log("kevent[%p] = { ident = 0x%llx, filter = %s, flags = 0x%x, "
4279			"fflags = 0x%x, data = 0x%llx, udata = 0x%llx, ext[0] = 0x%llx, "
4280			"ext[1] = 0x%llx }: %s", kev, kev->ident, _evfiltstr(kev->filter),
4281			kev->flags, kev->fflags, kev->data, kev->udata, kev->ext[0],
4282			kev->ext[1], str);
4283}
4284
4285static void
4286_dispatch_kevent_debugger2(void *context)
4287{
4288	struct sockaddr sa;
4289	socklen_t sa_len = sizeof(sa);
4290	int c, fd = (int)(long)context;
4291	unsigned int i;
4292	dispatch_kevent_t dk;
4293	dispatch_source_t ds;
4294	dispatch_source_refs_t dr;
4295	FILE *debug_stream;
4296
4297	c = accept(fd, &sa, &sa_len);
4298	if (c == -1) {
4299		if (errno != EAGAIN) {
4300			(void)dispatch_assume_zero(errno);
4301		}
4302		return;
4303	}
4304#if 0
4305	int r = fcntl(c, F_SETFL, 0); // disable non-blocking IO
4306	if (r == -1) {
4307		(void)dispatch_assume_zero(errno);
4308	}
4309#endif
4310	debug_stream = fdopen(c, "a");
4311	if (!dispatch_assume(debug_stream)) {
4312		close(c);
4313		return;
4314	}
4315
4316	fprintf(debug_stream, "HTTP/1.0 200 OK\r\n");
4317	fprintf(debug_stream, "Content-type: text/html\r\n");
4318	fprintf(debug_stream, "Pragma: nocache\r\n");
4319	fprintf(debug_stream, "\r\n");
4320	fprintf(debug_stream, "<html>\n");
4321	fprintf(debug_stream, "<head><title>PID %u</title></head>\n", getpid());
4322	fprintf(debug_stream, "<body>\n<ul>\n");
4323
4324	//fprintf(debug_stream, "<tr><td>DK</td><td>DK</td><td>DK</td><td>DK</td>"
4325	//		"<td>DK</td><td>DK</td><td>DK</td></tr>\n");
4326
4327	for (i = 0; i < DSL_HASH_SIZE; i++) {
4328		if (TAILQ_EMPTY(&_dispatch_sources[i])) {
4329			continue;
4330		}
4331		TAILQ_FOREACH(dk, &_dispatch_sources[i], dk_list) {
4332			fprintf(debug_stream, "\t<br><li>DK %p ident %lu filter %s flags "
4333					"0x%hx fflags 0x%x data 0x%lx udata %p\n",
4334					dk, (unsigned long)dk->dk_kevent.ident,
4335					_evfiltstr(dk->dk_kevent.filter), dk->dk_kevent.flags,
4336					dk->dk_kevent.fflags, (unsigned long)dk->dk_kevent.data,
4337					(void*)dk->dk_kevent.udata);
4338			fprintf(debug_stream, "\t\t<ul>\n");
4339			TAILQ_FOREACH(dr, &dk->dk_sources, dr_list) {
4340				ds = _dispatch_source_from_refs(dr);
4341				fprintf(debug_stream, "\t\t\t<li>DS %p refcnt 0x%x suspend "
4342						"0x%x data 0x%lx mask 0x%lx flags 0x%x</li>\n",
4343						ds, ds->do_ref_cnt + 1, ds->do_suspend_cnt,
4344						ds->ds_pending_data, ds->ds_pending_data_mask,
4345						ds->ds_atomic_flags);
4346				if (ds->do_suspend_cnt == DISPATCH_OBJECT_SUSPEND_LOCK) {
4347					dispatch_queue_t dq = ds->do_targetq;
4348					fprintf(debug_stream, "\t\t<br>DQ: %p refcnt 0x%x suspend "
4349							"0x%x label: %s\n", dq, dq->do_ref_cnt + 1,
4350							dq->do_suspend_cnt, dq->dq_label ? dq->dq_label:"");
4351				}
4352			}
4353			fprintf(debug_stream, "\t\t</ul>\n");
4354			fprintf(debug_stream, "\t</li>\n");
4355		}
4356	}
4357	fprintf(debug_stream, "</ul>\n</body>\n</html>\n");
4358	fflush(debug_stream);
4359	fclose(debug_stream);
4360}
4361
4362static void
4363_dispatch_kevent_debugger2_cancel(void *context)
4364{
4365	int ret, fd = (int)(long)context;
4366
4367	ret = close(fd);
4368	if (ret != -1) {
4369		(void)dispatch_assume_zero(errno);
4370	}
4371}
4372
4373static void
4374_dispatch_kevent_debugger(void *context DISPATCH_UNUSED)
4375{
4376	union {
4377		struct sockaddr_in sa_in;
4378		struct sockaddr sa;
4379	} sa_u = {
4380		.sa_in = {
4381			.sin_family = AF_INET,
4382			.sin_addr = { htonl(INADDR_LOOPBACK), },
4383		},
4384	};
4385	dispatch_source_t ds;
4386	const char *valstr;
4387	int val, r, fd, sock_opt = 1;
4388	socklen_t slen = sizeof(sa_u);
4389
4390	if (issetugid()) {
4391		return;
4392	}
4393	valstr = getenv("LIBDISPATCH_DEBUGGER");
4394	if (!valstr) {
4395		return;
4396	}
4397	val = atoi(valstr);
4398	if (val == 2) {
4399		sa_u.sa_in.sin_addr.s_addr = 0;
4400	}
4401	fd = socket(PF_INET, SOCK_STREAM, 0);
4402	if (fd == -1) {
4403		(void)dispatch_assume_zero(errno);
4404		return;
4405	}
4406	r = setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (void *)&sock_opt,
4407			(socklen_t) sizeof sock_opt);
4408	if (r == -1) {
4409		(void)dispatch_assume_zero(errno);
4410		goto out_bad;
4411	}
4412#if 0
4413	r = fcntl(fd, F_SETFL, O_NONBLOCK);
4414	if (r == -1) {
4415		(void)dispatch_assume_zero(errno);
4416		goto out_bad;
4417	}
4418#endif
4419	r = bind(fd, &sa_u.sa, sizeof(sa_u));
4420	if (r == -1) {
4421		(void)dispatch_assume_zero(errno);
4422		goto out_bad;
4423	}
4424	r = listen(fd, SOMAXCONN);
4425	if (r == -1) {
4426		(void)dispatch_assume_zero(errno);
4427		goto out_bad;
4428	}
4429	r = getsockname(fd, &sa_u.sa, &slen);
4430	if (r == -1) {
4431		(void)dispatch_assume_zero(errno);
4432		goto out_bad;
4433	}
4434
4435	ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, (uintptr_t)fd, 0,
4436			&_dispatch_mgr_q);
4437	if (dispatch_assume(ds)) {
4438		_dispatch_log("LIBDISPATCH: debug port: %hu",
4439				(in_port_t)ntohs(sa_u.sa_in.sin_port));
4440
4441		/* ownership of fd transfers to ds */
4442		dispatch_set_context(ds, (void *)(long)fd);
4443		dispatch_source_set_event_handler_f(ds, _dispatch_kevent_debugger2);
4444		dispatch_source_set_cancel_handler_f(ds,
4445				_dispatch_kevent_debugger2_cancel);
4446		dispatch_resume(ds);
4447
4448		return;
4449	}
4450out_bad:
4451	close(fd);
4452}
4453
4454#if HAVE_MACH
4455
4456#ifndef MACH_PORT_TYPE_SPREQUEST
4457#define MACH_PORT_TYPE_SPREQUEST 0x40000000
4458#endif
4459
4460DISPATCH_NOINLINE
4461void
4462dispatch_debug_machport(mach_port_t name, const char* str)
4463{
4464	mach_port_type_t type;
4465	mach_msg_bits_t ns = 0, nr = 0, nso = 0, nd = 0;
4466	unsigned int dnreqs = 0, dnrsiz;
4467	kern_return_t kr = mach_port_type(mach_task_self(), name, &type);
4468	if (kr) {
4469		_dispatch_log("machport[0x%08x] = { error(0x%x) \"%s\" }: %s", name,
4470				kr, mach_error_string(kr), str);
4471		return;
4472	}
4473	if (type & MACH_PORT_TYPE_SEND) {
4474		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4475				MACH_PORT_RIGHT_SEND, &ns));
4476	}
4477	if (type & MACH_PORT_TYPE_SEND_ONCE) {
4478		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4479				MACH_PORT_RIGHT_SEND_ONCE, &nso));
4480	}
4481	if (type & MACH_PORT_TYPE_DEAD_NAME) {
4482		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4483				MACH_PORT_RIGHT_DEAD_NAME, &nd));
4484	}
4485	if (type & (MACH_PORT_TYPE_RECEIVE|MACH_PORT_TYPE_SEND)) {
4486		(void)dispatch_assume_zero(mach_port_dnrequest_info(mach_task_self(),
4487				name, &dnrsiz, &dnreqs));
4488	}
4489	if (type & MACH_PORT_TYPE_RECEIVE) {
4490		mach_port_status_t status = { .mps_pset = 0, };
4491		mach_msg_type_number_t cnt = MACH_PORT_RECEIVE_STATUS_COUNT;
4492		(void)dispatch_assume_zero(mach_port_get_refs(mach_task_self(), name,
4493				MACH_PORT_RIGHT_RECEIVE, &nr));
4494		(void)dispatch_assume_zero(mach_port_get_attributes(mach_task_self(),
4495				name, MACH_PORT_RECEIVE_STATUS, (void*)&status, &cnt));
4496		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4497				"dnreqs(%03u) spreq(%s) nsreq(%s) pdreq(%s) srights(%s) "
4498				"sorights(%03u) qlim(%03u) msgcount(%03u) mkscount(%03u) "
4499				"seqno(%03u) }: %s", name, nr, ns, nso, nd, dnreqs,
4500				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N",
4501				status.mps_nsrequest ? "Y":"N", status.mps_pdrequest ? "Y":"N",
4502				status.mps_srights ? "Y":"N", status.mps_sorights,
4503				status.mps_qlimit, status.mps_msgcount, status.mps_mscount,
4504				status.mps_seqno, str);
4505	} else if (type & (MACH_PORT_TYPE_SEND|MACH_PORT_TYPE_SEND_ONCE|
4506			MACH_PORT_TYPE_DEAD_NAME)) {
4507		_dispatch_log("machport[0x%08x] = { R(%03u) S(%03u) SO(%03u) D(%03u) "
4508				"dnreqs(%03u) spreq(%s) }: %s", name, nr, ns, nso, nd, dnreqs,
4509				type & MACH_PORT_TYPE_SPREQUEST ? "Y":"N", str);
4510	} else {
4511		_dispatch_log("machport[0x%08x] = { type(0x%08x) }: %s", name, type,
4512				str);
4513	}
4514}
4515
4516#endif // HAVE_MACH
4517
4518#endif // DISPATCH_DEBUG
4519