1/* 2 * threadPoolCmd.c -- 3 * 4 * This file implements the Tcl thread pools. 5 * 6 * Copyright (c) 2002 by Zoran Vasiljevic. 7 * 8 * See the file "license.terms" for information on usage and redistribution 9 * of this file, and for a DISCLAIMER OF ALL WARRANTIES. 10 * 11 * RCS: @(#) $Id: threadPoolCmd.c,v 1.43 2010/03/31 08:50:24 vasiljevic Exp $ 12 * ---------------------------------------------------------------------------- 13 */ 14 15#include "tclThread.h" 16 17/* 18 * Structure to maintain idle poster threads 19 */ 20 21typedef struct TpoolWaiter { 22 Tcl_ThreadId threadId; /* Thread id of the current thread */ 23 struct TpoolWaiter *nextPtr; /* Next structure in the list */ 24 struct TpoolWaiter *prevPtr; /* Previous structure in the list */ 25} TpoolWaiter; 26 27/* 28 * Structure describing an instance of a thread pool. 29 */ 30 31typedef struct ThreadPool { 32 unsigned int jobId; /* Job counter */ 33 int idleTime; /* Time in secs a worker thread idles */ 34 int tearDown; /* Set to 1 to tear down the pool */ 35 int suspend; /* Set to 1 to suspend pool processing */ 36 char *initScript; /* Script to initialize worker thread */ 37 char *exitScript; /* Script to cleanup the worker */ 38 int minWorkers; /* Minimum number or worker threads */ 39 int maxWorkers; /* Maximum number of worker threads */ 40 int numWorkers; /* Current number of worker threads */ 41 int idleWorkers; /* Number of idle workers */ 42 int refCount; /* Reference counter for reserve/release */ 43 Tcl_Mutex mutex; /* Pool mutex */ 44 Tcl_Condition cond; /* Pool condition variable */ 45 Tcl_HashTable jobsDone; /* Stores processed job results */ 46 struct TpoolResult *workTail; /* Tail of the list with jobs pending*/ 47 struct TpoolResult *workHead; /* Head of the list with jobs pending*/ 48 struct TpoolWaiter *waitTail; /* Tail of the thread waiters list */ 49 struct TpoolWaiter *waitHead; /* Head of the thread waiters list */ 50 struct ThreadPool *nextPtr; /* Next structure in the threadpool list */ 51 struct ThreadPool *prevPtr; /* Previous structure in threadpool list */ 52} ThreadPool; 53 54#define TPOOL_HNDLPREFIX "tpool" /* Prefix to generate Tcl pool handles */ 55#define TPOOL_MINWORKERS 0 /* Default minimum # of worker threads */ 56#define TPOOL_MAXWORKERS 4 /* Default maximum # of worker threads */ 57#define TPOOL_IDLETIMER 0 /* Default worker thread idle timer */ 58 59/* 60 * Structure for passing evaluation results 61 */ 62 63typedef struct TpoolResult { 64 int detached; /* Result is to be ignored */ 65 unsigned int jobId; /* The job id of the current job */ 66 char *script; /* Script to evaluate in worker thread */ 67 int scriptLen; /* Length of the script */ 68 int retcode; /* Tcl return code of the current job */ 69 char *result; /* Tcl result of the current job */ 70 char *errorCode; /* On error: content of the errorCode */ 71 char *errorInfo; /* On error: content of the errorInfo */ 72 Tcl_ThreadId threadId; /* Originating thread id */ 73 ThreadPool *tpoolPtr; /* Current thread pool */ 74 struct TpoolResult *nextPtr; 75 struct TpoolResult *prevPtr; 76} TpoolResult; 77 78/* 79 * Private structure for each worker/poster thread. 80 */ 81 82typedef struct ThreadSpecificData { 83 int stop; /* Set stop event; exit from event loop */ 84 TpoolWaiter *waitPtr; /* Threads private idle structure */ 85} ThreadSpecificData; 86 87static Tcl_ThreadDataKey dataKey; 88 89/* 90 * This global list maintains thread pools. 91 */ 92 93static ThreadPool *tpoolList; 94static Tcl_Mutex listMutex; 95static Tcl_Mutex startMutex; 96 97/* 98 * Used to represent the empty result. 99 */ 100 101static char *threadEmptyResult = (char *)""; 102 103/* 104 * Functions implementing Tcl commands 105 */ 106 107static Tcl_ObjCmdProc TpoolCreateObjCmd; 108static Tcl_ObjCmdProc TpoolPostObjCmd; 109static Tcl_ObjCmdProc TpoolWaitObjCmd; 110static Tcl_ObjCmdProc TpoolCancelObjCmd; 111static Tcl_ObjCmdProc TpoolGetObjCmd; 112static Tcl_ObjCmdProc TpoolReserveObjCmd; 113static Tcl_ObjCmdProc TpoolReleaseObjCmd; 114static Tcl_ObjCmdProc TpoolSuspendObjCmd; 115static Tcl_ObjCmdProc TpoolResumeObjCmd; 116static Tcl_ObjCmdProc TpoolNamesObjCmd; 117 118/* 119 * Miscelaneous functions used within this file 120 */ 121 122static int 123CreateWorker _ANSI_ARGS_((Tcl_Interp *interp, ThreadPool *tpoolPtr)); 124 125static Tcl_ThreadCreateType 126TpoolWorker _ANSI_ARGS_((ClientData clientData)); 127 128static int 129RunStopEvent _ANSI_ARGS_((Tcl_Event *evPtr, int mask)); 130 131static void 132PushWork _ANSI_ARGS_((TpoolResult *rPtr, ThreadPool *tpoolPtr)); 133 134static TpoolResult* 135PopWork _ANSI_ARGS_((ThreadPool *tpoolPtr)); 136 137static void 138PushWaiter _ANSI_ARGS_((ThreadPool *tpoolPtr)); 139 140static TpoolWaiter* 141PopWaiter _ANSI_ARGS_((ThreadPool *tpoolPtr)); 142 143static void 144SignalWaiter _ANSI_ARGS_((ThreadPool *tpoolPtr)); 145 146static int 147TpoolEval _ANSI_ARGS_((Tcl_Interp *interp, char *script, int scriptLen, 148 TpoolResult *rPtr)); 149static void 150SetResult _ANSI_ARGS_((Tcl_Interp *interp, TpoolResult *rPtr)); 151 152static ThreadPool* 153GetTpool _ANSI_ARGS_((const char *tpoolName)); 154 155static ThreadPool* 156GetTpoolUnl _ANSI_ARGS_((const char *tpoolName)); 157 158static void 159ThrExitHandler _ANSI_ARGS_((ClientData clientData)); 160 161static void 162AppExitHandler _ANSI_ARGS_((ClientData clientData)); 163 164static int 165TpoolReserve _ANSI_ARGS_((ThreadPool *tpoolPtr)); 166 167static int 168TpoolRelease _ANSI_ARGS_((ThreadPool *tpoolPtr)); 169 170static void 171TpoolSuspend _ANSI_ARGS_((ThreadPool *tpoolPtr)); 172 173static void 174TpoolResume _ANSI_ARGS_((ThreadPool *tpoolPtr)); 175 176static void 177InitWaiter _ANSI_ARGS_((void)); 178 179 180/* 181 *---------------------------------------------------------------------- 182 * 183 * TpoolCreateObjCmd -- 184 * 185 * This procedure is invoked to process the "tpool::create" Tcl 186 * command. See the user documentation for details on what it does. 187 * 188 * Results: 189 * A standard Tcl result. 190 * 191 * Side effects: 192 * None. 193 * 194 *---------------------------------------------------------------------- 195 */ 196 197static int 198TpoolCreateObjCmd(dummy, interp, objc, objv) 199 ClientData dummy; /* Not used. */ 200 Tcl_Interp *interp; /* Current interpreter. */ 201 int objc; /* Number of arguments. */ 202 Tcl_Obj *const objv[]; /* Argument objects. */ 203{ 204 int ii, minw, maxw, idle, len; 205 char buf[64], *exs = NULL, *cmd = NULL; 206 ThreadPool *tpoolPtr; 207 208 /* 209 * Syntax: tpool::create ?-minworkers count? 210 * ?-maxworkers count? 211 * ?-initcmd script? 212 * ?-exitcmd script? 213 * ?-idletime seconds? 214 */ 215 216 if (((objc-1) % 2)) { 217 goto usage; 218 } 219 220 minw = TPOOL_MINWORKERS; 221 maxw = TPOOL_MAXWORKERS; 222 idle = TPOOL_IDLETIMER; 223 224 /* 225 * Parse the optional arguments 226 */ 227 228 for (ii = 1; ii < objc; ii += 2) { 229 char *opt = Tcl_GetString(objv[ii]); 230 if (OPT_CMP(opt, "-minworkers")) { 231 if (Tcl_GetIntFromObj(interp, objv[ii+1], &minw) != TCL_OK) { 232 return TCL_ERROR; 233 } 234 } else if (OPT_CMP(opt, "-maxworkers")) { 235 if (Tcl_GetIntFromObj(interp, objv[ii+1], &maxw) != TCL_OK) { 236 return TCL_ERROR; 237 } 238 } else if (OPT_CMP(opt, "-idletime")) { 239 if (Tcl_GetIntFromObj(interp, objv[ii+1], &idle) != TCL_OK) { 240 return TCL_ERROR; 241 } 242 } else if (OPT_CMP(opt, "-initcmd")) { 243 const char *val = Tcl_GetStringFromObj(objv[ii+1], &len); 244 cmd = strcpy(Tcl_Alloc(len+1), val); 245 } else if (OPT_CMP(opt, "-exitcmd")) { 246 const char *val = Tcl_GetStringFromObj(objv[ii+1], &len); 247 exs = strcpy(Tcl_Alloc(len+1), val); 248 } else { 249 goto usage; 250 } 251 } 252 253 /* 254 * Do some consistency checking 255 */ 256 257 if (minw < 0) { 258 minw = 0; 259 } 260 if (maxw < 0) { 261 maxw = TPOOL_MAXWORKERS; 262 } 263 if (minw > maxw) { 264 maxw = minw; 265 } 266 267 /* 268 * Allocate and initialize thread pool structure 269 */ 270 271 tpoolPtr = (ThreadPool*)Tcl_Alloc(sizeof(ThreadPool)); 272 memset(tpoolPtr, 0, sizeof(ThreadPool)); 273 274 tpoolPtr->minWorkers = minw; 275 tpoolPtr->maxWorkers = maxw; 276 tpoolPtr->idleTime = idle; 277 tpoolPtr->initScript = cmd; 278 tpoolPtr->exitScript = exs; 279 Tcl_InitHashTable(&tpoolPtr->jobsDone, TCL_ONE_WORD_KEYS); 280 281 Tcl_MutexLock(&listMutex); 282 SpliceIn(tpoolPtr, tpoolList); 283 Tcl_MutexUnlock(&listMutex); 284 285 /* 286 * Start the required number of worker threads. 287 * On failure to start any of them, tear-down 288 * partially initialized pool. 289 */ 290 291 Tcl_MutexLock(&tpoolPtr->mutex); 292 for (ii = 0; ii < tpoolPtr->minWorkers; ii++) { 293 if (CreateWorker(interp, tpoolPtr) != TCL_OK) { 294 Tcl_MutexUnlock(&tpoolPtr->mutex); 295 Tcl_MutexLock(&listMutex); 296 TpoolRelease(tpoolPtr); 297 Tcl_MutexUnlock(&listMutex); 298 return TCL_ERROR; 299 } 300 } 301 Tcl_MutexUnlock(&tpoolPtr->mutex); 302 303 sprintf(buf, "%s%p", TPOOL_HNDLPREFIX, tpoolPtr); 304 Tcl_SetObjResult(interp, Tcl_NewStringObj(buf, -1)); 305 306 return TCL_OK; 307 308 usage: 309 Tcl_WrongNumArgs(interp, 1, objv, 310 "?-minworkers count? ?-maxworkers count? " 311 "?-initcmd script? ?-exitcmd script? " 312 "?-idletime seconds?"); 313 return TCL_ERROR; 314} 315 316/* 317 *---------------------------------------------------------------------- 318 * 319 * TpoolPostObjCmd -- 320 * 321 * This procedure is invoked to process the "tpool::post" Tcl 322 * command. See the user documentation for details on what it does. 323 * 324 * Results: 325 * A standard Tcl result. 326 * 327 * Side effects: 328 * None. 329 * 330 *---------------------------------------------------------------------- 331 */ 332 333static int 334TpoolPostObjCmd(dummy, interp, objc, objv) 335 ClientData dummy; /* Not used. */ 336 Tcl_Interp *interp; /* Current interpreter. */ 337 int objc; /* Number of arguments. */ 338 Tcl_Obj *const objv[]; /* Argument objects. */ 339{ 340 unsigned int jobId = 0; 341 int ii, detached = 0, nowait = 0, len; 342 const char *tpoolName, *script; 343 TpoolResult *rPtr; 344 ThreadPool *tpoolPtr; 345 346 ThreadSpecificData *tsdPtr = TCL_TSD_INIT(&dataKey); 347 348 /* 349 * Syntax: tpool::post ?-detached? ?-nowait? tpoolId script 350 */ 351 352 if (objc < 3 || objc > 5) { 353 goto usage; 354 } 355 for (ii = 1; ii < objc; ii++) { 356 char *opt = Tcl_GetString(objv[ii]); 357 if (*opt != '-') { 358 break; 359 } else if (OPT_CMP(opt, "-detached")) { 360 detached = 1; 361 } else if (OPT_CMP(opt, "-nowait")) { 362 nowait = 1; 363 } else { 364 goto usage; 365 } 366 } 367 368 tpoolName = Tcl_GetString(objv[ii]); 369 script = Tcl_GetStringFromObj(objv[ii+1], &len); 370 tpoolPtr = GetTpool(tpoolName); 371 if (tpoolPtr == NULL) { 372 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 373 "\"", NULL); 374 return TCL_ERROR; 375 } 376 377 /* 378 * Initialize per-thread private data for this caller 379 */ 380 381 InitWaiter(); 382 383 /* 384 * See if any worker available to run the job. 385 */ 386 387 Tcl_MutexLock(&tpoolPtr->mutex); 388 if (nowait && tpoolPtr->numWorkers == 0) { 389 390 /* 391 * Do not wait for an idle thread but assure 392 * there is at least one worker started. 393 */ 394 395 PushWaiter(tpoolPtr); 396 if (CreateWorker(interp, tpoolPtr) != TCL_OK) { 397 Tcl_MutexUnlock(&tpoolPtr->mutex); 398 return TCL_ERROR; 399 } 400 /* Wait for worker to start and service the event loop */ 401 Tcl_MutexUnlock(&tpoolPtr->mutex); 402 tsdPtr->stop = -1; 403 while(tsdPtr->stop == -1) { 404 Tcl_DoOneEvent(TCL_ALL_EVENTS); 405 } 406 Tcl_MutexLock(&tpoolPtr->mutex); 407 } else { 408 409 /* 410 * If there are no idle worker threads, start some new 411 * unless we are already running max number of workers. 412 * In that case wait for the next one to become idle. 413 */ 414 415 while (tpoolPtr->idleWorkers == 0) { 416 PushWaiter(tpoolPtr); 417 if (tpoolPtr->numWorkers < tpoolPtr->maxWorkers) { 418 /* No more free workers; start new one */ 419 if (CreateWorker(interp, tpoolPtr) != TCL_OK) { 420 Tcl_MutexUnlock(&tpoolPtr->mutex); 421 return TCL_ERROR; 422 } 423 } 424 /* Wait for any idle worker and service the event loop */ 425 Tcl_MutexUnlock(&tpoolPtr->mutex); 426 tsdPtr->stop = -1; 427 while(tsdPtr->stop == -1) { 428 Tcl_DoOneEvent(TCL_ALL_EVENTS); 429 } 430 Tcl_MutexLock(&tpoolPtr->mutex); 431 } 432 } 433 434 /* 435 * Create new job ticket and put it on the list. 436 */ 437 438 rPtr = (TpoolResult*)Tcl_Alloc(sizeof(TpoolResult)); 439 memset(rPtr, 0, sizeof(TpoolResult)); 440 441 if (detached == 0) { 442 jobId = ++tpoolPtr->jobId; 443 rPtr->jobId = jobId; 444 } 445 446 rPtr->script = strcpy(Tcl_Alloc(len+1), script); 447 rPtr->scriptLen = len; 448 rPtr->detached = detached; 449 rPtr->threadId = Tcl_GetCurrentThread(); 450 451 PushWork(rPtr, tpoolPtr); 452 Tcl_ConditionNotify(&tpoolPtr->cond); 453 Tcl_MutexUnlock(&tpoolPtr->mutex); 454 455 if (detached == 0) { 456 Tcl_SetObjResult(interp, Tcl_NewIntObj(jobId)); 457 } 458 459 return TCL_OK; 460 461 usage: 462 Tcl_WrongNumArgs(interp, 1, objv, "?-detached? ?-nowait? tpoolId script"); 463 return TCL_ERROR; 464} 465 466/* 467 *---------------------------------------------------------------------- 468 * 469 * TpoolWaitObjCmd -- 470 * 471 * This procedure is invoked to process the "tpool::wait" Tcl 472 * command. See the user documentation for details on what it does. 473 * 474 * Results: 475 * A standard Tcl result. 476 * 477 * Side effects: 478 * None. 479 * 480 *---------------------------------------------------------------------- 481 */ 482static int 483TpoolWaitObjCmd(dummy, interp, objc, objv) 484 ClientData dummy; /* Not used. */ 485 Tcl_Interp *interp; /* Current interpreter. */ 486 int objc; /* Number of arguments. */ 487 Tcl_Obj *const objv[]; /* Argument objects. */ 488{ 489 int ii, done, wObjc; 490 unsigned int jobId; 491 char *tpoolName, *listVar = NULL; 492 Tcl_Obj *waitList, *doneList, **wObjv; 493 ThreadPool *tpoolPtr; 494 TpoolResult *rPtr; 495 Tcl_HashEntry *hPtr; 496 497 ThreadSpecificData *tsdPtr = TCL_TSD_INIT(&dataKey); 498 499 /* 500 * Syntax: tpool::wait tpoolId jobIdList ?listVar? 501 */ 502 503 if (objc < 3 || objc > 4) { 504 Tcl_WrongNumArgs(interp, 1, objv, "tpoolId jobIdList ?listVar"); 505 return TCL_ERROR; 506 } 507 if (objc == 4) { 508 listVar = Tcl_GetString(objv[3]); 509 } 510 if (Tcl_ListObjGetElements(interp, objv[2], &wObjc, &wObjv) != TCL_OK) { 511 return TCL_ERROR; 512 } 513 tpoolName = Tcl_GetString(objv[1]); 514 tpoolPtr = GetTpool(tpoolName); 515 if (tpoolPtr == NULL) { 516 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 517 "\"", NULL); 518 return TCL_ERROR; 519 } 520 521 InitWaiter(); 522 done = 0; /* Number of elements in the done list */ 523 doneList = Tcl_NewListObj(0, NULL); 524 525 Tcl_MutexLock(&tpoolPtr->mutex); 526 while (1) { 527 waitList = Tcl_NewListObj(0, NULL); 528 for (ii = 0; ii < wObjc; ii++) { 529 if (Tcl_GetIntFromObj(interp, wObjv[ii], (int *)&jobId) != TCL_OK) { 530 Tcl_MutexUnlock(&tpoolPtr->mutex); 531 return TCL_ERROR; 532 } 533 hPtr = Tcl_FindHashEntry(&tpoolPtr->jobsDone, (char*)jobId); 534 if (hPtr) { 535 rPtr = (TpoolResult*)Tcl_GetHashValue(hPtr); 536 } else { 537 for (rPtr = tpoolPtr->workHead; rPtr; rPtr = rPtr->nextPtr) { 538 if (rPtr->jobId == jobId) { 539 break; 540 } 541 } 542 if (rPtr == NULL) { 543 continue; /* Bogus job id; ignore */ 544 } 545 } 546 if (rPtr->detached) { 547 continue; /* A detached job */ 548 } 549 if (rPtr->result) { 550 done++; /* Job has been processed */ 551 Tcl_ListObjAppendElement(interp, doneList, wObjv[ii]); 552 } else if (listVar) { 553 Tcl_ListObjAppendElement(interp, waitList, wObjv[ii]); 554 } 555 } 556 if (done) { 557 break; 558 } 559 560 /* 561 * None of the jobs done, wait for completion 562 * of the next job and try again. 563 */ 564 565 Tcl_DecrRefCount(waitList); 566 PushWaiter(tpoolPtr); 567 568 Tcl_MutexUnlock(&tpoolPtr->mutex); 569 tsdPtr->stop = -1; 570 while (tsdPtr->stop == -1) { 571 Tcl_DoOneEvent(TCL_ALL_EVENTS); 572 } 573 Tcl_MutexLock(&tpoolPtr->mutex); 574 } 575 Tcl_MutexUnlock(&tpoolPtr->mutex); 576 577 if (listVar) { 578 Tcl_SetVar2Ex(interp, listVar, NULL, waitList, 0); 579 } 580 581 Tcl_SetObjResult(interp, doneList); 582 583 return TCL_OK; 584} 585 586/* 587 *---------------------------------------------------------------------- 588 * 589 * TpoolCancelObjCmd -- 590 * 591 * This procedure is invoked to process the "tpool::cancel" Tcl 592 * command. See the user documentation for details on what it does. 593 * 594 * Results: 595 * A standard Tcl result. 596 * 597 * Side effects: 598 * None. 599 * 600 *---------------------------------------------------------------------- 601 */ 602static int 603TpoolCancelObjCmd(dummy, interp, objc, objv) 604 ClientData dummy; /* Not used. */ 605 Tcl_Interp *interp; /* Current interpreter. */ 606 int objc; /* Number of arguments. */ 607 Tcl_Obj *const objv[]; /* Argument objects. */ 608{ 609 int ii, wObjc, jobId; 610 char *tpoolName, *listVar = NULL; 611 Tcl_Obj *doneList, *waitList, **wObjv; 612 ThreadPool *tpoolPtr; 613 TpoolResult *rPtr; 614 615 /* 616 * Syntax: tpool::wait tpoolId jobIdList ?listVar? 617 */ 618 619 if (objc < 3 || objc > 4) { 620 Tcl_WrongNumArgs(interp, 1, objv, "tpoolId jobIdList ?listVar"); 621 return TCL_ERROR; 622 } 623 if (objc == 4) { 624 listVar = Tcl_GetString(objv[3]); 625 } 626 if (Tcl_ListObjGetElements(interp, objv[2], &wObjc, &wObjv) != TCL_OK) { 627 return TCL_ERROR; 628 } 629 tpoolName = Tcl_GetString(objv[1]); 630 tpoolPtr = GetTpool(tpoolName); 631 if (tpoolPtr == NULL) { 632 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 633 "\"", NULL); 634 return TCL_ERROR; 635 } 636 637 InitWaiter(); 638 doneList = Tcl_NewListObj(0, NULL); 639 waitList = Tcl_NewListObj(0, NULL); 640 641 Tcl_MutexLock(&tpoolPtr->mutex); 642 for (ii = 0; ii < wObjc; ii++) { 643 if (Tcl_GetIntFromObj(interp, wObjv[ii], &jobId) != TCL_OK) { 644 return TCL_ERROR; 645 } 646 for (rPtr = tpoolPtr->workHead; rPtr; rPtr = rPtr->nextPtr) { 647 if (rPtr->jobId == (unsigned int)jobId) { 648 if (rPtr->prevPtr != NULL) { 649 rPtr->prevPtr->nextPtr = rPtr->nextPtr; 650 } else { 651 tpoolPtr->workHead = rPtr->nextPtr; 652 } 653 if (rPtr->nextPtr != NULL) { 654 rPtr->nextPtr->prevPtr = rPtr->prevPtr; 655 } else { 656 tpoolPtr->workTail = rPtr->prevPtr; 657 } 658 SetResult(NULL, rPtr); /* Just to free the result */ 659 Tcl_Free(rPtr->script); 660 Tcl_Free((char*)rPtr); 661 Tcl_ListObjAppendElement(interp, doneList, wObjv[ii]); 662 break; 663 } 664 } 665 if (rPtr == NULL && listVar) { 666 Tcl_ListObjAppendElement(interp, waitList, wObjv[ii]); 667 } 668 } 669 Tcl_MutexUnlock(&tpoolPtr->mutex); 670 671 if (listVar) { 672 Tcl_SetVar2Ex(interp, listVar, NULL, waitList, 0); 673 } 674 675 Tcl_SetObjResult(interp, doneList); 676 677 return TCL_OK; 678} 679 680/* 681 *---------------------------------------------------------------------- 682 * 683 * TpoolGetObjCmd -- 684 * 685 * This procedure is invoked to process the "tpool::get" Tcl 686 * command. See the user documentation for details on what it does. 687 * 688 * Results: 689 * A standard Tcl result. 690 * 691 * Side effects: 692 * None. 693 * 694 *---------------------------------------------------------------------- 695 */ 696static int 697TpoolGetObjCmd(dummy, interp, objc, objv) 698 ClientData dummy; /* Not used. */ 699 Tcl_Interp *interp; /* Current interpreter. */ 700 int objc; /* Number of arguments. */ 701 Tcl_Obj *const objv[]; /* Argument objects. */ 702{ 703 int ret, jobId; 704 char *tpoolName, *resVar = NULL; 705 ThreadPool *tpoolPtr; 706 TpoolResult *rPtr; 707 Tcl_HashEntry *hPtr; 708 709 /* 710 * Syntax: tpool::get tpoolId jobId ?result? 711 */ 712 713 if (objc < 3 || objc > 4) { 714 Tcl_WrongNumArgs(interp, 1, objv, "tpoolId jobId ?result?"); 715 return TCL_ERROR; 716 } 717 if (Tcl_GetIntFromObj(interp, objv[2], &jobId) != TCL_OK) { 718 return TCL_ERROR; 719 } 720 if (objc == 4) { 721 resVar = Tcl_GetString(objv[3]); 722 } 723 724 /* 725 * Locate the threadpool 726 */ 727 728 tpoolName = Tcl_GetString(objv[1]); 729 tpoolPtr = GetTpool(tpoolName); 730 if (tpoolPtr == NULL) { 731 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 732 "\"", NULL); 733 return TCL_ERROR; 734 } 735 736 /* 737 * Locate the job in question. It is an error to 738 * do a "get" on bogus job handle or on the job 739 * which did not complete yet. 740 */ 741 742 Tcl_MutexLock(&tpoolPtr->mutex); 743 hPtr = Tcl_FindHashEntry(&tpoolPtr->jobsDone, (char*)jobId); 744 if (hPtr == NULL) { 745 Tcl_MutexUnlock(&tpoolPtr->mutex); 746 Tcl_AppendResult(interp, "no such job", NULL); 747 return TCL_ERROR; 748 } 749 rPtr = (TpoolResult*)Tcl_GetHashValue(hPtr); 750 if (rPtr->result == NULL) { 751 Tcl_MutexUnlock(&tpoolPtr->mutex); 752 Tcl_AppendResult(interp, "job not completed", NULL); 753 return TCL_ERROR; 754 } 755 756 Tcl_DeleteHashEntry(hPtr); 757 Tcl_MutexUnlock(&tpoolPtr->mutex); 758 759 ret = rPtr->retcode; 760 SetResult(interp, rPtr); 761 Tcl_Free((char*)rPtr); 762 763 if (resVar) { 764 Tcl_SetVar2Ex(interp, resVar, NULL, Tcl_GetObjResult(interp), 0); 765 Tcl_SetObjResult(interp, Tcl_NewIntObj(ret)); 766 ret = TCL_OK; 767 } 768 769 return ret; 770} 771 772/* 773 *---------------------------------------------------------------------- 774 * 775 * TpoolReserveObjCmd -- 776 * 777 * This procedure is invoked to process the "tpool::preserve" Tcl 778 * command. See the user documentation for details on what it does. 779 * 780 * Results: 781 * A standard Tcl result. 782 * 783 * Side effects: 784 * None. 785 * 786 *---------------------------------------------------------------------- 787 */ 788 789static int 790TpoolReserveObjCmd(dummy, interp, objc, objv) 791 ClientData dummy; /* Not used. */ 792 Tcl_Interp *interp; /* Current interpreter. */ 793 int objc; /* Number of arguments. */ 794 Tcl_Obj *const objv[]; /* Argument objects. */ 795{ 796 int ret; 797 char *tpoolName; 798 ThreadPool *tpoolPtr; 799 800 /* 801 * Syntax: tpool::preserve tpoolId 802 */ 803 804 if (objc != 2) { 805 Tcl_WrongNumArgs(interp, 1, objv, "tpoolId"); 806 return TCL_ERROR; 807 } 808 809 tpoolName = Tcl_GetString(objv[1]); 810 811 Tcl_MutexLock(&listMutex); 812 tpoolPtr = GetTpoolUnl(tpoolName); 813 if (tpoolPtr == NULL) { 814 Tcl_MutexUnlock(&listMutex); 815 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 816 "\"", NULL); 817 return TCL_ERROR; 818 } 819 820 ret = TpoolReserve(tpoolPtr); 821 Tcl_MutexUnlock(&listMutex); 822 Tcl_SetObjResult(interp, Tcl_NewIntObj(ret)); 823 824 return TCL_OK; 825} 826 827/* 828 *---------------------------------------------------------------------- 829 * 830 * TpoolReleaseObjCmd -- 831 * 832 * This procedure is invoked to process the "tpool::release" Tcl 833 * command. See the user documentation for details on what it does. 834 * 835 * Results: 836 * A standard Tcl result. 837 * 838 * Side effects: 839 * None. 840 * 841 *---------------------------------------------------------------------- 842 */ 843 844static int 845TpoolReleaseObjCmd(dummy, interp, objc, objv) 846 ClientData dummy; /* Not used. */ 847 Tcl_Interp *interp; /* Current interpreter. */ 848 int objc; /* Number of arguments. */ 849 Tcl_Obj *const objv[]; /* Argument objects. */ 850{ 851 int ret; 852 char *tpoolName; 853 ThreadPool *tpoolPtr; 854 855 /* 856 * Syntax: tpool::release tpoolId 857 */ 858 859 if (objc != 2) { 860 Tcl_WrongNumArgs(interp, 1, objv, "tpoolId"); 861 return TCL_ERROR; 862 } 863 864 tpoolName = Tcl_GetString(objv[1]); 865 866 Tcl_MutexLock(&listMutex); 867 tpoolPtr = GetTpoolUnl(tpoolName); 868 if (tpoolPtr == NULL) { 869 Tcl_MutexUnlock(&listMutex); 870 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 871 "\"", NULL); 872 return TCL_ERROR; 873 } 874 875 ret = TpoolRelease(tpoolPtr); 876 Tcl_MutexUnlock(&listMutex); 877 Tcl_SetObjResult(interp, Tcl_NewIntObj(ret)); 878 879 return TCL_OK; 880} 881 882/* 883 *---------------------------------------------------------------------- 884 * 885 * TpoolSuspendObjCmd -- 886 * 887 * This procedure is invoked to process the "tpool::suspend" Tcl 888 * command. See the user documentation for details on what it does. 889 * 890 * Results: 891 * A standard Tcl result. 892 * 893 * Side effects: 894 * None. 895 * 896 *---------------------------------------------------------------------- 897 */ 898 899static int 900TpoolSuspendObjCmd(dummy, interp, objc, objv) 901 ClientData dummy; /* Not used. */ 902 Tcl_Interp *interp; /* Current interpreter. */ 903 int objc; /* Number of arguments. */ 904 Tcl_Obj *const objv[]; /* Argument objects. */ 905{ 906 char *tpoolName; 907 ThreadPool *tpoolPtr; 908 909 /* 910 * Syntax: tpool::suspend tpoolId 911 */ 912 913 if (objc != 2) { 914 Tcl_WrongNumArgs(interp, 1, objv, "tpoolId"); 915 return TCL_ERROR; 916 } 917 918 tpoolName = Tcl_GetString(objv[1]); 919 tpoolPtr = GetTpool(tpoolName); 920 921 if (tpoolPtr == NULL) { 922 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 923 "\"", NULL); 924 return TCL_ERROR; 925 } 926 927 TpoolSuspend(tpoolPtr); 928 929 return TCL_OK; 930} 931 932/* 933 *---------------------------------------------------------------------- 934 * 935 * TpoolResumeObjCmd -- 936 * 937 * This procedure is invoked to process the "tpool::resume" Tcl 938 * command. See the user documentation for details on what it does. 939 * 940 * Results: 941 * A standard Tcl result. 942 * 943 * Side effects: 944 * None. 945 * 946 *---------------------------------------------------------------------- 947 */ 948 949static int 950TpoolResumeObjCmd(dummy, interp, objc, objv) 951 ClientData dummy; /* Not used. */ 952 Tcl_Interp *interp; /* Current interpreter. */ 953 int objc; /* Number of arguments. */ 954 Tcl_Obj *const objv[]; /* Argument objects. */ 955{ 956 char *tpoolName; 957 ThreadPool *tpoolPtr; 958 959 /* 960 * Syntax: tpool::resume tpoolId 961 */ 962 963 if (objc != 2) { 964 Tcl_WrongNumArgs(interp, 1, objv, "tpoolId"); 965 return TCL_ERROR; 966 } 967 968 tpoolName = Tcl_GetString(objv[1]); 969 tpoolPtr = GetTpool(tpoolName); 970 971 if (tpoolPtr == NULL) { 972 Tcl_AppendResult(interp, "can not find threadpool \"", tpoolName, 973 "\"", NULL); 974 return TCL_ERROR; 975 } 976 977 TpoolResume(tpoolPtr); 978 979 return TCL_OK; 980} 981 982/* 983 *---------------------------------------------------------------------- 984 * 985 * TpoolNamesObjCmd -- 986 * 987 * This procedure is invoked to process the "tpool::names" Tcl 988 * command. See the user documentation for details on what it does. 989 * 990 * Results: 991 * A standard Tcl result. 992 * 993 * Side effects: 994 * None. 995 * 996 *---------------------------------------------------------------------- 997 */ 998 999static int 1000TpoolNamesObjCmd(dummy, interp, objc, objv) 1001 ClientData dummy; /* Not used. */ 1002 Tcl_Interp *interp; /* Current interpreter. */ 1003 int objc; /* Number of arguments. */ 1004 Tcl_Obj *const objv[]; /* Argument objects. */ 1005{ 1006 ThreadPool *tpoolPtr; 1007 Tcl_Obj *listObj = Tcl_NewListObj(0, NULL); 1008 1009 Tcl_MutexLock(&listMutex); 1010 for (tpoolPtr = tpoolList; tpoolPtr; tpoolPtr = tpoolPtr->nextPtr) { 1011 char buf[32]; 1012 sprintf(buf, "%s%p", TPOOL_HNDLPREFIX, tpoolPtr); 1013 Tcl_ListObjAppendElement(interp, listObj, Tcl_NewStringObj(buf,-1)); 1014 } 1015 Tcl_MutexUnlock(&listMutex); 1016 Tcl_SetObjResult(interp, listObj); 1017 1018 return TCL_OK; 1019} 1020 1021/* 1022 *---------------------------------------------------------------------- 1023 * 1024 * CreateWorker -- 1025 * 1026 * Creates new worker thread for the given pool. Assumes the caller 1027 * holds the pool mutex. 1028 * 1029 * Results: 1030 * None. 1031 * 1032 * Side effects: 1033 * Informs waiter thread (if any) about the new worker thread. 1034 * 1035 *---------------------------------------------------------------------- 1036 */ 1037static int 1038CreateWorker(interp, tpoolPtr) 1039 Tcl_Interp *interp; 1040 ThreadPool *tpoolPtr; 1041{ 1042 Tcl_ThreadId id; 1043 TpoolResult result; 1044 1045 /* 1046 * Initialize the result structure to be 1047 * passed to the new thread. This is used 1048 * as communication to and from the thread. 1049 */ 1050 1051 memset(&result, 0, sizeof(TpoolResult)); 1052 result.retcode = -1; 1053 result.tpoolPtr = tpoolPtr; 1054 1055 /* 1056 * Create new worker thread here. Wait for the thread to start 1057 * because it's using the ThreadResult arg which is on our stack. 1058 */ 1059 1060 Tcl_MutexLock(&startMutex); 1061 if (Tcl_CreateThread(&id, TpoolWorker, (ClientData)&result, 1062 TCL_THREAD_STACK_DEFAULT, 0) != TCL_OK) { 1063 Tcl_SetResult(interp, "can't create a new thread", TCL_STATIC); 1064 Tcl_MutexUnlock(&startMutex); 1065 return TCL_ERROR; 1066 } 1067 while(result.retcode == -1) { 1068 Tcl_ConditionWait(&tpoolPtr->cond, &startMutex, NULL); 1069 } 1070 Tcl_MutexUnlock(&startMutex); 1071 1072 /* 1073 * Set error-related information if the thread 1074 * failed to initialize correctly. 1075 */ 1076 1077 if (result.retcode == 1) { 1078 result.retcode = TCL_ERROR; 1079 SetResult(interp, &result); 1080 return TCL_ERROR; 1081 } 1082 1083 return TCL_OK; 1084} 1085 1086/* 1087 *---------------------------------------------------------------------- 1088 * 1089 * TpoolWorker -- 1090 * 1091 * This is the main function of each of the threads in the pool. 1092 * 1093 * Results: 1094 * None. 1095 * 1096 * Side effects: 1097 * None. 1098 * 1099 *---------------------------------------------------------------------- 1100 */ 1101 1102static Tcl_ThreadCreateType 1103TpoolWorker(clientData) 1104 ClientData clientData; 1105{ 1106 TpoolResult *rPtr = (TpoolResult*)clientData; 1107 ThreadPool *tpoolPtr = rPtr->tpoolPtr; 1108 1109 int tout = 0; 1110 Tcl_Interp *interp; 1111 Tcl_Time waitTime, *idlePtr; 1112 char *errMsg; 1113 1114 Tcl_MutexLock(&startMutex); 1115 1116 /* 1117 * Initialize the Tcl interpreter 1118 */ 1119 1120#ifdef NS_AOLSERVER 1121 interp = (Tcl_Interp*)Ns_TclAllocateInterp(NULL); 1122 rPtr->retcode = 0; 1123#else 1124 interp = Tcl_CreateInterp(); 1125 if (Tcl_Init(interp) != TCL_OK) { 1126 rPtr->retcode = 1; 1127 } else if (Thread_Init(interp) != TCL_OK) { 1128 rPtr->retcode = 1; 1129 } else { 1130 rPtr->retcode = 0; 1131 } 1132#endif 1133 1134 if (rPtr->retcode == 1) { 1135 errMsg = (char*)Tcl_GetStringResult(interp); 1136 rPtr->result = strcpy(Tcl_Alloc(strlen(errMsg)+1), errMsg); 1137 Tcl_ConditionNotify(&tpoolPtr->cond); 1138 Tcl_MutexUnlock(&startMutex); 1139 goto out; 1140 } 1141 1142 /* 1143 * Initialize the interpreter 1144 */ 1145 1146 if (tpoolPtr->initScript) { 1147 TpoolEval(interp, tpoolPtr->initScript, -1, rPtr); 1148 if (rPtr->retcode != TCL_OK) { 1149 rPtr->retcode = 1; 1150 errMsg = (char*)Tcl_GetStringResult(interp); 1151 rPtr->result = strcpy(Tcl_Alloc(strlen(errMsg)+1), errMsg); 1152 Tcl_ConditionNotify(&tpoolPtr->cond); 1153 Tcl_MutexUnlock(&startMutex); 1154 goto out; 1155 } 1156 } 1157 1158 /* 1159 * Setup idle timer 1160 */ 1161 1162 if (tpoolPtr->idleTime == 0) { 1163 idlePtr = NULL; 1164 } else { 1165 waitTime.sec = tpoolPtr->idleTime; 1166 waitTime.usec = 0; 1167 idlePtr = &waitTime; 1168 } 1169 1170 /* 1171 * Tell caller we've started 1172 */ 1173 1174 tpoolPtr->numWorkers++; 1175 Tcl_ConditionNotify(&tpoolPtr->cond); 1176 Tcl_MutexUnlock(&startMutex); 1177 1178 /* 1179 * Wait for jobs to arrive. Note the handcrafted time test. 1180 * Tcl API misses the return value of the Tcl_ConditionWait(). 1181 * Hence, we do not know why the call returned. Was it someone 1182 * signalled the variable or has the idle timer expired? 1183 */ 1184 1185 Tcl_MutexLock(&tpoolPtr->mutex); 1186 while (!tpoolPtr->tearDown) { 1187 SignalWaiter(tpoolPtr); 1188 tpoolPtr->idleWorkers++; 1189 rPtr = NULL; 1190 tout = 0; 1191 while (tpoolPtr->suspend 1192 || (!tpoolPtr->tearDown && !tout 1193 && (rPtr = PopWork(tpoolPtr)) == NULL)) { 1194 if (tpoolPtr->suspend && rPtr == NULL) { 1195 Tcl_ConditionWait(&tpoolPtr->cond, &tpoolPtr->mutex, NULL); 1196 } else if (rPtr == NULL) { 1197 Tcl_Time t1, t2; 1198 Tcl_GetTime(&t1); 1199 Tcl_ConditionWait(&tpoolPtr->cond, &tpoolPtr->mutex, idlePtr); 1200 Tcl_GetTime(&t2); 1201 if (tpoolPtr->idleTime > 0) { 1202 tout = (t2.sec - t1.sec) >= tpoolPtr->idleTime; 1203 } 1204 } 1205 } 1206 tpoolPtr->idleWorkers--; 1207 if (rPtr == NULL) { 1208 if (tpoolPtr->numWorkers > tpoolPtr->minWorkers) { 1209 break; /* Enough workers, can safely kill this one */ 1210 } else { 1211 continue; /* Worker count at min, leave this one alive */ 1212 } 1213 } else if (tpoolPtr->tearDown) { 1214 PushWork(rPtr, tpoolPtr); 1215 break; /* Kill worker because pool is going down */ 1216 } 1217 Tcl_MutexUnlock(&tpoolPtr->mutex); 1218 TpoolEval(interp, rPtr->script, rPtr->scriptLen, rPtr); 1219 Tcl_MutexLock(&tpoolPtr->mutex); 1220 Tcl_Free(rPtr->script); 1221 if (!rPtr->detached) { 1222 int new; 1223 Tcl_SetHashValue(Tcl_CreateHashEntry(&tpoolPtr->jobsDone, 1224 (char*)rPtr->jobId, &new), 1225 (ClientData)rPtr); 1226 } else { 1227 Tcl_Free((char*)rPtr); 1228 } 1229 } 1230 1231 /* 1232 * Tear down the worker 1233 */ 1234 1235 if (tpoolPtr->exitScript) { 1236 TpoolEval(interp, tpoolPtr->exitScript, -1, NULL); 1237 } 1238 1239 tpoolPtr->numWorkers--; 1240 SignalWaiter(tpoolPtr); 1241 Tcl_MutexUnlock(&tpoolPtr->mutex); 1242 1243 out: 1244 1245#ifdef NS_AOLSERVER 1246 Ns_TclMarkForDelete(interp); 1247 Ns_TclDeAllocateInterp(interp); 1248#else 1249 Tcl_DeleteInterp(interp); 1250#endif 1251 Tcl_ExitThread(0); 1252 1253 TCL_THREAD_CREATE_RETURN; 1254} 1255 1256/* 1257 *---------------------------------------------------------------------- 1258 * 1259 * RunStopEvent -- 1260 * 1261 * Signalizes the waiter thread to stop waiting. 1262 * 1263 * Results: 1264 * 1 (always) 1265 * 1266 * Side effects: 1267 * None. 1268 * 1269 *---------------------------------------------------------------------- 1270 */ 1271static int 1272RunStopEvent(eventPtr, mask) 1273 Tcl_Event *eventPtr; 1274 int mask; 1275{ 1276 ThreadSpecificData *tsdPtr = TCL_TSD_INIT(&dataKey); 1277 1278 tsdPtr->stop = 1; 1279 return 1; 1280} 1281 1282/* 1283 *---------------------------------------------------------------------- 1284 * 1285 * PushWork -- 1286 * 1287 * Adds a worker thread to the end of the workers list. 1288 * 1289 * Results: 1290 * None. 1291 * 1292 * Side effects: 1293 * None. 1294 * 1295 *---------------------------------------------------------------------- 1296 */ 1297 1298static void 1299PushWork(rPtr, tpoolPtr) 1300 TpoolResult *rPtr; 1301 ThreadPool *tpoolPtr; 1302{ 1303 SpliceIn(rPtr, tpoolPtr->workHead); 1304 if (tpoolPtr->workTail == NULL) { 1305 tpoolPtr->workTail = rPtr; 1306 } 1307} 1308 1309/* 1310 *---------------------------------------------------------------------- 1311 * 1312 * PopWork -- 1313 * 1314 * Pops the work ticket from the list 1315 * 1316 * Results: 1317 * None. 1318 * 1319 * Side effects: 1320 * None. 1321 * 1322 *---------------------------------------------------------------------- 1323 */ 1324 1325static TpoolResult * 1326PopWork(tpoolPtr) 1327 ThreadPool *tpoolPtr; 1328{ 1329 TpoolResult *rPtr = tpoolPtr->workTail; 1330 1331 if (rPtr == NULL) { 1332 return NULL; 1333 } 1334 1335 tpoolPtr->workTail = rPtr->prevPtr; 1336 SpliceOut(rPtr, tpoolPtr->workHead); 1337 1338 rPtr->nextPtr = rPtr->prevPtr = NULL; 1339 1340 return rPtr; 1341} 1342 1343/* 1344 *---------------------------------------------------------------------- 1345 * 1346 * PushWaiter -- 1347 * 1348 * Adds a waiter thread to the end of the waiters list. 1349 * 1350 * Results: 1351 * None. 1352 * 1353 * Side effects: 1354 * None. 1355 * 1356 *---------------------------------------------------------------------- 1357 */ 1358 1359static void 1360PushWaiter(tpoolPtr) 1361 ThreadPool *tpoolPtr; 1362{ 1363 ThreadSpecificData *tsdPtr = TCL_TSD_INIT(&dataKey); 1364 1365 SpliceIn(tsdPtr->waitPtr, tpoolPtr->waitHead); 1366 if (tpoolPtr->waitTail == NULL) { 1367 tpoolPtr->waitTail = tsdPtr->waitPtr; 1368 } 1369} 1370 1371/* 1372 *---------------------------------------------------------------------- 1373 * 1374 * PopWaiter -- 1375 * 1376 * Pops the first waiter from the head of the waiters list. 1377 * 1378 * Results: 1379 * None. 1380 * 1381 * Side effects: 1382 * None. 1383 * 1384 *---------------------------------------------------------------------- 1385 */ 1386 1387static TpoolWaiter* 1388PopWaiter(tpoolPtr) 1389 ThreadPool *tpoolPtr; 1390{ 1391 TpoolWaiter *waitPtr = tpoolPtr->waitTail; 1392 1393 if (waitPtr == NULL) { 1394 return NULL; 1395 } 1396 1397 tpoolPtr->waitTail = waitPtr->prevPtr; 1398 SpliceOut(waitPtr, tpoolPtr->waitHead); 1399 1400 waitPtr->prevPtr = waitPtr->nextPtr = NULL; 1401 1402 return waitPtr; 1403} 1404 1405/* 1406 *---------------------------------------------------------------------- 1407 * 1408 * GetTpool 1409 * 1410 * Parses the Tcl threadpool handle and locates the 1411 * corresponding threadpool maintenance structure. 1412 * 1413 * Results: 1414 * Pointer to the threadpool struct or NULL if none found, 1415 * 1416 * Side effects: 1417 * None. 1418 * 1419 *---------------------------------------------------------------------- 1420 */ 1421static ThreadPool* 1422GetTpool(tpoolName) 1423 const char *tpoolName; 1424{ 1425 ThreadPool *tpoolPtr; 1426 1427 Tcl_MutexLock(&listMutex); 1428 tpoolPtr = GetTpoolUnl(tpoolName); 1429 Tcl_MutexUnlock(&listMutex); 1430 1431 return tpoolPtr; 1432} 1433 1434/* 1435 *---------------------------------------------------------------------- 1436 * 1437 * GetTpoolUnl 1438 * 1439 * Parses the threadpool handle and locates the 1440 * corresponding threadpool maintenance structure. 1441 * Assumes caller holds the listMutex, 1442 * 1443 * Results: 1444 * Pointer to the threadpool struct or NULL if none found, 1445 * 1446 * Side effects: 1447 * None. 1448 * 1449 *---------------------------------------------------------------------- 1450 */ 1451 1452static ThreadPool* 1453GetTpoolUnl (tpoolName) 1454 const char *tpoolName; 1455{ 1456 ThreadPool *tpool; 1457 ThreadPool *tpoolPtr = NULL; 1458 1459 if (sscanf(tpoolName, TPOOL_HNDLPREFIX"%p", &tpool) != 1) { 1460 return NULL; 1461 } 1462 for (tpoolPtr = tpoolList; tpoolPtr; tpoolPtr = tpoolPtr->nextPtr) { 1463 if (tpoolPtr == tpool) { 1464 break; 1465 } 1466 } 1467 1468 return tpoolPtr; 1469} 1470 1471/* 1472 *---------------------------------------------------------------------- 1473 * 1474 * TpoolEval 1475 * 1476 * Evaluates the script and fills in the result structure. 1477 * 1478 * Results: 1479 * Standard Tcl result, 1480 * 1481 * Side effects: 1482 * Many, depending on the script. 1483 * 1484 *---------------------------------------------------------------------- 1485 */ 1486static int 1487TpoolEval(interp, script, scriptLen, rPtr) 1488 Tcl_Interp *interp; 1489 char *script; 1490 int scriptLen; 1491 TpoolResult *rPtr; 1492{ 1493 int ret, reslen; 1494 char *result, *errorCode, *errorInfo; 1495 1496 ret = Tcl_EvalEx(interp, script, scriptLen, TCL_EVAL_GLOBAL); 1497 if (rPtr == NULL || rPtr->detached) { 1498 return ret; 1499 } 1500 rPtr->retcode = ret; 1501 if (ret == TCL_ERROR) { 1502 errorCode = (char*)Tcl_GetVar(interp, "errorCode", TCL_GLOBAL_ONLY); 1503 errorInfo = (char*)Tcl_GetVar(interp, "errorInfo", TCL_GLOBAL_ONLY); 1504 if (errorCode != NULL) { 1505 rPtr->errorCode = Tcl_Alloc(1 + strlen(errorCode)); 1506 strcpy(rPtr->errorCode, errorCode); 1507 } 1508 if (errorInfo != NULL) { 1509 rPtr->errorInfo = Tcl_Alloc(1 + strlen(errorInfo)); 1510 strcpy(rPtr->errorInfo, errorInfo); 1511 } 1512 } 1513 1514 result = (char*)Tcl_GetStringResult(interp); 1515 reslen = strlen(result); 1516 1517 if (reslen == 0) { 1518 rPtr->result = threadEmptyResult; 1519 } else { 1520 rPtr->result = strcpy(Tcl_Alloc(1 + reslen), result); 1521 } 1522 1523 return ret; 1524} 1525 1526/* 1527 *---------------------------------------------------------------------- 1528 * 1529 * SetResult 1530 * 1531 * Sets the result in current interpreter. 1532 * 1533 * Results: 1534 * Standard Tcl result, 1535 * 1536 * Side effects: 1537 * None. 1538 * 1539 *---------------------------------------------------------------------- 1540 */ 1541static void 1542SetResult(interp, rPtr) 1543 Tcl_Interp *interp; 1544 TpoolResult *rPtr; 1545{ 1546 if (rPtr->result) { 1547 if (rPtr->result == threadEmptyResult) { 1548 if (interp) { 1549 Tcl_ResetResult(interp); 1550 } 1551 } else { 1552 if (interp) { 1553 Tcl_SetObjResult(interp, Tcl_NewStringObj(rPtr->result,-1)); 1554 } 1555 Tcl_Free(rPtr->result); 1556 rPtr->result = NULL; 1557 } 1558 } 1559 if (rPtr->retcode == TCL_ERROR) { 1560 if (rPtr->errorCode) { 1561 if (interp) { 1562 Tcl_SetObjErrorCode(interp,Tcl_NewStringObj(rPtr->errorCode,-1)); 1563 } 1564 Tcl_Free(rPtr->errorCode); 1565 rPtr->errorCode = NULL; 1566 } 1567 if (rPtr->errorInfo) { 1568 if (interp) { 1569 Tcl_AddObjErrorInfo(interp, rPtr->errorInfo, -1); 1570 } 1571 Tcl_Free(rPtr->errorInfo); 1572 rPtr->errorInfo = NULL; 1573 } 1574 } 1575} 1576 1577/* 1578 *---------------------------------------------------------------------- 1579 * 1580 * TpoolReserve -- 1581 * 1582 * Does the pool preserve and/or release. Assumes caller holds 1583 * the listMutex. 1584 * 1585 * Results: 1586 * None. 1587 * 1588 * Side effects: 1589 * May tear-down the threadpool if refcount drops to 0 or below. 1590 * 1591 *---------------------------------------------------------------------- 1592 */ 1593static int 1594TpoolReserve(tpoolPtr) 1595 ThreadPool *tpoolPtr; 1596{ 1597 return ++tpoolPtr->refCount; 1598} 1599 1600/* 1601 *---------------------------------------------------------------------- 1602 * 1603 * TpoolRelease -- 1604 * 1605 * Does the pool preserve and/or release. Assumes caller holds 1606 * the listMutex. 1607 * 1608 * Results: 1609 * None. 1610 * 1611 * Side effects: 1612 * May tear-down the threadpool if refcount drops to 0 or below. 1613 * 1614 *---------------------------------------------------------------------- 1615 */ 1616static int 1617TpoolRelease(tpoolPtr) 1618 ThreadPool *tpoolPtr; 1619{ 1620 ThreadSpecificData *tsdPtr = TCL_TSD_INIT(&dataKey); 1621 TpoolResult *rPtr; 1622 Tcl_HashEntry *hPtr; 1623 Tcl_HashSearch search; 1624 1625 if (--tpoolPtr->refCount > 0) { 1626 return tpoolPtr->refCount; 1627 } 1628 1629 /* 1630 * Pool is going away; remove from the list of pools, 1631 */ 1632 1633 SpliceOut(tpoolPtr, tpoolList); 1634 InitWaiter(); 1635 1636 /* 1637 * Signal and wait for all workers to die. 1638 */ 1639 1640 tpoolPtr->tearDown = 1; 1641 Tcl_MutexLock(&tpoolPtr->mutex); 1642 while (tpoolPtr->numWorkers > 0) { 1643 PushWaiter(tpoolPtr); 1644 Tcl_ConditionNotify(&tpoolPtr->cond); 1645 Tcl_MutexUnlock(&tpoolPtr->mutex); 1646 tsdPtr->stop = -1; 1647 while(tsdPtr->stop == -1) { 1648 Tcl_DoOneEvent(TCL_ALL_EVENTS); 1649 } 1650 Tcl_MutexLock(&tpoolPtr->mutex); 1651 } 1652 Tcl_MutexUnlock(&tpoolPtr->mutex); 1653 1654 /* 1655 * Tear down the pool structure 1656 */ 1657 1658 if (tpoolPtr->initScript) { 1659 Tcl_Free(tpoolPtr->initScript); 1660 } 1661 if (tpoolPtr->exitScript) { 1662 Tcl_Free(tpoolPtr->exitScript); 1663 } 1664 1665 /* 1666 * Cleanup completed but not collected jobs 1667 */ 1668 1669 hPtr = Tcl_FirstHashEntry(&tpoolPtr->jobsDone, &search); 1670 while (hPtr != NULL) { 1671 rPtr = (TpoolResult*)Tcl_GetHashValue(hPtr); 1672 if (rPtr->result && rPtr->result != threadEmptyResult) { 1673 Tcl_Free(rPtr->result); 1674 } 1675 if (rPtr->retcode == TCL_ERROR) { 1676 if (rPtr->errorInfo) { 1677 Tcl_Free(rPtr->errorInfo); 1678 } 1679 if (rPtr->errorCode) { 1680 Tcl_Free(rPtr->errorCode); 1681 } 1682 } 1683 Tcl_Free((char*)rPtr); 1684 Tcl_DeleteHashEntry(hPtr); 1685 hPtr = Tcl_NextHashEntry(&search); 1686 } 1687 Tcl_DeleteHashTable(&tpoolPtr->jobsDone); 1688 1689 /* 1690 * Cleanup jobs posted but never completed. 1691 */ 1692 1693 for (rPtr = tpoolPtr->workHead; rPtr; rPtr = rPtr->nextPtr) { 1694 Tcl_Free(rPtr->script); 1695 Tcl_Free((char*)rPtr); 1696 } 1697 Tcl_MutexFinalize(&tpoolPtr->mutex); 1698 Tcl_ConditionFinalize(&tpoolPtr->cond); 1699 Tcl_Free((char*)tpoolPtr); 1700 1701 return 0; 1702} 1703 1704/* 1705 *---------------------------------------------------------------------- 1706 * 1707 * TpoolSuspend -- 1708 * 1709 * Marks the pool as suspended. This prevents pool workers to drain 1710 * the pool work queue. 1711 * 1712 * Results: 1713 * Value of the suspend flag (1 always). 1714 * 1715 * Side effects: 1716 * During the suspended state, pool worker threads wlll not timeout 1717 * even if the worker inactivity timer has been configured. 1718 * 1719 *---------------------------------------------------------------------- 1720 */ 1721static void 1722TpoolSuspend(tpoolPtr) 1723 ThreadPool *tpoolPtr; 1724{ 1725 Tcl_MutexLock(&tpoolPtr->mutex); 1726 tpoolPtr->suspend = 1; 1727 Tcl_MutexUnlock(&tpoolPtr->mutex); 1728} 1729 1730/* 1731 *---------------------------------------------------------------------- 1732 * 1733 * TpoolResume -- 1734 * 1735 * Clears the pool suspended state. This allows pool workers to drain 1736 * the pool work queue again. 1737 * 1738 * Results: 1739 * None. 1740 * 1741 * Side effects: 1742 * Pool workers may be started or awaken. 1743 * 1744 *---------------------------------------------------------------------- 1745 */ 1746static void 1747TpoolResume(tpoolPtr) 1748 ThreadPool *tpoolPtr; 1749{ 1750 Tcl_MutexLock(&tpoolPtr->mutex); 1751 tpoolPtr->suspend = 0; 1752 Tcl_ConditionNotify(&tpoolPtr->cond); 1753 Tcl_MutexUnlock(&tpoolPtr->mutex); 1754} 1755 1756/* 1757 *---------------------------------------------------------------------- 1758 * 1759 * SignalWaiter -- 1760 * 1761 * Signals the waiter thread. 1762 * 1763 * Results: 1764 * None. 1765 * 1766 * Side effects: 1767 * The waiter thread will exit from the event loop. 1768 * 1769 *---------------------------------------------------------------------- 1770 */ 1771static void 1772SignalWaiter(tpoolPtr) 1773 ThreadPool *tpoolPtr; 1774{ 1775 TpoolWaiter *waitPtr; 1776 Tcl_Event *evPtr; 1777 1778 waitPtr = PopWaiter(tpoolPtr); 1779 if (waitPtr == NULL) { 1780 return; 1781 } 1782 1783 evPtr = (Tcl_Event*)Tcl_Alloc(sizeof(Tcl_Event)); 1784 evPtr->proc = RunStopEvent; 1785 1786 Tcl_ThreadQueueEvent(waitPtr->threadId,(Tcl_Event*)evPtr,TCL_QUEUE_TAIL); 1787 Tcl_ThreadAlert(waitPtr->threadId); 1788} 1789 1790/* 1791 *---------------------------------------------------------------------- 1792 * 1793 * InitWaiter -- 1794 * 1795 * Setup poster thread to be able to wait in the event loop. 1796 * 1797 * Results: 1798 * None. 1799 * 1800 * Side effects: 1801 * None. 1802 * 1803 *---------------------------------------------------------------------- 1804 */ 1805static void 1806InitWaiter () 1807{ 1808 ThreadSpecificData *tsdPtr = TCL_TSD_INIT(&dataKey); 1809 1810 if (tsdPtr->waitPtr == NULL) { 1811 tsdPtr->waitPtr = (TpoolWaiter*)Tcl_Alloc(sizeof(TpoolWaiter)); 1812 tsdPtr->waitPtr->prevPtr = NULL; 1813 tsdPtr->waitPtr->nextPtr = NULL; 1814 tsdPtr->waitPtr->threadId = Tcl_GetCurrentThread(); 1815 Tcl_CreateThreadExitHandler(ThrExitHandler, (ClientData)tsdPtr); 1816 } 1817} 1818 1819/* 1820 *---------------------------------------------------------------------- 1821 * 1822 * ThrExitHandler -- 1823 * 1824 * Performs cleanup when a caller (poster) thread exits. 1825 * 1826 * Results: 1827 * None. 1828 * 1829 * Side effects: 1830 * None. 1831 * 1832 *---------------------------------------------------------------------- 1833 */ 1834static void 1835ThrExitHandler(clientData) 1836 ClientData clientData; 1837{ 1838 ThreadSpecificData *tsdPtr = (ThreadSpecificData *)clientData; 1839 1840 Tcl_Free((char*)tsdPtr->waitPtr); 1841} 1842 1843/* 1844 *---------------------------------------------------------------------- 1845 * 1846 * AppExitHandler 1847 * 1848 * Deletes all threadpools on application exit. 1849 * 1850 * Results: 1851 * None. 1852 * 1853 * Side effects: 1854 * None. 1855 * 1856 *---------------------------------------------------------------------- 1857 */ 1858static void 1859AppExitHandler(clientData) 1860 ClientData clientData; 1861{ 1862 ThreadPool *tpoolPtr; 1863 1864 Tcl_MutexLock(&listMutex); 1865 /* 1866 * Restart with head of list each time until empty. [Bug 1427570] 1867 */ 1868 for (tpoolPtr = tpoolList; tpoolPtr; tpoolPtr = tpoolList) { 1869 TpoolRelease(tpoolPtr); 1870 } 1871 Tcl_MutexUnlock(&listMutex); 1872} 1873 1874/* 1875 *---------------------------------------------------------------------- 1876 * 1877 * Tpool_Init -- 1878 * 1879 * Create commands in current interpreter. 1880 * 1881 * Results: 1882 * None. 1883 * 1884 * Side effects: 1885 * On first load, creates application exit handler to clean up 1886 * any threadpools left. 1887 * 1888 *---------------------------------------------------------------------- 1889 */ 1890 1891int 1892Tpool_Init (interp) 1893 Tcl_Interp *interp; /* Interp where to create cmds */ 1894{ 1895 static int initialized; 1896 1897 TCL_CMD(interp, TPOOL_CMD_PREFIX"create", TpoolCreateObjCmd); 1898 TCL_CMD(interp, TPOOL_CMD_PREFIX"names", TpoolNamesObjCmd); 1899 TCL_CMD(interp, TPOOL_CMD_PREFIX"post", TpoolPostObjCmd); 1900 TCL_CMD(interp, TPOOL_CMD_PREFIX"wait", TpoolWaitObjCmd); 1901 TCL_CMD(interp, TPOOL_CMD_PREFIX"cancel", TpoolCancelObjCmd); 1902 TCL_CMD(interp, TPOOL_CMD_PREFIX"get", TpoolGetObjCmd); 1903 TCL_CMD(interp, TPOOL_CMD_PREFIX"preserve", TpoolReserveObjCmd); 1904 TCL_CMD(interp, TPOOL_CMD_PREFIX"release", TpoolReleaseObjCmd); 1905 TCL_CMD(interp, TPOOL_CMD_PREFIX"suspend", TpoolSuspendObjCmd); 1906 TCL_CMD(interp, TPOOL_CMD_PREFIX"resume", TpoolResumeObjCmd); 1907 1908 if (initialized == 0) { 1909 Tcl_MutexLock(&listMutex); 1910 if (initialized == 0) { 1911 Tcl_CreateExitHandler(AppExitHandler, (ClientData)-1); 1912 initialized = 1; 1913 } 1914 Tcl_MutexUnlock(&listMutex); 1915 } 1916 return TCL_OK; 1917} 1918 1919/* EOF $RCSfile: threadPoolCmd.c,v $ */ 1920 1921/* Emacs Setup Variables */ 1922/* Local Variables: */ 1923/* mode: C */ 1924/* indent-tabs-mode: nil */ 1925/* c-basic-offset: 4 */ 1926/* End: */ 1927