require 'monitor'
require 'set'
require 'thread'

require 'rake/promise'
5
6module Rake
7
8  class ThreadPool              # :nodoc: all
9
10    # Creates a ThreadPool object.
11    # The parameter is the size of the pool.
12    def initialize(thread_count)
13      @max_active_threads = [thread_count, 0].max
14      @threads = Set.new
15      @threads_mon = Monitor.new
16      @queue = Queue.new
17      @join_cond = @threads_mon.new_cond
18
19      @history_start_time = nil
20      @history = []
21      @history_mon = Monitor.new
22      @total_threads_in_play = 0
23    end
24
25    # Creates a future executed by the +ThreadPool+.
26    #
27    # The args are passed to the block when executing (similarly to
28    # <tt>Thread#new</tt>) The return value is an object representing
29    # a future which has been created and added to the queue in the
30    # pool. Sending <tt>#value</tt> to the object will sleep the
31    # current thread until the future is finished and will return the
32    # result (or raise an exception thrown from the future)
33    def future(*args, &block)
34      promise = Promise.new(args, &block)
35      promise.recorder = lambda { |*stats| stat(*stats) }
36
37      @queue.enq promise
38      stat :queued, :item_id => promise.object_id
39      start_thread
40      promise
41    end
42
43    # Waits until the queue of futures is empty and all threads have exited.
44    def join
45      @threads_mon.synchronize do
46        begin
47          stat :joining
48          @join_cond.wait unless @threads.empty?
49          stat :joined
50        rescue Exception => e
51          stat :joined
52          $stderr.puts e
53          $stderr.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
54          $stderr.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
55          $stderr.puts e.backtrace.join("\n")
56          @threads.each do |t|
57            $stderr.print "Thread #{t} status = #{t.status}\n"
58            # 1.8 doesn't support Thread#backtrace
59            $stderr.puts t.backtrace.join("\n") if t.respond_to? :backtrace
60          end
61          raise e
62        end
63      end
64    end
65
66    # Enable the gathering of history events.
67    def gather_history          #:nodoc:
68      @history_start_time = Time.now if @history_start_time.nil?
69    end
70
71    # Return a array of history events for the thread pool.
72    #
73    # History gathering must be enabled to be able to see the events
74    # (see #gather_history). Best to call this when the job is
75    # complete (i.e. after ThreadPool#join is called).
76    def history                 # :nodoc:
77      @history_mon.synchronize { @history.dup }.
78        sort_by { |i| i[:time] }.
79        each { |i| i[:time] -= @history_start_time }
80    end
81
82    # Return a hash of always collected statistics for the thread pool.
83    def statistics              #  :nodoc:
84      {
85        :total_threads_in_play => @total_threads_in_play,
86        :max_active_threads => @max_active_threads,
87      }
88    end
89
90    private
91
92    # processes one item on the queue. Returns true if there was an
93    # item to process, false if there was no item
94    def process_queue_item      #:nodoc:
95      return false if @queue.empty?
96
97      # Even though we just asked if the queue was empty, it
98      # still could have had an item which by this statement
99      # is now gone. For this reason we pass true to Queue#deq
100      # because we will sleep indefinitely if it is empty.
101      promise = @queue.deq(true)
102      stat :dequeued, :item_id => promise.object_id
103      promise.work
104      return true
105
106      rescue ThreadError # this means the queue is empty
107      false
108    end
109
110    def start_thread # :nodoc:
111      @threads_mon.synchronize do
112        next unless @threads.count < @max_active_threads
113
114        t = Thread.new do
115          begin
116            while @threads.count <= @max_active_threads
117              break unless process_queue_item
118            end
119          ensure
120            @threads_mon.synchronize do
121              @threads.delete Thread.current
122              stat :ended, :thread_count => @threads.count
123              @join_cond.broadcast if @threads.empty?
124            end
125          end
126        end
127        @threads << t
128        stat :spawned, :new_thread => t.object_id, :thread_count => @threads.count
129        @total_threads_in_play = @threads.count if @threads.count > @total_threads_in_play
130      end
131    end
132
133    def stat(event, data=nil) # :nodoc:
134      return if @history_start_time.nil?
135      info = {
136        :event  => event,
137        :data   => data,
138        :time   => Time.now,
139        :thread => Thread.current.object_id,
140      }
141      @history_mon.synchronize { @history << info }
142    end
143
144    # for testing only
145
146    def __queue__ # :nodoc:
147      @queue
148    end
149
150    def __threads__ # :nodoc:
151      @threads.dup
152    end
153  end
154
155end
156