#
# tpool.tcl --
#
#   Tcl implementation of a threadpool paradigm in pure Tcl using
#   the Tcl threading extension 2.5 (or higher).
#
#   This file is for example purposes only. The efficient C-level
#   threadpool implementation is already a part of the threading
#   extension starting with 2.5 version. Both implementations have
#   the same Tcl API so both can be used interchangeably. Goal of
#   this implementation is to serve as an example of using the Tcl
#   extension to implement some very common threading paradigms.
#
#   Beware: with time, as improvements are made to the C-level
#   implementation, this Tcl one might lag behind.
#   Please consider this code as a working example only.
#
#
#
# Copyright (c) 2002 by Zoran Vasiljevic.
#
# See the file "license.terms" for information on usage and
# redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES.
#
# -----------------------------------------------------------------------------
# RCS: @(#) $Id: tpool.tcl,v 1.8 2006/10/07 09:05:17 vasiljevic Exp $
#

package require Thread 2.5
set thisScript [info script]

namespace eval tpool {

    variable afterevent "" ; # Idle timer event for worker threads
    variable result        ; # Stores result from the worker thread
    variable waiter        ; # Waits for an idle worker thread
    variable jobsdone      ; # Accumulates results from worker threads

    #
    # Create shared array with a single element.
    # It is used for automatic pool handles creation.
    #
    # The counter is initialized only once. Every worker thread
    # sources this very script (see the default -initcmd set up in
    # tpool::create), so resetting the counter unconditionally here
    # would make the next tpool::create hand out an already used
    # pool handle.
    #

    variable ns [namespace current]
    tsv::lock $ns {
        if {[tsv::exists $ns count] == 0} {
            tsv::set $ns count -1
        }
    }
    variable thisScript [info script]

    #
    # Record every job result at the moment it is stored in the
    # "result" variable. Results may be delivered while the event
    # loop runs outside of tpool::wait (for example while tpool::post
    # is waiting for an idle worker); without this trace they would
    # be overwritten by the next result and lost.
    # The trace is removed first so that sourcing the script again
    # does not stack up duplicate traces.
    #

    trace remove variable ${ns}::result write ${ns}::Collect
    trace add    variable ${ns}::result write ${ns}::Collect
}

#
# tpool::Collect --
#
#   Private write-trace callback for the "result" variable.
#   Files the freshly arrived job result in the "jobsdone" array,
#   keyed by the job id found in the first element of the result.
#
# Arguments:
#   args   Standard variable trace arguments (ignored).
#
# Side Effects:
#   Updates the "jobsdone" array.
#
# Results:
#   None.
#

proc tpool::Collect {args} {

    variable result
    variable jobsdone

    # Never let a malformed value break the write being traced.
    catch {set jobsdone([lindex $result 0]) $result}

    return
}

#
# tpool::create --
#
#   Creates instance of a thread pool.
#
# Arguments:
#   args Variable number of key/value arguments, as follows:
#
#        -minworkers  minimum # of worker threads (def:0)
#        -maxworkers  maximum # of worker threads (def:4)
#        -idletime    # of sec worker is idle before exiting (def:0 = never)
#        -initcmd     script used to initialize new worker thread
#        -exitcmd     script run at worker thread exit
#
# Side Effects:
#   Might create many new threads if "-minworkers" option is > 0.
#
# Results:
#   The id of the newly created thread pool. This id must be used
#   in all other tpool::* commands. Raises an error (without creating
#   anything) if the arguments are malformed.
#

