1/* 2 * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions 6 * are met: 7 * 1. Redistributions of source code must retain the above copyright 8 * notice, this list of conditions and the following disclaimer. 9 * 2. Redistributions in binary form must reproduce the above copyright 10 * notice, this list of conditions and the following disclaimer in the 11 * documentation and/or other materials provided with the distribution. 12 * 3. The name of the author may not be used to endorse or promote products 13 * derived from this software without specific prior written permission. 14 * 15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 16 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 17 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 18 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 19 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 20 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 21 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 22 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 23 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 24 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 25 */ 26 27/* The old tests here need assertions to work. */ 28#undef NDEBUG 29 30#include "event2/event-config.h" 31 32#include <sys/types.h> 33#include <stdio.h> 34#include <stdlib.h> 35#include <string.h> 36#ifdef _EVENT_HAVE_UNISTD_H 37#include <unistd.h> 38#endif 39#ifdef _EVENT_HAVE_SYS_WAIT_H 40#include <sys/wait.h> 41#endif 42 43#ifdef _EVENT_HAVE_PTHREADS 44#include <pthread.h> 45#elif defined(WIN32) 46#include <process.h> 47#endif 48#include <assert.h> 49#ifdef _EVENT_HAVE_UNISTD_H 50#include <unistd.h> 51#endif 52#include <time.h> 53 54#include "sys/queue.h" 55 56#include "event2/util.h" 57#include "event2/event.h" 58#include "event2/event_struct.h" 59#include "event2/thread.h" 60#include "evthread-internal.h" 61#include "event-internal.h" 62#include "defer-internal.h" 63#include "regress.h" 64#include "tinytest_macros.h" 65 66#ifdef _EVENT_HAVE_PTHREADS 67#define THREAD_T pthread_t 68#define THREAD_FN void * 69#define THREAD_RETURN() return (NULL) 70#define THREAD_START(threadvar, fn, arg) \ 71 pthread_create(&(threadvar), NULL, fn, arg) 72#define THREAD_JOIN(th) pthread_join(th, NULL) 73#else 74#define THREAD_T HANDLE 75#define THREAD_FN unsigned __stdcall 76#define THREAD_RETURN() return (0) 77#define THREAD_START(threadvar, fn, arg) do { \ 78 uintptr_t threadhandle = _beginthreadex(NULL,0,fn,(arg),0,NULL); \ 79 (threadvar) = (HANDLE) threadhandle; \ 80 } while (0) 81#define THREAD_JOIN(th) WaitForSingleObject(th, INFINITE) 82#endif 83 84struct cond_wait { 85 void *lock; 86 void *cond; 87}; 88 89static void 90wake_all_timeout(evutil_socket_t fd, short what, void *arg) 91{ 92 struct cond_wait *cw = arg; 93 EVLOCK_LOCK(cw->lock, 0); 94 EVTHREAD_COND_BROADCAST(cw->cond); 95 EVLOCK_UNLOCK(cw->lock, 0); 96 97} 98 99static void 100wake_one_timeout(evutil_socket_t fd, short what, void *arg) 101{ 102 struct cond_wait *cw = arg; 103 EVLOCK_LOCK(cw->lock, 0); 104 EVTHREAD_COND_SIGNAL(cw->cond); 105 EVLOCK_UNLOCK(cw->lock, 0); 106} 107 108#define NUM_THREADS 100 109#define NUM_ITERATIONS 100 110void *count_lock; 111static int count; 112 113static THREAD_FN 114basic_thread(void *arg) 115{ 116 struct cond_wait cw; 117 struct event_base *base = arg; 118 struct event ev; 119 int i = 0; 120 121 EVTHREAD_ALLOC_LOCK(cw.lock, 0); 122 EVTHREAD_ALLOC_COND(cw.cond); 123 assert(cw.lock); 124 assert(cw.cond); 125 126 evtimer_assign(&ev, base, wake_all_timeout, &cw); 127 for (i = 0; i < NUM_ITERATIONS; i++) { 128 struct timeval tv; 129 evutil_timerclear(&tv); 130 tv.tv_sec = 0; 131 tv.tv_usec = 3000; 132 133 EVLOCK_LOCK(cw.lock, 0); 134 /* we need to make sure that event does not happen before 135 * we get to wait on the conditional variable */ 136 assert(evtimer_add(&ev, &tv) == 0); 137 138 assert(EVTHREAD_COND_WAIT(cw.cond, cw.lock) == 0); 139 EVLOCK_UNLOCK(cw.lock, 0); 140 141 EVLOCK_LOCK(count_lock, 0); 142 ++count; 143 EVLOCK_UNLOCK(count_lock, 0); 144 } 145 146 /* exit the loop only if all threads fired all timeouts */ 147 EVLOCK_LOCK(count_lock, 0); 148 if (count >= NUM_THREADS * NUM_ITERATIONS) 149 event_base_loopexit(base, NULL); 150 EVLOCK_UNLOCK(count_lock, 0); 151 152 EVTHREAD_FREE_LOCK(cw.lock, 0); 153 EVTHREAD_FREE_COND(cw.cond); 154 155 THREAD_RETURN(); 156} 157 158static int notification_fd_used = 0; 159#ifndef WIN32 160static int got_sigchld = 0; 161static void 162sigchld_cb(evutil_socket_t fd, short event, void *arg) 163{ 164 struct timeval tv; 165 struct event_base *base = arg; 166 167 got_sigchld++; 168 tv.tv_usec = 100000; 169 tv.tv_sec = 0; 170 event_base_loopexit(base, &tv); 171} 172 173 174static void 175notify_fd_cb(evutil_socket_t fd, short event, void *arg) 176{ 177 ++notification_fd_used; 178} 179#endif 180 181static void 182thread_basic(void *arg) 183{ 184 THREAD_T threads[NUM_THREADS]; 185 struct event ev; 186 struct timeval tv; 187 int i; 188 struct basic_test_data *data = arg; 189 struct event_base *base = data->base; 190 191 struct event *notification_event = NULL; 192 struct event *sigchld_event = NULL; 193 194 EVTHREAD_ALLOC_LOCK(count_lock, 0); 195 tt_assert(count_lock); 196 197 tt_assert(base); 198 if (evthread_make_base_notifiable(base)<0) { 199 tt_abort_msg("Couldn't make base notifiable!"); 200 } 201 202#ifndef WIN32 203 if (data->setup_data && !strcmp(data->setup_data, "forking")) { 204 pid_t pid; 205 int status; 206 sigchld_event = evsignal_new(base, SIGCHLD, sigchld_cb, base); 207 /* This piggybacks on the th_notify_fd weirdly, and looks 208 * inside libevent internals. Not a good idea in non-testing 209 * code! */ 210 notification_event = event_new(base, 211 base->th_notify_fd[0], EV_READ|EV_PERSIST, notify_fd_cb, 212 NULL); 213 event_add(sigchld_event, NULL); 214 event_add(notification_event, NULL); 215 216 if ((pid = fork()) == 0) { 217 event_del(notification_event); 218 if (event_reinit(base) < 0) { 219 TT_FAIL(("reinit")); 220 exit(1); 221 } 222 event_assign(notification_event, base, 223 base->th_notify_fd[0], EV_READ|EV_PERSIST, 224 notify_fd_cb, NULL); 225 event_add(notification_event, NULL); 226 goto child; 227 } 228 229 event_base_dispatch(base); 230 231 if (waitpid(pid, &status, 0) == -1) 232 tt_abort_perror("waitpid"); 233 TT_BLATHER(("Waitpid okay\n")); 234 235 tt_assert(got_sigchld); 236 tt_int_op(notification_fd_used, ==, 0); 237 238 goto end; 239 } 240 241child: 242#endif 243 for (i = 0; i < NUM_THREADS; ++i) 244 THREAD_START(threads[i], basic_thread, base); 245 246 evtimer_assign(&ev, base, NULL, NULL); 247 evutil_timerclear(&tv); 248 tv.tv_sec = 1000; 249 event_add(&ev, &tv); 250 251 event_base_dispatch(base); 252 253 for (i = 0; i < NUM_THREADS; ++i) 254 THREAD_JOIN(threads[i]); 255 256 event_del(&ev); 257 258 tt_int_op(count, ==, NUM_THREADS * NUM_ITERATIONS); 259 260 EVTHREAD_FREE_LOCK(count_lock, 0); 261 262 TT_BLATHER(("notifiations==%d", notification_fd_used)); 263 264end: 265 266 if (notification_event) 267 event_free(notification_event); 268 if (sigchld_event) 269 event_free(sigchld_event); 270} 271 272#undef NUM_THREADS 273#define NUM_THREADS 10 274 275struct alerted_record { 276 struct cond_wait *cond; 277 struct timeval delay; 278 struct timeval alerted_at; 279 int timed_out; 280}; 281 282static THREAD_FN 283wait_for_condition(void *arg) 284{ 285 struct alerted_record *rec = arg; 286 int r; 287 288 EVLOCK_LOCK(rec->cond->lock, 0); 289 if (rec->delay.tv_sec || rec->delay.tv_usec) { 290 r = EVTHREAD_COND_WAIT_TIMED(rec->cond->cond, rec->cond->lock, 291 &rec->delay); 292 } else { 293 r = EVTHREAD_COND_WAIT(rec->cond->cond, rec->cond->lock); 294 } 295 EVLOCK_UNLOCK(rec->cond->lock, 0); 296 297 evutil_gettimeofday(&rec->alerted_at, NULL); 298 if (r == 1) 299 rec->timed_out = 1; 300 301 THREAD_RETURN(); 302} 303 304static void 305thread_conditions_simple(void *arg) 306{ 307 struct timeval tv_signal, tv_timeout, tv_broadcast; 308 struct alerted_record alerted[NUM_THREADS]; 309 THREAD_T threads[NUM_THREADS]; 310 struct cond_wait cond; 311 int i; 312 struct timeval launched_at; 313 struct event wake_one; 314 struct event wake_all; 315 struct basic_test_data *data = arg; 316 struct event_base *base = data->base; 317 int n_timed_out=0, n_signal=0, n_broadcast=0; 318 319 tv_signal.tv_sec = tv_timeout.tv_sec = tv_broadcast.tv_sec = 0; 320 tv_signal.tv_usec = 30*1000; 321 tv_timeout.tv_usec = 150*1000; 322 tv_broadcast.tv_usec = 500*1000; 323 324 EVTHREAD_ALLOC_LOCK(cond.lock, EVTHREAD_LOCKTYPE_RECURSIVE); 325 EVTHREAD_ALLOC_COND(cond.cond); 326 tt_assert(cond.lock); 327 tt_assert(cond.cond); 328 for (i = 0; i < NUM_THREADS; ++i) { 329 memset(&alerted[i], 0, sizeof(struct alerted_record)); 330 alerted[i].cond = &cond; 331 } 332 333 /* Threads 5 and 6 will be allowed to time out */ 334 memcpy(&alerted[5].delay, &tv_timeout, sizeof(tv_timeout)); 335 memcpy(&alerted[6].delay, &tv_timeout, sizeof(tv_timeout)); 336 337 evtimer_assign(&wake_one, base, wake_one_timeout, &cond); 338 evtimer_assign(&wake_all, base, wake_all_timeout, &cond); 339 340 evutil_gettimeofday(&launched_at, NULL); 341 342 /* Launch the threads... */ 343 for (i = 0; i < NUM_THREADS; ++i) { 344 THREAD_START(threads[i], wait_for_condition, &alerted[i]); 345 } 346 347 /* Start the timers... */ 348 tt_int_op(event_add(&wake_one, &tv_signal), ==, 0); 349 tt_int_op(event_add(&wake_all, &tv_broadcast), ==, 0); 350 351 /* And run for a bit... */ 352 event_base_dispatch(base); 353 354 /* And wait till the threads are done. */ 355 for (i = 0; i < NUM_THREADS; ++i) 356 THREAD_JOIN(threads[i]); 357 358 /* Now, let's see what happened. At least one of 5 or 6 should 359 * have timed out. */ 360 n_timed_out = alerted[5].timed_out + alerted[6].timed_out; 361 tt_int_op(n_timed_out, >=, 1); 362 tt_int_op(n_timed_out, <=, 2); 363 364 for (i = 0; i < NUM_THREADS; ++i) { 365 const struct timeval *target_delay; 366 struct timeval target_time, actual_delay; 367 if (alerted[i].timed_out) { 368 TT_BLATHER(("%d looks like a timeout\n", i)); 369 target_delay = &tv_timeout; 370 tt_assert(i == 5 || i == 6); 371 } else if (evutil_timerisset(&alerted[i].alerted_at)) { 372 long diff1,diff2; 373 evutil_timersub(&alerted[i].alerted_at, 374 &launched_at, &actual_delay); 375 diff1 = timeval_msec_diff(&actual_delay, 376 &tv_signal); 377 diff2 = timeval_msec_diff(&actual_delay, 378 &tv_broadcast); 379 if (abs(diff1) < abs(diff2)) { 380 TT_BLATHER(("%d looks like a signal\n", i)); 381 target_delay = &tv_signal; 382 ++n_signal; 383 } else { 384 TT_BLATHER(("%d looks like a broadcast\n", i)); 385 target_delay = &tv_broadcast; 386 ++n_broadcast; 387 } 388 } else { 389 TT_FAIL(("Thread %d never got woken", i)); 390 continue; 391 } 392 evutil_timeradd(target_delay, &launched_at, &target_time); 393 test_timeval_diff_leq(&target_time, &alerted[i].alerted_at, 394 0, 50); 395 } 396 tt_int_op(n_broadcast + n_signal + n_timed_out, ==, NUM_THREADS); 397 tt_int_op(n_signal, ==, 1); 398 399end: 400 ; 401} 402 403#define CB_COUNT 128 404#define QUEUE_THREAD_COUNT 8 405 406#ifdef WIN32 407#define SLEEP_MS(ms) Sleep(ms) 408#else 409#define SLEEP_MS(ms) usleep((ms) * 1000) 410#endif 411 412struct deferred_test_data { 413 struct deferred_cb cbs[CB_COUNT]; 414 struct deferred_cb_queue *queue; 415}; 416 417static time_t timer_start = 0; 418static time_t timer_end = 0; 419static unsigned callback_count = 0; 420static THREAD_T load_threads[QUEUE_THREAD_COUNT]; 421static struct deferred_test_data deferred_data[QUEUE_THREAD_COUNT]; 422 423static void 424deferred_callback(struct deferred_cb *cb, void *arg) 425{ 426 SLEEP_MS(1); 427 callback_count += 1; 428} 429 430static THREAD_FN 431load_deferred_queue(void *arg) 432{ 433 struct deferred_test_data *data = arg; 434 size_t i; 435 436 for (i = 0; i < CB_COUNT; ++i) { 437 event_deferred_cb_init(&data->cbs[i], deferred_callback, NULL); 438 event_deferred_cb_schedule(data->queue, &data->cbs[i]); 439 SLEEP_MS(1); 440 } 441 442 THREAD_RETURN(); 443} 444 445static void 446timer_callback(evutil_socket_t fd, short what, void *arg) 447{ 448 timer_end = time(NULL); 449} 450 451static void 452start_threads_callback(evutil_socket_t fd, short what, void *arg) 453{ 454 int i; 455 456 for (i = 0; i < QUEUE_THREAD_COUNT; ++i) { 457 THREAD_START(load_threads[i], load_deferred_queue, 458 &deferred_data[i]); 459 } 460} 461 462static void 463thread_deferred_cb_skew(void *arg) 464{ 465 struct basic_test_data *data = arg; 466 struct timeval tv_timer = {4, 0}; 467 struct deferred_cb_queue *queue; 468 time_t elapsed; 469 int i; 470 471 queue = event_base_get_deferred_cb_queue(data->base); 472 tt_assert(queue); 473 474 for (i = 0; i < QUEUE_THREAD_COUNT; ++i) 475 deferred_data[i].queue = queue; 476 477 timer_start = time(NULL); 478 event_base_once(data->base, -1, EV_TIMEOUT, timer_callback, NULL, 479 &tv_timer); 480 event_base_once(data->base, -1, EV_TIMEOUT, start_threads_callback, 481 NULL, NULL); 482 event_base_dispatch(data->base); 483 484 elapsed = timer_end - timer_start; 485 TT_BLATHER(("callback count, %u", callback_count)); 486 TT_BLATHER(("elapsed time, %u", (unsigned)elapsed)); 487 /* XXX be more intelligent here. just make sure skew is 488 * within 2 seconds for now. */ 489 tt_assert(elapsed >= 4 && elapsed <= 6); 490 491end: 492 for (i = 0; i < QUEUE_THREAD_COUNT; ++i) 493 THREAD_JOIN(load_threads[i]); 494} 495 496#define TEST(name) \ 497 { #name, thread_##name, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, \ 498 &basic_setup, NULL } 499 500struct testcase_t thread_testcases[] = { 501 { "basic", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, 502 &basic_setup, NULL }, 503#ifndef WIN32 504 { "forking", thread_basic, TT_FORK|TT_NEED_THREADS|TT_NEED_BASE, 505 &basic_setup, (char*)"forking" }, 506#endif 507 TEST(conditions_simple), 508 TEST(deferred_cb_skew), 509 END_OF_TESTCASES 510}; 511 512