1#
2# tpool.tcl --
3#
4# Tcl implementation of a threadpool paradigm in pure Tcl using
5# the Tcl threading extension 2.5 (or higher).
6#
7# This file is for example purposes only. The efficient C-level
8# threadpool implementation is already a part of the threading
9# extension starting with 2.5 version. Both implementations have
10# the same Tcl API so both can be used interchangeably. Goal of
11# this implementation is to serve as an example of using the Tcl
12# extension to implement some very common threading paradigms.
13#
14# Beware: with time, as improvements are made to the C-level
15# implementation, this Tcl one might lag behind.
16# Please consider this code as a working example only.
17#
18#
19#
20# Copyright (c) 2002 by Zoran Vasiljevic.
21#
22# See the file "license.terms" for information on usage and
23# redistribution of this file, and for a DISCLAIMER OF ALL WARRANTIES.
24#
25# -----------------------------------------------------------------------------
26# RCS: @(#) $Id: tpool.tcl,v 1.8 2006/10/07 09:05:17 vasiljevic Exp $
27#
28
package require Thread 2.5
set thisScript [info script]

namespace eval tpool {

    variable afterevent "" ; # Idle timer event for worker threads
    variable result        ; # Stores result from the worker thread
    variable waiter        ; # Waits for an idle worker thread
    variable jobsdone      ; # Accumulates results from worker threads

    #
    # Create the shared array holding the pool handle counter.
    # It is used for automatic pool handles creation.
    #
    # The counter is initialized only if it does not exist yet. This
    # file is sourced again by every worker thread (it is the default
    # worker init command), so resetting the counter unconditionally
    # would make later tpool::create calls hand out handles that are
    # already in use.
    #
    # [namespace current] is used inline (no helper variable) because
    # a plain "set" inside "namespace eval" would overwrite a global
    # variable of the same name, if one exists.
    #

    tsv::lock [namespace current] {
        if {[tsv::exists [namespace current] count] == 0} {
            # First tsv::incr yields 0, so the first handle ends in "0".
            tsv::set [namespace current] count -1
        }
    }

    # Path of this script; workers source it to get the tpool procs.
    variable thisScript [info script]
}
53
#
# tpool::create --
#
#   Creates instance of a thread pool.
#
# Arguments:
#   args Variable number of key/value arguments, as follows:
#
#        -minworkers  minimum # of worker threads (def:0)
#        -maxworkers  maximum # of worker threads (def:4)
#        -idletime    # of sec worker is idle before exiting (def:0 = never)
#        -initcmd     script used to initialize new worker thread;
#                     it is run after this file has been sourced
#        -exitcmd     script run at worker thread exit
#
# Side Effects:
#   Might create many new threads if "-minworkers" option is > 0.
#   Creates the shared (tsv) array named after the returned pool id.
#
# Results:
#   The id of the newly created thread pool. This id must be used
#   in all other tpool::* commands.
#   Raises an error for an odd number of arguments or unknown option.
#

proc tpool::create {args} {

    variable thisScript

    #
    # Usage message; [info level 0] is this very invocation, so the
    # command name is right regardless of the caller's stack depth.
    #

    set usage "wrong \# args: should be \"[lindex [info level 0] 0]\
               ?-minworkers count? ?-maxworkers count?\
               ?-initcmd script? ?-exitcmd script?\
               ?-idletime seconds?\""

    #
    # Collect the options locally first, so that a bad argument list
    # is rejected before any shared state is created.
    # The init script is built with [list] so that a script path
    # containing spaces or other special characters still works.
    #

    array set opts [list \
        -minworkers 0 \
        -maxworkers 4 \
        -idletime   0 \
        -initcmd    [list source $thisScript] \
        -exitcmd    ""]

    if {[llength $args] % 2} {
        error $usage
    }

    foreach {arg val} $args {
        switch -- $arg {
            -minworkers -
            -maxworkers {set opts($arg) $val}
            -idletime   {set opts($arg) [expr {$val*1000}]}
            -initcmd    -
            -exitcmd    {append opts($arg) \n $val}
            default {
                error $usage
            }
        }
    }

    #
    # Get next threadpool handle and create the pool array.
    #

    set ns [namespace current]
    set tpid [namespace tail $ns][tsv::incr $ns count]

    tsv::lock $tpid {
        tsv::array set $tpid {
            thrworkers  ""
            thrwaiters  ""
            jobcounter  0
            refcounter  0
            numworkers  0
        }
        tsv::set $tpid name $tpid
        tsv::array set $tpid [array get opts]
    }

    #
    # Start initial (minimum) number of worker threads.
    #

    for {set ii 0} {$ii < $opts(-minworkers)} {incr ii} {
        Worker $tpid
    }

    return $tpid
}
146
#
# tpool::names --
#
#   Returns list of currently created threadpools.
#
# Arguments:
#   None.
#
# Side Effects:
#   None.
#
# Results:
#   List of shared array names starting with the tail of this
#   namespace's name (the prefix used for pool ids), or empty
#   if none found.
#