proc tpool::create {args} {

    variable thisScript

    # "info level 0" is the invocation of this very procedure, so the
    # message names the command regardless of who called it.
    set usage "wrong \# args: should be \"[lindex [info level 0] 0]\
               ?-minworkers count? ?-maxworkers count?\
               ?-initcmd script? ?-exitcmd script?\
               ?-idletime seconds?\""

    #
    # Validate user-supplied options before anything is allocated,
    # so a bad call does not leave a half-initialized pool behind.
    # Each accepted option is remembered as {tsv-operation key value}.
    #

    if {[llength $args] % 2} {
        error $usage
    }

    set settings {}
    foreach {arg val} $args {
        switch -- $arg {
            -minworkers -
            -maxworkers {lappend settings set $arg $val}
            -idletime   {lappend settings set $arg [expr {$val*1000}]}
            -initcmd    -
            -exitcmd    {lappend settings append $arg "\n$val"}
            default {
                error $usage
            }
        }
    }

    #
    # Get next threadpool handle and create the pool array.
    #

    set ns [namespace current]
    set tpid [namespace tail $ns][tsv::incr $ns count]

    tsv::lock $tpid {
        tsv::set $tpid name $tpid
    }

    #
    # Setup default pool data.
    #

    tsv::array set $tpid {
         thrworkers  ""
         thrwaiters  ""
         jobcounter  0
         refcounter  0
         numworkers  0
        -minworkers  0
        -maxworkers  4
        -idletime    0
        -initcmd    ""
        -exitcmd    ""
    }

    # Built with "list" so a script path containing spaces or other
    # special characters is still sourced correctly by the workers.
    tsv::set $tpid -initcmd [list source $thisScript]

    #
    # Override with user-supplied data
    #

    foreach {op key val} $settings {
        tsv::$op $tpid $key $val
    }

    #
    # Start initial (minimum) number of worker threads.
    #

    for {set ii 0} {$ii < [tsv::set $tpid -minworkers]} {incr ii} {
        Worker $tpid
    }

    return $tpid
}

#
# tpool::names --
#
#   Returns list of currently created threadpools
#
# Arguments:
#   None.
#
# Side Effects:
#   None.
#
# Results
#   List of active threadpool identifiers or empty if none found
#
#

proc tpool::names {} {
    tsv::names [namespace tail [namespace current]]*
}

#
# tpool::post --
#
#   Submits the new job to the thread pool. The caller might pass
#   the job in two modes: synchronous and asynchronous.
#   For the synchronous mode, the pool implementation will retain
#   the result of the passed script until the caller collects it
#   using the "tpool::get" command.
#   For the asynchronous mode, the result of the script is ignored.
#
# Arguments:
#   args   Variable # of arguments with the following syntax:
#          tpool::post ?-detached? tpid script
#
#          -detached  flag to turn the async operation (ignore result)
#          tpid       the id of the thread pool
#          script     script to pass to the worker thread for execution
#
# Side Effects:
#   Depends on the passed script. May enter the event loop while
#   waiting for a worker thread to become available.
#
# Results:
#   The id of the posted job. This id is used later on to collect
#   result of the job and set local variables accordingly.
#   For asynchronously posted jobs, the return result is ignored
#   and this function returns empty result.
#

proc tpool::post {args} {

    variable jobsdone

    #
    # Parse command arguments.
    #

    set ns [namespace current]
    set usage "wrong \# args: should be \"[lindex [info level 0] 0]\
               ?-detached? tpoolId script\""

    if {[llength $args] == 2} {
        set detached 0
        set tpid [lindex $args 0]
        set cmd  [lindex $args 1]
    } elseif {[llength $args] == 3} {
        if {[lindex $args 0] != "-detached"} {
            error $usage
        }
        set detached 1
        set tpid [lindex $args 1]
        set cmd  [lindex $args 2]
    } else {
        error $usage
    }

    #
    # Find idle (or create new) worker thread. This is relatively
    # a complex issue, since we must honour the limits about number
    # of allowed worker threads imposed to us by the caller.
    #

    set tid ""

    while {$tid == ""} {
        tsv::lock $tpid {
            set tid [tsv::lpop $tpid thrworkers]
            if {$tid == "" || [catch {thread::preserve $tid}]} {
                # No idle worker (or the one we got is gone already):
                # queue ourselves as a waiter and start a new worker
                # if the pool is still allowed to grow.
                set tid ""
                tsv::lpush $tpid thrwaiters [thread::id] end
                if {[tsv::set $tpid numworkers]<[tsv::set $tpid -maxworkers]} {
                    Worker $tpid
                }
            }
        }
        if {$tid == ""} {
            # A worker sets this variable when it becomes available.
            vwait ${ns}::waiter
        }
    }

    #
    # Post the command to the worker thread. Only jobs whose result
    # is going to be collected get a job id and a (still empty) slot
    # in the "jobsdone" array. The slot is created before the job is
    # sent so it exists for sure when the result arrives.
    #

    if {$detached} {
        set j ""
        thread::send -async $tid [list ${ns}::Run $tpid 0 $cmd]
    } else {
        set j [tsv::incr $tpid jobcounter]
        set jobsdone($j) ""
        thread::send -async $tid [list ${ns}::Run $tpid $j $cmd] ${ns}::result
    }

    return $j
}

