1/* yarn.c -- generic thread operations implemented using pthread functions 2 * Copyright (C) 2008, 2012 Mark Adler 3 * Version 1.3 13 Jan 2012 Mark Adler 4 * For conditions of distribution and use, see copyright notice in yarn.h 5 */ 6 7/* Basic thread operations implemented using the POSIX pthread library. All 8 pthread references are isolated within this module to allow alternate 9 implementations with other thread libraries. See yarn.h for the description 10 of these operations. */ 11 12/* Version history: 13 1.0 19 Oct 2008 First version 14 1.1 26 Oct 2008 No need to set the stack size -- remove 15 Add yarn_abort() function for clean-up on error exit 16 1.2 19 Dec 2011 (changes reversed in 1.3) 17 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c 18 Update thread portability #defines per IEEE 1003.1-2008 19 Fix documentation in yarn.h for yarn_prefix 20 */ 21 22/* for thread portability */ 23#define _XOPEN_SOURCE 700 24#define _POSIX_C_SOURCE 200809L 25#define _THREAD_SAFE 26 27/* use large file functions if available */ 28#define _FILE_OFFSET_BITS 64 29 30/* external libraries and entities referenced */ 31#include <stdio.h> /* fprintf(), stderr */ 32#include <stdlib.h> /* exit(), malloc(), free(), NULL */ 33#include <pthread.h> /* pthread_t, pthread_create(), pthread_join(), */ 34 /* pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(), 35 PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(), 36 pthread_self(), pthread_equal(), 37 pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(), 38 pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(), 39 pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(), 40 pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() */ 41#include <errno.h> /* ENOMEM, EAGAIN, EINVAL */ 42 43/* interface definition */ 44#include "yarn.h" 45 46/* constants */ 47#define local static /* for non-exported functions and globals */ 48 49/* error handling external globals, resettable by application */ 50char *yarn_prefix = "yarn"; 51void (*yarn_abort)(int) = NULL; 52 53 54/* immediately exit -- use for errors that shouldn't ever happen */ 55local void fail(int err) 56{ 57 fprintf(stderr, "%s: %s (%d) -- aborting\n", yarn_prefix, 58 err == ENOMEM ? "out of memory" : "internal pthread error", err); 59 if (yarn_abort != NULL) 60 yarn_abort(err); 61 exit(err == ENOMEM || err == EAGAIN ? err : EINVAL); 62} 63 64/* memory handling routines provided by user -- if none are provided, malloc() 65 and free() are used, which are therefore assumed to be thread-safe */ 66typedef void *(*malloc_t)(size_t); 67typedef void (*free_t)(void *); 68local malloc_t my_malloc_f = malloc; 69local free_t my_free = free; 70 71/* use user-supplied allocation routines instead of malloc() and free() */ 72void yarn_mem(malloc_t lease, free_t vacate) 73{ 74 my_malloc_f = lease; 75 my_free = vacate; 76} 77 78/* memory allocation that cannot fail (from the point of view of the caller) */ 79local void *my_malloc(size_t size) 80{ 81 void *block; 82 83 if ((block = my_malloc_f(size)) == NULL) 84 fail(ENOMEM); 85 return block; 86} 87 88/* -- lock functions -- */ 89 90struct lock_s { 91 pthread_mutex_t mutex; 92 pthread_cond_t cond; 93 long value; 94}; 95 96lock *new_lock(long initial) 97{ 98 int ret; 99 lock *bolt; 100 101 bolt = my_malloc(sizeof(struct lock_s)); 102 if ((ret = pthread_mutex_init(&(bolt->mutex), NULL)) || 103 (ret = pthread_cond_init(&(bolt->cond), NULL))) 104 fail(ret); 105 bolt->value = initial; 106 return bolt; 107} 108 109void possess(lock *bolt) 110{ 111 int ret; 112 113 if ((ret = pthread_mutex_lock(&(bolt->mutex))) != 0) 114 fail(ret); 115} 116 117void release(lock *bolt) 118{ 119 int ret; 120 121 if ((ret = pthread_mutex_unlock(&(bolt->mutex))) != 0) 122 fail(ret); 123} 124 125void twist(lock *bolt, enum twist_op op, long val) 126{ 127 int ret; 128 129 if (op == TO) 130 bolt->value = val; 131 else if (op == BY) 132 bolt->value += val; 133 if ((ret = pthread_cond_broadcast(&(bolt->cond))) || 134 (ret = pthread_mutex_unlock(&(bolt->mutex)))) 135 fail(ret); 136} 137 138#define until(a) while(!(a)) 139 140void wait_for(lock *bolt, enum wait_op op, long val) 141{ 142 int ret; 143 144 switch (op) { 145 case TO_BE: 146 until (bolt->value == val) 147 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) 148 fail(ret); 149 break; 150 case NOT_TO_BE: 151 until (bolt->value != val) 152 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) 153 fail(ret); 154 break; 155 case TO_BE_MORE_THAN: 156 until (bolt->value > val) 157 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) 158 fail(ret); 159 break; 160 case TO_BE_LESS_THAN: 161 until (bolt->value < val) 162 if ((ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex))) != 0) 163 fail(ret); 164 } 165} 166 167long peek_lock(lock *bolt) 168{ 169 return bolt->value; 170} 171 172void free_lock(lock *bolt) 173{ 174 int ret; 175 if ((ret = pthread_cond_destroy(&(bolt->cond))) || 176 (ret = pthread_mutex_destroy(&(bolt->mutex)))) 177 fail(ret); 178 my_free(bolt); 179} 180 181/* -- thread functions (uses lock functions above) -- */ 182 183struct thread_s { 184 pthread_t id; 185 int done; /* true if this thread has exited */ 186 thread *next; /* for list of all launched threads */ 187}; 188 189/* list of threads launched but not joined, count of threads exited but not 190 joined (incremented by ignition() just before exiting) */ 191local lock threads_lock = { 192 PTHREAD_MUTEX_INITIALIZER, 193 PTHREAD_COND_INITIALIZER, 194 0 /* number of threads exited but not joined */ 195}; 196local thread *threads = NULL; /* list of extant threads */ 197 198/* structure in which to pass the probe and its payload to ignition() */ 199struct capsule { 200 void (*probe)(void *); 201 void *payload; 202}; 203 204/* mark the calling thread as done and alert join_all() */ 205local void reenter(void *dummy) 206{ 207 thread *match, **prior; 208 pthread_t me; 209 210 (void)dummy; 211 212 /* find this thread in the threads list by matching the thread id */ 213 me = pthread_self(); 214 possess(&(threads_lock)); 215 prior = &(threads); 216 while ((match = *prior) != NULL) { 217 if (pthread_equal(match->id, me)) 218 break; 219 prior = &(match->next); 220 } 221 if (match == NULL) 222 fail(EINVAL); 223 224 /* mark this thread as done and move it to the head of the list */ 225 match->done = 1; 226 if (threads != match) { 227 *prior = match->next; 228 match->next = threads; 229 threads = match; 230 } 231 232 /* update the count of threads to be joined and alert join_all() */ 233 twist(&(threads_lock), BY, +1); 234} 235 236/* all threads go through this routine so that just before the thread exits, 237 it marks itself as done in the threads list and alerts join_all() so that 238 the thread resources can be released -- use cleanup stack so that the 239 marking occurs even if the thread is cancelled */ 240local void *ignition(void *arg) 241{ 242 struct capsule *capsule = arg; 243 244 /* run reenter() before leaving */ 245 pthread_cleanup_push(reenter, NULL); 246 247 /* execute the requested function with argument */ 248 capsule->probe(capsule->payload); 249 my_free(capsule); 250 251 /* mark this thread as done and let join_all() know */ 252 pthread_cleanup_pop(1); 253 254 /* exit thread */ 255 return NULL; 256} 257 258/* not all POSIX implementations create threads as joinable by default, so that 259 is made explicit here */ 260thread *launch(void (*probe)(void *), void *payload) 261{ 262 int ret; 263 thread *th; 264 struct capsule *capsule; 265 pthread_attr_t attr; 266 267 /* construct the requested call and argument for the ignition() routine 268 (allocated instead of automatic so that we're sure this will still be 269 there when ignition() actually starts up -- ignition() will free this 270 allocation) */ 271 capsule = my_malloc(sizeof(struct capsule)); 272 capsule->probe = probe; 273 capsule->payload = payload; 274 275 /* assure this thread is in the list before join_all() or ignition() looks 276 for it */ 277 possess(&(threads_lock)); 278 279 /* create the thread and call ignition() from that thread */ 280 th = my_malloc(sizeof(struct thread_s)); 281 if ((ret = pthread_attr_init(&attr)) || 282 (ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE)) || 283 (ret = pthread_create(&(th->id), &attr, ignition, capsule)) || 284 (ret = pthread_attr_destroy(&attr))) 285 fail(ret); 286 287 /* put the thread in the threads list for join_all() */ 288 th->done = 0; 289 th->next = threads; 290 threads = th; 291 release(&(threads_lock)); 292 return th; 293} 294 295void join(thread *ally) 296{ 297 int ret; 298 thread *match, **prior; 299 300 /* wait for thread to exit and return its resources */ 301 if ((ret = pthread_join(ally->id, NULL)) != 0) 302 fail(ret); 303 304 /* find the thread in the threads list */ 305 possess(&(threads_lock)); 306 prior = &(threads); 307 while ((match = *prior) != NULL) { 308 if (match == ally) 309 break; 310 prior = &(match->next); 311 } 312 if (match == NULL) 313 fail(EINVAL); 314 315 /* remove thread from list and update exited count, free thread */ 316 if (match->done) 317 threads_lock.value--; 318 *prior = match->next; 319 release(&(threads_lock)); 320 my_free(ally); 321} 322 323/* This implementation of join_all() only attempts to join threads that have 324 announced that they have exited (see ignition()). When there are many 325 threads, this is faster than waiting for some random thread to exit while a 326 bunch of other threads have already exited. */ 327int join_all(void) 328{ 329 int ret, count; 330 thread *match, **prior; 331 332 /* grab the threads list and initialize the joined count */ 333 count = 0; 334 possess(&(threads_lock)); 335 336 /* do until threads list is empty */ 337 while (threads != NULL) { 338 /* wait until at least one thread has reentered */ 339 wait_for(&(threads_lock), NOT_TO_BE, 0); 340 341 /* find the first thread marked done (should be at or near the top) */ 342 prior = &(threads); 343 while ((match = *prior) != NULL) { 344 if (match->done) 345 break; 346 prior = &(match->next); 347 } 348 if (match == NULL) 349 fail(EINVAL); 350 351 /* join the thread (will be almost immediate), remove from the threads 352 list, update the reenter count, and free the thread */ 353 if ((ret = pthread_join(match->id, NULL)) != 0) 354 fail(ret); 355 threads_lock.value--; 356 *prior = match->next; 357 my_free(match); 358 count++; 359 } 360 361 /* let go of the threads list and return the number of threads joined */ 362 release(&(threads_lock)); 363 return count; 364} 365 366/* cancel and join the thread -- the thread will cancel when it gets to a file 367 operation, a sleep or pause, or a condition wait */ 368void destruct(thread *off_course) 369{ 370 int ret; 371 372 if ((ret = pthread_cancel(off_course->id)) != 0) 373 fail(ret); 374 join(off_course); 375} 376