proc tpool::names {} {
    set prefix [namespace tail [namespace current]]
    return [tsv::names ${prefix}*]
}
166
#
# tpool::post --
#
#   Submits the new job to the thread pool. The caller might pass
#   the job in two modes: synchronous and asynchronous.
#   For the synchronous mode, the pool implementation will retain
#   the result of the passed script until the caller collects it
#   using the "tpool::wait" and "tpool::get" commands.
#   For the asynchronous mode, the result of the script is ignored.
#
# Arguments:
#   args   Variable # of arguments with the following syntax:
#          tpool::post ?-detached? tpid script
#
#          -detached  flag to turn the async operation (ignore result)
#          tpid       the id of the thread pool
#          script     script to pass to the worker thread for execution
#
# Side Effects:
#   Depends on the passed script.
#   May create a new worker thread, and may enter the event loop
#   while waiting for a worker to become available.
#
# Results:
#   The id of the posted job. This id is used later on to collect
#   result of the job and set local variables accordingly.
#   For asynchronously posted jobs, the return result is ignored
#   and this function returns empty result.
#

proc tpool::post {args} {

    variable jobsdone

    #
    # Parse command arguments.
    #

    set ns [namespace current]
    set usage "wrong \# args: should be \"[lindex [info level 0] 0]\
               ?-detached? tpoolId script\""

    if {[llength $args] == 2} {
        set detached 0
        set tpid [lindex $args 0]
        set cmd  [lindex $args 1]
    } elseif {[llength $args] == 3} {
        if {[lindex $args 0] != "-detached"} {
            error $usage
        }
        set detached 1
        set tpid [lindex $args 1]
        set cmd  [lindex $args 2]
    } else {
        error $usage
    }

    #
    # Find idle (or create new) worker thread. This is relatively
    # a complex issue, since we must honour the limits about number
    # of allowed worker threads imposed to us by the caller.
    #
    # If no usable worker is found, this thread registers itself as
    # a waiter and sleeps in the event loop until some worker (new
    # or just finished) sets the "waiter" variable; then it retries.
    #

    set tid ""

    while {$tid == ""} {
        tsv::lock $tpid {
            set tid [tsv::lpop $tpid thrworkers]
            # A failing thread::preserve means the worker is gone.
            if {$tid == "" || [catch {thread::preserve $tid}]} {
                set tid ""
                tsv::lpush $tpid thrwaiters [thread::id] end
                if {[tsv::set $tpid numworkers]<[tsv::set $tpid -maxworkers]} {
                    Worker $tpid
                }
            }
        }
        if {$tid == ""} {
            vwait ${ns}::waiter
        }
    }

    #
    # Post the command to the worker thread.
    # Detached jobs use job id 0, deliver no result and are not
    # recorded in the "jobsdone" array.
    #

    if {$detached} {
        thread::send -async $tid [list ${ns}::Run $tpid 0 $cmd]
        return ""
    }

    #
    # Register the job as pending (empty result) before sending it;
    # the result arrives later in the "result" namespace variable.
    #

    set j [tsv::incr $tpid jobcounter]
    set jobsdone($j) ""
    thread::send -async $tid [list ${ns}::Run $tpid $j $cmd] ${ns}::result

    return $j
}
261
#
# tpool::wait --
#
#   Waits for jobs sent with "tpool::post" to finish.
#
# Arguments:
#   tpid     Name of the pool shared array (not used here).
#   jobList  List of job id's to wait for.
#   jobLeft  Optional name of a variable in the caller's scope which
#            is set to the list of jobs from jobList still pending.
#
# Side Effects:
#   Might eventually enter the event loop while waiting
#   for the job result to arrive from the worker thread.
#   It ignores bogus job ids: they are neither waited for
#   nor reported as pending.
#
# Results:
#   List of job id's from jobList whose results are ready to be
#   collected. If none of the known jobs is ready yet, the command
#   blocks until the first one of them completes. Returns an empty
#   list if jobList contains no known job ids.
#