#
# tpool::wait --
#
#   Waits for jobs sent with "tpool::post" to finish.
#
# Arguments:
#   tpid      Name of the pool shared array.
#   jobList   List of job id's to wait for.
#   jobLeft   Optional name of the variable in the caller's scope
#             which receives the list of jobs still pending.
#
# Side Effects:
#   Might eventually enter the event loop while waiting
#   for the job result to arrive from the worker thread.
#   It ignores bogus job ids.
#
# Results:
#   List of job id's from the jobList which have finished.
#

proc tpool::wait {tpid jobList {jobLeft ""}} {

    # Waits until at least one of the jobs in jobList has finished.
    #
    #   tpid      Name of the pool shared array (not used here).
    #   jobList   List of job ids to wait for.
    #   jobLeft   Optional name of a variable in the caller's scope;
    #             it receives the ids from jobList still pending.
    #
    # Returns the list of ids from jobList which have finished.
    # Ids without an entry in the "jobsdone" array are bogus and are
    # skipped: they are never waited for and never reported.

    variable result
    variable jobsdone

    if {$jobLeft != ""} {
        upvar $jobLeft jobleft
    }

    set retlist ""
    set jobleft ""

    #
    # Sort known jobs into finished and still pending ones.
    #

    foreach j $jobList {
        if {[info exists jobsdone($j)] == 0} {
            continue ; # Ignore (skip) bogus job ids
        }
        if {$jobsdone($j) != ""} {
            lappend retlist $j
        } else {
            lappend jobleft $j
        }
    }

    #
    # Nothing finished yet: wait for the first pending job to get
    # ready. We wait only if there is a valid pending job; waiting
    # for a list consisting solely of bogus ids would never return.
    #

    if {[llength $retlist] == 0 && [llength $jobleft] > 0} {
        while {1} {
            vwait [namespace current]::result
            set doneid [lindex $result 0]
            set jobsdone($doneid) $result
            # Results of jobs nobody asked for here are just recorded.
            set x [lsearch -exact $jobleft $doneid]
            if {$x >= 0} {
                lappend retlist $doneid
                set jobleft [lreplace $jobleft $x $x]
                break
            }
        }
    }

    return $retlist
}

#
# tpool::get --
#
#   Returns the result of a finished job sent with "tpool::post".
#
# Arguments:
#   tpid      Name of the pool shared array.
#   jobid     Id of the previously posted job.
#
# Side Effects:
#   None.
#
# Results:
#   Result of the job. If the job resulted in error, the error is
#   raised again with the errorInfo and errorCode of the job.
#   An error is raised as well if the job id is not known or if
#   the job has not finished yet (use tpool::wait first).
#

proc tpool::get {tpid jobid} {

    variable jobsdone

    if {[info exists jobsdone($jobid)] == 0} {
        error "no such job \"$jobid\""
    }

    set entry $jobsdone($jobid)
    if {$entry == ""} {
        # Posted, but the worker has not delivered the result yet.
        error "job \"$jobid\" has not completed yet"
    }

    #
    # Entry layout: jobid code result ?errorInfo errorCode?
    # The last two are present only for a non-zero completion code.
    #

    foreach {id code value einfo ecode} $entry break

    if {$code != 0} {
        error $value $einfo $ecode
    }

    return $value
}

#
# tpool::preserve --
#
#   Increments the reference counter of the threadpool, reserving it
#   for the private usage.
#
# Arguments:
#   tpid   Name of the pool shared array.
#
# Side Effects:
#   None.
#
# Results:
#   Current number of threadpool reservations.
#

proc tpool::preserve {tpid} {
    # Bumps the pool reference counter and returns its new value.
    tsv::incr $tpid refcounter
}

#
# tpool::release --
#
#   Decrements the reference counter of the threadpool, eventually
#   tearing the pool down if this was the last reservation.
#
# Arguments:
#   tpid   Name of the pool shared array.
#
# Side Effects:
#   If the number of reservations drops to zero or below
#   the threadpool is torn down.
#
# Results:
#   Current number of threadpool reservations.
#

proc tpool::release {tpid} {

    tsv::lock $tpid {
        set count [tsv::incr $tpid refcounter -1]
        if {$count <= 0} {
            # Release all idle worker threads. A worker listed here
            # may be gone already (tpool::post copes with that as
            # well), which must not abort the teardown half-way.
            foreach t [tsv::set $tpid thrworkers] {
                catch {thread::release -wait $t}
            }
            tsv::unset $tpid ; # This is not an error; it works!
        }
    }

    # Return the counter as documented, not the result of the lock.
    return $count
}

#
# Private procedures, not a part of the threadpool API.
#

