1/* yarn.c -- generic thread operations implemented using pthread functions
2 * Copyright (C) 2008, 2012 Mark Adler
3 * Version 1.3  13 Jan 2012  Mark Adler
4 * For conditions of distribution and use, see copyright notice in yarn.h
5 */
6
7/* Basic thread operations implemented using the POSIX pthread library.  All
8   pthread references are isolated within this module to allow alternate
9   implementations with other thread libraries.  See yarn.h for the description
10   of these operations. */
11
12/* Version history:
13   1.0    19 Oct 2008  First version
14   1.1    26 Oct 2008  No need to set the stack size -- remove
15                       Add yarn_abort() function for clean-up on error exit
16   1.2    19 Dec 2011  (changes reversed in 1.3)
17   1.3    13 Jan 2012  Add large file #define for consistency with pigz.c
18                       Update thread portability #defines per IEEE 1003.1-2008
19                       Fix documentation in yarn.h for yarn_prefix
20 */
21
22/* for thread portability */
23#define _XOPEN_SOURCE 700
24#define _POSIX_C_SOURCE 200809L
25#define _THREAD_SAFE
26
27/* use large file functions if available */
28#define _FILE_OFFSET_BITS 64
29
30/* external libraries and entities referenced */
31#include <stdio.h>      /* fprintf(), stderr */
32#include <stdlib.h>     /* exit(), malloc(), free(), NULL */
33#include <pthread.h>    /* pthread_t, pthread_create(), pthread_join(), */
    /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(),
       PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(),
       pthread_self(), pthread_equal(), pthread_cancel(),
       pthread_cleanup_push(), pthread_cleanup_pop(),
       pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(),
       pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(),
       pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(),
       pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */
41#include <errno.h>      /* ENOMEM, EAGAIN, EINVAL */
42
43/* interface definition */
44#include "yarn.h"
45
/* constants */
/* "local" is this file's spelling of the static storage class: every function
   and global declared with it has internal linkage */
#define local static            /* for non-exported functions and globals */
48
/* error handling external globals, resettable by application */

/* string that fail() prints at the start of its error message */
char *yarn_prefix = "yarn";

/* when not NULL, called by fail() with the error code after the message has
   been written to stderr and before the process exits */
void (*yarn_abort)(int) = NULL;
52
53
54/* immediately exit -- use for errors that shouldn't ever happen */
55local void fail(int err)
56{
57    fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix,
58            err == ENOMEM ? "out of memory" : "internal pthread error", err);
59    if (yarn_abort != NULL)
60        yarn_abort(err);
61    exit(err == ENOMEM || err == EAGAIN ? err : EINVAL);
62}
63
/* memory handling routines provided by user -- if none are provided, malloc()
   and free() are used, which are therefore assumed to be thread-safe */

/* signatures of the allocation and release routines, matching malloc() and
   free() */
typedef void *(*malloc_t)(size_t);
typedef void (*free_t)(void *);

/* routines currently in use -- replaced together by yarn_mem(); my_malloc_f
   is only called through my_malloc(), which turns a NULL return into a fatal
   error */
local malloc_t my_malloc_f = malloc;
local free_t my_free = free;
70
71/* use user-supplied allocation routines instead of malloc() and free() */
72void yarn_mem(malloc_t lease, free_t vacate)
73{
74    my_malloc_f = lease;
75    my_free = vacate;
76}
77
78/* memory allocation that cannot fail (from the point of view of the caller) */
79local void *my_malloc(size_t size)
80{
81    void *block;
82
83    if ((block = my_malloc_f(size)) == NULL)
84        fail(ENOMEM);
85    return block;
86}
87
88/* -- lock functions -- */
89
/* a lock: a mutex and a condition variable paired with a long value -- the
   member order matters, since threads_lock is initialized positionally */
struct lock_s {
    pthread_mutex_t mutex;      /* locked by possess(), unlocked by release()
                                   and twist() */
    pthread_cond_t cond;        /* broadcast by twist(), waited on by
                                   wait_for() */
    long value;                 /* set or incremented by twist(), compared by
                                   wait_for(), returned by peek_lock() */
};
95
96lock *new_lock(long initial)
97{
98    int ret;
99    lock *bolt;
100
101    bolt = my_malloc(sizeof(struct lock_s));
102    if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) ||
103        (ret = pthread_cond_init(&(bolt->cond), NULL)))
104        fail(ret);
105    bolt->value = initial;
106    return bolt;
107}
108
109void possess(lock *bolt)
110{
111    int ret;
112
113    if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0)
114        fail(ret);
115}
116
117void release(lock *bolt)
118{
119    int ret;
120
121    if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0)
122        fail(ret);
123}
124
125void twist(lock *bolt, enum twist_op op, long val)
126{
127    int ret;
128
129    if (op == TO)
130        bolt->value = val;
131    else if (op == BY)
132        bolt->value += val;
133    if ((ret = pthread_cond_broadcast(&(bolt->cond))) ||
134        (ret = pthread_mutex_unlock(&(bolt->mutex))))
135        fail(ret);
136}
137
138#define until(a) while(!(a))
139
140void wait_for(lock *bolt, enum wait_op op, long val)
141{
142    int ret;
143
144    switch (op) {
145    case TO_BE:
146        until (bolt->value == val)
147            if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
148                fail(ret);
149        break;
150    case NOT_TO_BE:
151        until (bolt->value != val)
152            if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
153                fail(ret);
154        break;
155    case TO_BE_MORE_THAN:
156        until (bolt->value > val)
157            if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
158                fail(ret);
159        break;
160    case TO_BE_LESS_THAN:
161        until (bolt->value < val)
162            if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0)
163                fail(ret);
164    }
165}
166
167long peek_lock(lock *bolt)
168{
169    return bolt->value;
170}
171
172void free_lock(lock *bolt)
173{
174    int ret;
175    if ((ret = pthread_cond_destroy(&(bolt->cond))) ||
176        (ret = pthread_mutex_destroy(&(bolt->mutex))))
177        fail(ret);
178    my_free(bolt);
179}
180
181/* -- thread functions (uses lock functions above) -- */
182
/* one entry in the list of launched threads -- allocated by launch() and
   freed by join() or join_all() */
struct thread_s {
    pthread_t id;               /* filled in by pthread_create() in launch() */
    int done;                   /* true if this thread has exited */
    thread *next;               /* for list of all launched threads */
};
188
/* list of threads launched but not joined, count of threads exited but not
   joined (incremented by ignition() just before exiting) */
/* threads_lock is initialized statically, in struct lock_s member order
   (mutex, cond, value), and is possessed around every access to the threads
   list in reenter(), launch(), join() and join_all() */
local lock threads_lock = {
    PTHREAD_MUTEX_INITIALIZER,
    PTHREAD_COND_INITIALIZER,
    0                           /* number of threads exited but not joined */
};
local thread *threads = NULL;       /* list of extant threads */
197
/* structure in which to pass the probe and its payload to ignition() */
/* allocated and filled in by launch(), consumed by ignition() in the new
   thread */