proc tpool::wait {tpid jobList {jobLeft ""}} {

    variable result
    variable jobsdone

    if {$jobLeft != ""} {
        upvar $jobLeft jobleft
    }

    set retlist ""
    set jobleft ""

    #
    # Sort the known jobs into finished and still pending ones.
    # A pending job has an empty entry in the "jobsdone" array.
    #

    foreach j $jobList {
        if {[info exists jobsdone($j)] == 0} {
            continue ; # Ignore (skip) bogus job ids
        }
        if {$jobsdone($j) != ""} {
            lappend retlist $j
        } else {
            lappend jobleft $j
        }
    }

    #
    # Nothing ready but some known jobs are pending; wait for the
    # first one of them to get ready. Testing the pending list (and
    # not the raw jobList) keeps a list of bogus ids from blocking
    # here forever.
    #

    if {[llength $retlist] == 0 && [llength $jobleft] > 0} {
        while {1} {
            vwait [namespace current]::result

            # Record every arriving result, even if not waited for.
            set doneid [lindex $result 0]
            set jobsdone($doneid) $result

            set x [lsearch -exact $jobleft $doneid]
            if {$x >= 0} {
                lappend retlist $doneid
                set jobleft [lreplace $jobleft $x $x]
                break
            }
        }
    }

    return $retlist
}
324
#
# tpool::get --
#
#   Returns the result of a job sent with "tpool::post", once that
#   result has been recorded by "tpool::wait".
#
# Arguments:
#   tpid   Name of the pool shared array (not used here).
#   jobid  Id of the previously posted job.
#
# Side Effects:
#   None.
#
# Results:
#   Result of the job. If the job did not complete normally, an
#   error is raised with the job's result as message, passing the
#   errorInfo and errorCode recorded by the worker.
#   An error is also raised if the job id is unknown or if the
#   result of the job has not been recorded yet.
#

proc tpool::get {tpid jobid} {

    variable jobsdone

    if {[info exists jobsdone($jobid)] == 0} {
        error "no such job \"$jobid\""
    }

    #
    # Recorded result is a list: {jobid code result ?errorInfo errorCode?}
    # An empty entry means the job is still pending.
    #

    set jobres $jobsdone($jobid)
    if {[llength $jobres] == 0} {
        error "job \"$jobid\" not completed"
    }

    if {[lindex $jobres 1] != 0} {
        error [lindex $jobres 2] [lindex $jobres 3] [lindex $jobres 4]
    }

    return [lindex $jobres 2]
}
352
#
# tpool::preserve --
#
#   Reserves the threadpool for the caller by bumping the pool's
#   reference counter by one.
#
# Arguments:
#   tpid   Name of the pool shared array.
#
# Side Effects:
#   The "refcounter" element of the pool shared array is incremented.
#
# Results:
#   Number of threadpool reservations after the increment.
#

proc tpool::preserve {tpid} {
    return [tsv::incr $tpid refcounter]
}
372
#
# tpool::release --
#
#   Decrements the reference counter of the threadpool, eventually
#   tearing the pool down if this was the last reservation.
#
# Arguments:
#   tpid   Name of the pool shared array.
#
# Side Effects:
#   If the number of reservations drops to zero or below
#   the threadpool is torn down: all idle worker threads are
#   released and the pool shared array is removed.
#
# Results:
#   Number of threadpool reservations after the decrement.
#

proc tpool::release {tpid} {

    tsv::lock $tpid {
        set refs [tsv::incr $tpid refcounter -1]
        if {$refs <= 0} {
            #
            # Release all worker threads found on the idle list.
            # A worker that is already gone must not abort the
            # teardown, hence the catch.
            #
            # NOTE(review): the release waits for each worker while
            # the pool lock is held; a worker blocked on the same
            # lock (see tpool::Timer) could stall here - confirm.
            #
            foreach t [tsv::set $tpid thrworkers] {
                catch {thread::release -wait $t}
            }
            tsv::unset $tpid ; # This is not an error; it works!
        }
    }

    return $refs
}
402
403#
404# Private procedures, not a part of the threadpool API.
405#
406
#
# tpool::Worker --
#
#   Creates new worker thread. This procedure must be executed
#   under the tsv lock.
#
# Arguments:
#   tpid  Name of the pool shared array.
#
# Side Effects:
#   Depends on the thread initialization script.
#   The new thread is added to the list of idle workers and one
#   waiting caller (if any) is woken up.
#
# Results:
#   None. If the initialization script fails, the new thread is
#   released again and the error is passed on to the caller.
#

