1/*
2 * Copyright (c) 1998-2013 Apple Inc. All rights reserved.
3 *
4 * @APPLE_LICENSE_HEADER_START@
5 *
6 * This file contains Original Code and/or Modifications of Original Code
7 * as defined in and that are subject to the Apple Public Source License
8 * Version 2.0 (the 'License'). You may not use this file except in
9 * compliance with the License. Please obtain a copy of the License at
10 * http://www.opensource.apple.com/apsl/ and read it before using this
11 * file.
12 *
13 * The Original Code and all software distributed under the License are
14 * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER
15 * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES,
16 * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY,
17 * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT.
18 * Please see the License for the specific language governing rights and
19 * limitations under the License.
20 *
21 * @APPLE_LICENSE_HEADER_END@
22 */
23
24#include "DAThread.h"
25
26#include <pthread.h>
27#include <sysexits.h>
28#include <mach/mach.h>
29
30enum
31{
32    __kDAThreadRunLoopSourceJobKindExecute = 0x00000001
33};
34
35typedef UInt32 __DAThreadRunLoopSourceJobKind;
36
37struct __DAThreadRunLoopSourceJob
38{
39    __DAThreadRunLoopSourceJobKind      kind;
40    struct __DAThreadRunLoopSourceJob * next;
41
42    union
43    {
44        struct
45        {
46            Boolean                 exited;
47            int                     status;
48            pthread_t               thread;
49            DAThreadExecuteCallback callback;
50            void *                  callbackContext;
51            DAThreadFunction        function;
52            void *                  functionContext;
53        } execute;
54    };
55};
56
57typedef struct __DAThreadRunLoopSourceJob __DAThreadRunLoopSourceJob;
58
59static __DAThreadRunLoopSourceJob * __gDAThreadRunLoopSourceJobs = NULL;
60static pthread_mutex_t              __gDAThreadRunLoopSourceLock = PTHREAD_MUTEX_INITIALIZER;
61static CFMachPortRef                __gDAThreadRunLoopSourcePort = NULL;
62
63static void * __DAThreadFunction( void * context )
64{
65    /*
66     * Run a thread.
67     */
68
69    __DAThreadRunLoopSourceJob * job;
70
71    pthread_mutex_lock( &__gDAThreadRunLoopSourceLock );
72
73    for ( job = __gDAThreadRunLoopSourceJobs; job; job = job->next )
74    {
75        assert( job->kind == __kDAThreadRunLoopSourceJobKindExecute );
76
77        if ( pthread_equal( job->execute.thread, pthread_self( ) ) )
78        {
79            break;
80        }
81    }
82
83    pthread_mutex_unlock( &__gDAThreadRunLoopSourceLock );
84
85    if ( job )
86    {
87        mach_msg_header_t message;
88        kern_return_t     status;
89
90        job->execute.status = ( ( DAThreadFunction ) job->execute.function )( job->execute.functionContext );
91
92        pthread_mutex_lock( &__gDAThreadRunLoopSourceLock );
93
94        job->execute.exited = TRUE;
95
96        pthread_mutex_unlock( &__gDAThreadRunLoopSourceLock );
97
98        message.msgh_bits        = MACH_MSGH_BITS( MACH_MSG_TYPE_COPY_SEND, 0 );
99        message.msgh_id          = 0;
100        message.msgh_local_port  = MACH_PORT_NULL;
101        message.msgh_remote_port = CFMachPortGetPort( __gDAThreadRunLoopSourcePort );
102        message.msgh_reserved    = 0;
103        message.msgh_size        = sizeof( message );
104
105        status = mach_msg( &message, MACH_SEND_MSG | MACH_SEND_TIMEOUT, message.msgh_size, 0, MACH_PORT_NULL, 0, MACH_PORT_NULL );
106
107        if ( status == MACH_SEND_TIMED_OUT )
108        {
109            mach_msg_destroy( &message );
110        }
111    }
112
113    pthread_detach( pthread_self( ) );
114
115    return NULL;
116}
117
118static void __DAThreadRunLoopSourceCallback( CFMachPortRef port, void * message, CFIndex messageSize, void * info )
119{
120    /*
121     * Process a DAThread CFRunLoopSource fire.
122     */
123
124    __DAThreadRunLoopSourceJob * job     = NULL;
125    __DAThreadRunLoopSourceJob * jobLast = NULL;
126
127    pthread_mutex_lock( &__gDAThreadRunLoopSourceLock );
128
129    /*
130     * Scan through job list.
131     */
132
133    for ( job = __gDAThreadRunLoopSourceJobs; job; jobLast = NULL )
134    {
135        for ( job = __gDAThreadRunLoopSourceJobs; job; jobLast = job, job = job->next )
136        {
137            assert( job->kind == __kDAThreadRunLoopSourceJobKindExecute );
138
139            if ( job->execute.exited )
140            {
141                /*
142                 * Process the job's callback.
143                 */
144
145                if ( jobLast )
146                {
147                    jobLast->next = job->next;
148                }
149                else
150                {
151                    __gDAThreadRunLoopSourceJobs = job->next;
152                }
153
154                pthread_mutex_unlock( &__gDAThreadRunLoopSourceLock );
155
156                /*
157                 * Issue the callback.
158                 */
159
160                if ( job->execute.callback )
161                {
162                    ( job->execute.callback )( job->execute.status, job->execute.callbackContext );
163                }
164
165                /*
166                 * Release our resources.
167                 */
168
169                free( job );
170
171                pthread_mutex_lock( &__gDAThreadRunLoopSourceLock );
172
173                break;
174            }
175        }
176    }
177
178    pthread_mutex_unlock( &__gDAThreadRunLoopSourceLock );
179}
180
181CFRunLoopSourceRef DAThreadCreateRunLoopSource( CFAllocatorRef allocator, CFIndex order )
182{
183    /*
184     * Create a CFRunLoopSource for DAThread callbacks.
185     */
186
187    CFRunLoopSourceRef source = NULL;
188
189    /*
190     * Initialize our minimal state.
191     */
192
193    if ( __gDAThreadRunLoopSourcePort == NULL )
194    {
195        /*
196         * Create the global CFMachPort.  It will be used to post jobs to the run loop.
197         */
198
199        __gDAThreadRunLoopSourcePort = CFMachPortCreate( kCFAllocatorDefault, __DAThreadRunLoopSourceCallback, NULL, NULL );
200
201        if ( __gDAThreadRunLoopSourcePort )
202        {
203            /*
204             * Set up the global CFMachPort.  It requires no more than one queue element.
205             */
206
207            mach_port_limits_t limits = { 0 };
208
209            limits.mpl_qlimit = 1;
210
211            mach_port_set_attributes( mach_task_self( ),
212                                      CFMachPortGetPort( __gDAThreadRunLoopSourcePort ),
213                                      MACH_PORT_LIMITS_INFO,
214                                      ( mach_port_info_t ) &limits,
215                                      MACH_PORT_LIMITS_INFO_COUNT );
216        }
217    }
218
219    /*
220     * Obtain the CFRunLoopSource for our CFMachPort.
221     */
222
223    if ( __gDAThreadRunLoopSourcePort )
224    {
225        source = CFMachPortCreateRunLoopSource( allocator, __gDAThreadRunLoopSourcePort, order );
226    }
227
228    return source;
229}
230
231void DAThreadExecute( DAThreadFunction function, void * functionContext, DAThreadExecuteCallback callback, void * callbackContext )
232{
233    /*
234     * Execute a thread.
235     */
236
237    pthread_t thread;
238    int       status;
239
240    /*
241     * State our assumptions.
242     */
243
244    assert( __gDAThreadRunLoopSourcePort );
245
246    /*
247     * Run the thread.
248     */
249
250    pthread_mutex_lock( &__gDAThreadRunLoopSourceLock );
251
252    status = pthread_create( &thread, NULL, __DAThreadFunction, NULL );
253
254    if ( status == 0 )
255    {
256        /*
257         * Register this callback job on our queue.
258         */
259
260        __DAThreadRunLoopSourceJob * job;
261
262        job = malloc( sizeof( __DAThreadRunLoopSourceJob ) );
263
264        if ( job )
265        {
266            job->kind = __kDAThreadRunLoopSourceJobKindExecute;
267            job->next = __gDAThreadRunLoopSourceJobs;
268
269            job->execute.exited          = FALSE;
270            job->execute.status          = 0;
271            job->execute.thread          = thread;
272            job->execute.callback        = callback;
273            job->execute.callbackContext = callbackContext;
274            job->execute.function        = function;
275            job->execute.functionContext = functionContext;
276
277            __gDAThreadRunLoopSourceJobs = job;
278        }
279    }
280
281    pthread_mutex_unlock( &__gDAThreadRunLoopSourceLock );
282
283    /*
284     * Complete the call in case we had a local failure.
285     */
286
287    if ( status )
288    {
289        if ( callback )
290        {
291            ( callback )( EX_OSERR, callbackContext );
292        }
293    }
294}
295