struct capsule {
    void (*probe)(void *);      /* function the new thread is to run */
    void *payload;              /* the single argument passed to probe() */
};
203
204/* mark the calling thread as done and alert join_all() */
205local void reenter(void *dummy)
206{
207    thread *match, **prior;
208    pthread_t me;
209
210    (void)dummy;
211
212    /* find this thread in the threads list by matching the thread id */
213    me = pthread_self();
214    possess(&(threads_lock));
215    prior = &(threads);
216    while ((match = *prior) != NULL) {
217        if (pthread_equal(match->id, me))
218            break;
219        prior = &(match->next);
220    }
221    if (match == NULL)
222        fail(EINVAL);
223
224    /* mark this thread as done and move it to the head of the list */
225    match->done = 1;
226    if (threads != match) {
227        *prior = match->next;
228        match->next = threads;
229        threads = match;
230    }
231
232    /* update the count of threads to be joined and alert join_all() */
233    twist(&(threads_lock), BY, +1);
234}
235
236/* all threads go through this routine so that just before the thread exits,
237   it marks itself as done in the threads list and alerts join_all() so that
238   the thread resources can be released -- use cleanup stack so that the
239   marking occurs even if the thread is cancelled */
240local void *ignition(void *arg)
241{
242    struct capsule *capsule = arg;
243
244    /* run reenter() before leaving */
245    pthread_cleanup_push(reenter, NULL);
246
247    /* execute the requested function with argument */
248    capsule->probe(capsule->payload);
249    my_free(capsule);
250
251    /* mark this thread as done and let join_all() know */
252    pthread_cleanup_pop(1);
253
254    /* exit thread */
255    return NULL;
256}
257
258/* not all POSIX implementations create threads as joinable by default, so that
259   is made explicit here */
260thread *launch(void (*probe)(void *), void *payload)
261{
262    int ret;
263    thread *th;
264    struct capsule *capsule;
265    pthread_attr_t attr;
266
267    /* construct the requested call and argument for the ignition() routine
268       (allocated instead of automatic so that we're sure this will still be
269       there when ignition() actually starts up -- ignition() will free this
270       allocation) */
271    capsule = my_malloc(sizeof(struct capsule));
272    capsule->probe = probe;
273    capsule->payload = payload;
274
275    /* assure this thread is in the list before join_all() or ignition() looks
276       for it */
277    possess(&(threads_lock));
278
279    /* create the thread and call ignition() from that thread */
280    th = my_malloc(sizeof(struct thread_s));
281    if ((ret = pthread_attr_init(&attr)) ||
282        (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) ||
283        (ret = pthread_create(&(th->id), &attr, ignition, capsule)) ||
284        (ret = pthread_attr_destroy(&attr)))
285        fail(ret);
286
287    /* put the thread in the threads list for join_all() */
288    th->done = 0;
289    th->next = threads;
290    threads = th;
291    release(&(threads_lock));
292    return th;
293}
294
295void join(thread *ally)
296{
297    int ret;
298    thread *match, **prior;
299
300    /* wait for thread to exit and return its resources */
301    if ((ret = pthread_join(ally->id, NULL)) != 0)
302        fail(ret);
303
304    /* find the thread in the threads list */
305    possess(&(threads_lock));
306    prior = &(threads);
307    while ((match = *prior) != NULL) {
308        if (match == ally)
309            break;
310        prior = &(match->next);
311    }
312    if (match == NULL)
313        fail(EINVAL);
314
315    /* remove thread from list and update exited count, free thread */
316    if (match->done)
317        threads_lock.value--;
318    *prior = match->next;
319    release(&(threads_lock));
320    my_free(ally);
321}
322
323/* This implementation of join_all() only attempts to join threads that have
324   announced that they have exited (see ignition()).  When there are many
325   threads, this is faster than waiting for some random thread to exit while a
326   bunch of other threads have already exited. */
327int join_all(void)
328{
329    int ret, count;
330    thread *match, **prior;
331
332    /* grab the threads list and initialize the joined count */
333    count = 0;
334    possess(&(threads_lock));
335
336    /* do until threads list is empty */
337    while (threads != NULL) {
338        /* wait until at least one thread has reentered */
339        wait_for(&(threads_lock), NOT_TO_BE, 0);
340
341        /* find the first thread marked done (should be at or near the top) */
342        prior = &(threads);
343        while ((match = *prior) != NULL) {
344            if (match->done)
345                break;
346            prior = &(match->next);
347        }
348        if (match == NULL)
349            fail(EINVAL);
350
351        /* join the thread (will be almost immediate), remove from the threads
352           list, update the reenter count, and free the thread */
353        if ((ret = pthread_join(match->id, NULL)) != 0)
354            fail(ret);
355        threads_lock.value--;
356        *prior = match->next;
357        my_free(match);
358        count++;
359    }
360
361    /* let go of the threads list and return the number of threads joined */
362    release(&(threads_lock));
363    return count;
364}
365
366/* cancel and join the thread -- the thread will cancel when it gets to a file
367   operation, a sleep or pause, or a condition wait */
368void destruct(thread *off_course)
369{
370    int ret;
371
372    if ((ret = pthread_cancel(off_course->id)) != 0)
373        fail(ret);
374    join(off_course);
375}
376