proc tpool::Worker {tpid} {

    #
    # Create new worker thread and run the pool init script in it.
    # On failure the thread is released, so it does not linger
    # around unused and unaccounted for.
    #

    set tid [thread::create]

    if {[catch {thread::send $tid [tsv::set $tpid -initcmd]} err]} {
        set einfo ""
        set ecode NONE
        if {[info exists ::errorInfo]} {
            set einfo $::errorInfo
        }
        if {[info exists ::errorCode]} {
            set ecode $::errorCode
        }
        catch {thread::release $tid}
        error $err $einfo $ecode
    }

    #
    # This reservation keeps the worker alive while it sits idle.
    #

    thread::preserve $tid

    tsv::incr  $tpid numworkers
    tsv::lpush $tpid thrworkers $tid

    #
    # Signalize waiter threads if any
    #

    set waiter [tsv::lpop $tpid thrwaiters]
    if {$waiter != ""} {
        thread::send -async $waiter [list set [namespace current]::waiter 1]
    }

    return
}
448
#
# tpool::Timer --
#
#   Idle-timeout callback. It should be executed within the worker
#   thread only (it is scheduled by tpool::Run).
#
# Arguments:
#   tpid  Name of the pool shared array.
#
# Side Effects:
#   If the pool holds more workers than its "-minworkers" setting
#   and this thread is on the idle list, the thread removes itself
#   from the pool, runs the pool "-exitcmd" script (errors in it
#   are ignored) and releases itself, so it may eventually exit.
#
# Results:
#   None.
#

proc tpool::Timer {tpid} {

    tsv::lock $tpid {

        set active  [tsv::set $tpid numworkers]
        set minimum [tsv::set $tpid -minworkers]

        if {$active > $minimum} {

            #
            # The pool is over its minimum size. Locate this thread
            # on the idle list; only if it is found there do we drop
            # it from the list, lower the worker count, run the exit
            # script and give up our own thread reservation.
            #

            set x [tsv::lsearch $tpid thrworkers [thread::id]]
            if {$x >= 0} {
                tsv::lreplace $tpid thrworkers $x $x
                tsv::incr $tpid numworkers -1

                set exitcmd [tsv::set $tpid -exitcmd]
                if {$exitcmd != ""} {
                    catch {eval $exitcmd}
                }

                thread::release
            }
        }
    }
}
490
#
# tpool::Run --
#
#   This procedure should be executed within the worker thread only.
#   It performs the actual command execution in the worker thread.
#
# Arguments:
#   tpid  Name of the pool shared array.
#   jid   The job id
#   cmd   The command to execute
#
# Side Effects:
#   Many, depending of the passed command.
#   Puts this thread back on the list of idle workers, wakes up one
#   waiting caller (if any) and re-arms the idle timer.
#
# Results:
#   List for passing the evaluation result and status back:
#   {jid 0 result} for a job which completed normally, or
#   {jid code result errorInfo errorCode} otherwise.
#

proc tpool::Run {tpid jid cmd} {

    #
    # Cancel the idle timer callback, if any.
    #

    variable afterevent
    if {$afterevent != ""} {
        after cancel $afterevent
        set afterevent ""
    }

    #
    # Evaluate passed command and build the result list.
    #
    # The global errorInfo/errorCode variables need not exist: a job
    # ending with "return", "break" or "continue" yields a non-zero
    # code without any error ever raised in this thread. Reading them
    # blindly would fail here, before the worker is handed back to
    # the pool below.
    #

    set code [catch {uplevel \#0 $cmd} ret]
    if {$code == 0} {
        set res [list $jid 0 $ret]
    } else {
        set einfo ""
        set ecode NONE
        if {[info exists ::errorInfo]} {
            set einfo $::errorInfo
        }
        if {[info exists ::errorCode]} {
            set ecode $::errorCode
        }
        set res [list $jid $code $ret $einfo $ecode]
    }

    #
    # Put this thread back on the idle list, then check to see
    # if any caller is waiting to be serviced.
    # If yes, kick it out of the waiting state.
    #

    set ns [namespace current]

    tsv::lock $tpid {
        tsv::lpush $tpid thrworkers [thread::id]
        set waiter [tsv::lpop $tpid thrwaiters]
        if {$waiter != ""} {
            thread::send -async $waiter [list set ${ns}::waiter 1]
        }
    }

    #
    # Release the thread (drops the reservation taken by the
    # tpool::post). If this turns out to be the last refcount
    # held, don't bother to do any more work, since thread
    # will soon exit.
    #

    if {[thread::release] <= 0} {
        return $res
    }

    #
    # Register the idle timer again.
    #

    if {[set idle [tsv::set $tpid -idletime]]} {
        set afterevent [after $idle [list ${ns}::Timer $tpid]]
    }

    return $res
}
570
571# EOF $RCSfile: tpool.tcl,v $
572
573# Emacs Setup Variables
574# Local Variables:
575# mode: Tcl
576# indent-tabs-mode: nil
577# tcl-basic-offset: 4
578# End:
579
580