#
# tpool::Worker --
#
#   Creates new worker thread. This procedure must be executed
#   under the tsv lock.
#
# Arguments:
#   tpid  Name of the pool shared array.
#
# Side Effects:
#   Depends on the thread initialization script.
#   Wakes up one thread waiting for a worker, if there is any.
#
# Results:
#   None. If the thread initialization script fails, the new thread
#   is released again and the error is passed to the caller.
#

proc tpool::Worker {tpid} {

    #
    # Create new worker thread and initialize it. Should the
    # initialization fail, get rid of the thread; it is not a part
    # of the pool yet, so nobody else would ever release it.
    #

    set tid [thread::create]

    if {[catch {thread::send $tid [tsv::set $tpid -initcmd]} err]} {
        set einfo $::errorInfo
        set ecode $::errorCode
        catch {thread::release $tid}
        error $err $einfo $ecode
    }
    thread::preserve $tid

    tsv::incr  $tpid numworkers
    tsv::lpush $tpid thrworkers $tid

    #
    # Signalize waiter threads if any
    #

    set waiter [tsv::lpop $tpid thrwaiters]
    if {$waiter != ""} {
        thread::send -async $waiter [list set [namespace current]::waiter 1]
    }

    return
}

#
# tpool::Timer --
#
#   This procedure should be executed within the worker thread only.
#   It registers the callback for terminating the idle thread.
#
# Arguments:
#   tpid  Name of the pool shared array.
#
# Side Effects:
#   Thread may eventually exit.
#
# Results:
#   None.
#

proc tpool::Timer {tpid} {

    # Idle-timeout handler of a worker thread. If the pool holds more
    # workers than its configured minimum and this thread is still on
    # the list of idle workers, the thread takes itself off the list,
    # runs the pool's exit script (errors are ignored) and drops its
    # own reservation, which may eventually terminate it.

    tsv::lock $tpid {
        set have [tsv::set $tpid numworkers]
        set keep [tsv::set $tpid -minworkers]
        if {$have > $keep} {
            set pos [tsv::lsearch $tpid thrworkers [thread::id]]
            if {$pos >= 0} {
                # Splice ourselves out and adjust the worker count
                # before going away.
                tsv::lreplace $tpid thrworkers $pos $pos
                tsv::incr $tpid numworkers -1
                set script [tsv::set $tpid -exitcmd]
                if {[string length $script] > 0} {
                    catch $script
                }
                thread::release
            }
        }
    }
}

#
# tpool::Run --
#
#   This procedure should be executed within the worker thread only.
#   It performs the actual command execution in the worker thread.
#
# Arguments:
#   tpid  Name of the pool shared array.
#   jid   The job id
#   cmd   The command to execute
#
# Side Effects:
#   Many, depending of the passed command
#
# Results:
#   List for passing the evaluation result and status back.
#

proc tpool::Run {tpid jid cmd} {

    variable afterevent

    #
    # A job has arrived, so this worker is not idle any more:
    # drop the pending idle timer, if one is registered.
    #

    if {[string length $afterevent] > 0} {
        after cancel $afterevent
    }

    #
    # Run the job at global level. The reply always starts with the
    # job id, the completion code and the result; error information
    # is attached only if the job did not complete normally.
    #

    set status [catch {uplevel \#0 $cmd} value]
    set reply [list $jid $status $value]
    if {$status != 0} {
        lappend reply $::errorInfo $::errorCode
    }

    #
    # Put this thread back on the list of idle workers and wake up
    # one caller waiting for a worker, if there is any.
    #

    set ns   [namespace current]
    set self [thread::id]

    tsv::lock $tpid {
        tsv::lpush $tpid thrworkers $self
        set pending [tsv::lpop $tpid thrwaiters]
        if {[string length $pending] > 0} {
            thread::send -async $pending [list set ${ns}::waiter 1]
        }
    }

    #
    # Give up the reservation taken by the poster of the job.
    # The idle timer is armed again only if the thread is still
    # reserved by somebody; otherwise it is about to exit anyway.
    #

    if {[thread::release] > 0} {
        set idle [tsv::set $tpid -idletime]
        if {$idle} {
            set afterevent [after $idle "${ns}::Timer $tpid"]
        }
    }

    return $reply
}

# EOF $RCSfile: tpool.tcl,v $

# Emacs Setup Variables
# Local Variables:
# mode: Tcl
# indent-tabs-mode: nil
# tcl-basic-offset: 4
# End:
