/* * Copyright (c) 2009 Apple Inc. All rights reserved. * * @APPLE_OSREFERENCE_LICENSE_HEADER_START@ * * This file contains Original Code and/or Modifications of Original Code * as defined in and that are subject to the Apple Public Source License * Version 2.0 (the 'License'). You may not use this file except in * compliance with the License. The rights granted to you under the License * may not be used to create, or enable the creation or redistribution of, * unlawful or unlicensed copies of an Apple operating system, or to * circumvent, violate, or enable the circumvention or violation of, any * terms of an Apple operating system software license agreement. * * Please obtain a copy of the License at * http://www.opensource.apple.com/apsl/ and read it before using this file. * * The Original Code and all software distributed under the License are * distributed on an 'AS IS' basis, WITHOUT WARRANTY OF ANY KIND, EITHER * EXPRESS OR IMPLIED, AND APPLE HEREBY DISCLAIMS ALL SUCH WARRANTIES, * INCLUDING WITHOUT LIMITATION, ANY WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE, QUIET ENJOYMENT OR NON-INFRINGEMENT. * Please see the License for the specific language governing rights and * limitations under the License. * * @APPLE_OSREFERENCE_LICENSE_HEADER_END@ */ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include typedef enum wake_type { WAKE_BROADCAST_ONESEM, WAKE_BROADCAST_PERTHREAD, WAKE_CHAIN } wake_type_t; typedef enum my_policy_type { MY_POLICY_REALTIME, MY_POLICY_TIMESHARE, MY_POLICY_FIXEDPRI } my_policy_type_t; #define assert(truth, label) do { if(!(truth)) { printf("Thread %p: failure on line %d\n", pthread_self(), __LINE__); goto label; } } while (0) #define CONSTRAINT_NANOS (20000000ll) /* 20 ms */ #define COMPUTATION_NANOS (10000000ll) /* 10 ms */ #define TRACEWORTHY_NANOS (10000000ll) /* 10 ms */ #if DEBUG #define debug_log(args...) 
printf(args) #else #define debug_log(args...) do { } while(0) #endif /* Declarations */ void* child_thread_func(void *arg); void print_usage(); int thread_setup(); my_policy_type_t parse_thread_policy(const char *str); int thread_finish_iteration(); /* Global variables (general) */ int g_numthreads; wake_type_t g_waketype; policy_t g_policy; int g_iterations; struct mach_timebase_info g_mti; semaphore_t g_main_sem; uint64_t *g_thread_endtimes_abs; volatile int32_t g_done_threads; boolean_t g_do_spin = FALSE; boolean_t g_verbose = FALSE; uint64_t g_starttime_abs; #if MIMIC_DIGI_LEAD_TIME int g_long_spinid; uint64_t g_spinlength_abs; #endif /* MIMIC_DIGI_LEAD_TIME */ /* Global variables (broadcast) */ semaphore_t g_machsem; semaphore_t g_leadersem; /* Global variables (chain) */ semaphore_t *g_semarr; uint64_t abs_to_nanos(uint64_t abstime) { return (uint64_t)(abstime * (((double)g_mti.numer) / ((double)g_mti.denom))); } uint64_t nanos_to_abs(uint64_t ns) { return (uint64_t)(ns * (((double)g_mti.denom) / ((double)g_mti.numer))); } /* * Figure out what thread policy to use */ my_policy_type_t parse_thread_policy(const char *str) { if (strcmp(str, "timeshare") == 0) { return MY_POLICY_TIMESHARE; } else if (strcmp(str, "realtime") == 0) { return MY_POLICY_REALTIME; } else if (strcmp(str, "fixed") == 0) { return MY_POLICY_FIXEDPRI; } else { printf("Invalid thread policy %s\n", str); exit(1); } } /* * Figure out what wakeup pattern to use */ wake_type_t parse_wakeup_pattern(const char *str) { if (strcmp(str, "chain") == 0) { return WAKE_CHAIN; } else if (strcmp(str, "broadcast-single-sem") == 0) { return WAKE_BROADCAST_ONESEM; } else if (strcmp(str, "broadcast-per-thread") == 0) { return WAKE_BROADCAST_PERTHREAD; } else { print_usage(); exit(1); } } /* * Set policy */ int thread_setup() { int res; switch (g_policy) { case MY_POLICY_TIMESHARE: { return 0; } case MY_POLICY_REALTIME: { thread_time_constraint_policy_data_t pol; /* Hard-coded realtime parameters (similar to 
what Digi uses) */ pol.period = 100000; pol.constraint = nanos_to_abs(CONSTRAINT_NANOS); pol.computation = nanos_to_abs(COMPUTATION_NANOS); pol.preemptible = 0; /* Ignored by OS */ res = thread_policy_set(mach_thread_self(), THREAD_TIME_CONSTRAINT_POLICY, (thread_policy_t) &pol, THREAD_TIME_CONSTRAINT_POLICY_COUNT); assert(res == 0, fail); break; } case MY_POLICY_FIXEDPRI: { thread_extended_policy_data_t pol; pol.timeshare = 0; res = thread_policy_set(mach_thread_self(), THREAD_EXTENDED_POLICY, (thread_policy_t) &pol, THREAD_EXTENDED_POLICY_COUNT); assert(res == 0, fail); break; } default: { printf("invalid policy type\n"); return 1; } } return 0; fail: return 1; } /* * Wake up main thread if everyone's done */ int thread_finish_iteration(int id) { int32_t new; int res = 0; volatile float x = 0.0; volatile float y = 0.0; debug_log("Thread %p finished iteration.\n", pthread_self()); #if MIMIC_DIGI_LEAD_TIME /* * One randomly chosen thread determines when everybody gets to stop. */ if (g_do_spin) { if (g_long_spinid == id) { uint64_t endspin; /* This thread took up fully half of his computation */ endspin = g_starttime_abs + g_spinlength_abs; while (mach_absolute_time() < endspin) { y = y + 1.5 + x; x = sqrt(y); } } } #endif /* MIMIC_DIGI_LEAD_TIME */ new = OSAtomicIncrement32(&g_done_threads); debug_log("New value is %d\n", new); /* * When the last thread finishes, everyone gets to go back to sleep. */ if (new == g_numthreads) { debug_log("Thread %p signalling main thread.\n", pthread_self()); res = semaphore_signal(g_main_sem); } else { if (g_do_spin) { while (g_done_threads < g_numthreads) { y = y + 1.5 + x; x = sqrt(y); } } } return res; } /* * Wait for a wakeup, potentially wake up another of the "0-N" threads, * and notify the main thread when done. 
*/

/*
 * Worker thread entry point. `arg` carries this thread's id
 * (0 .. g_numthreads - 1), passed through uintptr_t.
 *
 * Thread 0 is the leader. Each iteration it waits on g_leadersem,
 * timestamps itself, and starts the wakeup according to g_waketype. Every
 * other thread waits on its semaphore (g_machsem or g_semarr[my_id]) and
 * records its wakeup time in g_thread_endtimes_abs[my_id]. In chain mode it
 * then wakes the next thread. All threads finish each iteration through
 * thread_finish_iteration().
 *
 * Returns NULL after g_iterations iterations; any failure exits the whole
 * process with status 1.
 */
void *
child_thread_func(void *arg)
{
	int my_id = (int)(uintptr_t)arg;
	int res;
	int i, j;
	int32_t ready;

	/*
	 * Set policy and so forth. thread_setup() prints its own failures; the
	 * run continues regardless, as it always has.
	 */
	thread_setup();

	/* Tell main thread when everyone has set up */
	ready = OSAtomicIncrement32(&g_done_threads);
	if (ready == g_numthreads) {
		semaphore_signal(g_main_sem);
	}

	/* For each iteration */
	for (i = 0; i < g_iterations; i++) {
		if (my_id == 0) {
			/*
			 * Leader thread either wakes everyone up or starts the
			 * chain going.
			 */
			res = semaphore_wait(g_leadersem);
			assert(res == KERN_SUCCESS, fail);

			g_thread_endtimes_abs[my_id] = mach_absolute_time();

#if MIMIC_DIGI_LEAD_TIME
			g_long_spinid = rand() % g_numthreads;
#endif /* MIMIC_DIGI_LEAD_TIME */

			switch (g_waketype) {
			case WAKE_CHAIN:
				/*
				 * g_semarr has exactly g_numthreads slots, so a
				 * lone leader has no successor to signal.
				 */
				if (g_numthreads > 1) {
					semaphore_signal(g_semarr[my_id + 1]);
				}
				break;
			case WAKE_BROADCAST_ONESEM:
				semaphore_signal_all(g_machsem);
				break;
			case WAKE_BROADCAST_PERTHREAD:
				for (j = 1; j < g_numthreads; j++) {
					semaphore_signal(g_semarr[j]);
				}
				break;
			default:
				printf("Invalid wakeup type?!\n");
				exit(1);
			}
		} else {
			/*
			 * Everyone else waits to be woken up,
			 * records when they wake up, and possibly
			 * wakes up a friend.
			 */
			switch (g_waketype) {
			case WAKE_BROADCAST_ONESEM:
				res = semaphore_wait(g_machsem);
				assert(res == KERN_SUCCESS, fail);

				g_thread_endtimes_abs[my_id] = mach_absolute_time();
				break;
			case WAKE_BROADCAST_PERTHREAD:
				res = semaphore_wait(g_semarr[my_id]);
				assert(res == KERN_SUCCESS, fail);

				g_thread_endtimes_abs[my_id] = mach_absolute_time();
				break;
			case WAKE_CHAIN:
				/*
				 * For the chain wakeup case:
				 * wait, record time, signal next thread if appropriate
				 */
				res = semaphore_wait(g_semarr[my_id]);
				assert(res == KERN_SUCCESS, fail);

				g_thread_endtimes_abs[my_id] = mach_absolute_time();

				if (my_id < (g_numthreads - 1)) {
					res = semaphore_signal(g_semarr[my_id + 1]);
					assert(res == KERN_SUCCESS, fail);
				}
				break;
			default:
				printf("Invalid wake type.\n");
				goto fail;
			}
		}

		res = thread_finish_iteration(my_id);
		assert(res == 0, fail);
	}

	return NULL;

fail:
	exit(1);
}

/*
 * Admittedly not very attractive.
*/ void print_usage() { printf("Usage: zn [-trace ] [-spin] [-verbose]\n"); } /* * Given an array of uint64_t values, compute average, max, min, and standard deviation */ void compute_stats(uint64_t *values, uint64_t count, float *averagep, uint64_t *maxp, uint64_t *minp, float *stddevp) { int i; uint64_t _sum = 0; uint64_t _max = 0; uint64_t _min = UINT64_MAX; float _avg = 0; float _dev = 0; for (i = 0; i < count; i++) { _sum += values[i]; _max = values[i] > _max ? values[i] : _max; _min = values[i] < _min ? values[i] : _min; } _avg = ((float)_sum) / ((float)count); _dev = 0; for (i = 0; i < count; i++) { _dev += powf((((float)values[i]) - _avg), 2); } _dev /= count; _dev = sqrtf(_dev); *averagep = _avg; *maxp = _max; *minp = _min; *stddevp = _dev; } int main(int argc, char **argv) { int i; int res; pthread_t *threads; uint64_t *worst_latencies_ns; uint64_t *worst_latencies_from_first_ns; uint64_t last_end; uint64_t max, min; uint64_t traceworthy_latency_ns = TRACEWORTHY_NANOS; float avg, stddev; srand(time(NULL)); if (argc < 5 || argc > 9) { print_usage(); goto fail; } /* How many threads? */ g_numthreads = atoi(argv[1]); /* What wakeup pattern? 
*/ g_waketype = parse_wakeup_pattern(argv[2]); /* Policy */ g_policy = parse_thread_policy(argv[3]); /* Iterations */ g_iterations = atoi(argv[4]); /* Optional args */ for (i = 5; i < argc; i++) { if (strcmp(argv[i], "-spin") == 0) { g_do_spin = TRUE; } else if (strcmp(argv[i], "-verbose") == 0) { g_verbose = TRUE; } else if ((strcmp(argv[i], "-trace") == 0) && (i < (argc - 1))) { traceworthy_latency_ns = strtoull(argv[++i], NULL, 10); } else { print_usage(); goto fail; } } mach_timebase_info(&g_mti); #if MIMIC_DIGI_LEAD_TIME g_spinlength_abs = nanos_to_abs(COMPUTATION_NANOS) / 2; #endif /* MIMIC_DIGI_LEAD_TIME */ /* Arrays for threads and their wakeup times */ threads = (pthread_t*) malloc(sizeof(pthread_t) * g_numthreads); assert(threads, fail); g_thread_endtimes_abs = (uint64_t*) malloc(sizeof(uint64_t) * g_numthreads); assert(g_thread_endtimes_abs, fail); worst_latencies_ns = (uint64_t*) malloc(sizeof(uint64_t) * g_iterations); assert(worst_latencies_ns, fail); worst_latencies_from_first_ns = (uint64_t*) malloc(sizeof(uint64_t) * g_iterations); assert(worst_latencies_from_first_ns, fail); res = semaphore_create(mach_task_self(), &g_main_sem, SYNC_POLICY_FIFO, 0); assert(res == KERN_SUCCESS, fail); /* Either one big semaphore or one per thread */ if (g_waketype == WAKE_CHAIN || g_waketype == WAKE_BROADCAST_PERTHREAD) { g_semarr = malloc(sizeof(semaphore_t) * g_numthreads); assert(g_semarr != NULL, fail); for (i = 0; i < g_numthreads; i++) { res = semaphore_create(mach_task_self(), &g_semarr[i], SYNC_POLICY_FIFO, 0); assert(res == KERN_SUCCESS, fail); } g_leadersem = g_semarr[0]; } else { res = semaphore_create(mach_task_self(), &g_machsem, SYNC_POLICY_FIFO, 0); assert(res == KERN_SUCCESS, fail); res = semaphore_create(mach_task_self(), &g_leadersem, SYNC_POLICY_FIFO, 0); assert(res == KERN_SUCCESS, fail); } /* Create the threads */ g_done_threads = 0; for (i = 0; i < g_numthreads; i++) { res = pthread_create(&threads[i], NULL, child_thread_func, 
(void*)(uintptr_t)i); assert(res == 0, fail); } /* Let everyone get settled */ semaphore_wait(g_main_sem); sleep(1); /* Go! */ for (i = 0; i < g_iterations; i++) { int j; uint64_t worst_abs = 0, best_abs = UINT64_MAX; g_done_threads = 0; OSMemoryBarrier(); g_starttime_abs = mach_absolute_time(); /* Fire them off */ semaphore_signal(g_leadersem); /* Wait for worker threads to finish */ semaphore_wait(g_main_sem); assert(res == KERN_SUCCESS, fail); /* * We report the worst latencies relative to start time * and relative to the lead worker thread. */ for (j = 0; j < g_numthreads; j++) { uint64_t latency_abs; latency_abs = g_thread_endtimes_abs[j] - g_starttime_abs; worst_abs = worst_abs < latency_abs ? latency_abs : worst_abs; } worst_latencies_ns[i] = abs_to_nanos(worst_abs); worst_abs = 0; for (j = 1; j < g_numthreads; j++) { uint64_t latency_abs; latency_abs = g_thread_endtimes_abs[j] - g_thread_endtimes_abs[0]; worst_abs = worst_abs < latency_abs ? latency_abs : worst_abs; best_abs = best_abs > latency_abs ? latency_abs : best_abs; } worst_latencies_from_first_ns[i] = abs_to_nanos(worst_abs); /* * In the event of a bad run, cut a trace point. */ if (worst_latencies_from_first_ns[i] > traceworthy_latency_ns) { int _tmp; if (g_verbose) { printf("Worst on this round was %.2f us.\n", ((float)worst_latencies_from_first_ns[i]) / 1000.0); } _tmp = syscall(SYS_kdebug_trace, 0xEEEEEEEE, 0, 0, 0, 0); } /* Let worker threads get back to sleep... 
*/ usleep(g_numthreads * 10); } /* Rejoin threads */ last_end = 0; for (i = 0; i < g_numthreads; i++) { res = pthread_join(threads[i], NULL); assert(res == 0, fail); } compute_stats(worst_latencies_ns, g_iterations, &avg, &max, &min, &stddev); printf("Results (from a stop):\n"); printf("Max:\t\t%.2f us\n", ((float)max) / 1000.0); printf("Min:\t\t%.2f us\n", ((float)min) / 1000.0); printf("Avg:\t\t%.2f us\n", avg / 1000.0); printf("Stddev:\t\t%.2f us\n", stddev / 1000.0); putchar('\n'); compute_stats(worst_latencies_from_first_ns, g_iterations, &avg, &max, &min, &stddev); printf("Results (relative to first thread):\n"); printf("Max:\t\t%.2f us\n", ((float)max) / 1000.0); printf("Min:\t\t%.2f us\n", ((float)min) / 1000.0); printf("Avg:\t\t%.2f us\n", avg / 1000.0); printf("Stddev:\t\t%.2f us\n", stddev / 1000.0); #if 0 for (i = 0; i < g_iterations; i++) { printf("Iteration %d: %f us\n", i, worst_latencies_ns[i] / 1000.0); } #endif return 0; fail: return 1; }