require 'monitor'
require 'set'
require 'thread'

require 'rake/promise'

module Rake

  # A pool of worker threads that executes queued futures (Rake::Promise
  # objects). Worker threads are spawned lazily, up to the configured
  # maximum, and exit as soon as they find the queue empty.
  class ThreadPool # :nodoc: all

    # Creates a ThreadPool object.
    # The parameter is the size of the pool. Negative sizes are clamped
    # to zero, in which case no worker thread is ever spawned.
    def initialize(thread_count)
      @max_active_threads = [thread_count, 0].max
      @threads = Set.new
      @threads_mon = Monitor.new
      @queue = Queue.new
      @join_cond = @threads_mon.new_cond

      # History gathering stays disabled while @history_start_time is nil.
      @history_start_time = nil
      @history = []
      @history_mon = Monitor.new
      @total_threads_in_play = 0
    end

    # Creates a future executed by the +ThreadPool+.
    #
    # The args are passed to the block when executing (similarly to
    # <tt>Thread#new</tt>). The return value is an object representing
    # a future which has been created and added to the queue in the
    # pool. Sending <tt>#value</tt> to the object will sleep the
    # current thread until the future is finished and will return the
    # result (or raise an exception thrown from the future).
    def future(*args, &block)
      promise = Promise.new(args, &block)
      # Let the promise report its own events into this pool's history.
      promise.recorder = lambda { |*stats| stat(*stats) }

      @queue.enq promise
      stat :queued, :item_id => promise.object_id
      start_thread
      promise
    end

    # Waits until all worker threads have exited.
    #
    # If an exception is raised while waiting, diagnostic information
    # about the queue and every pool thread is written to <tt>$stderr</tt>
    # and the exception is re-raised.
    def join
      @threads_mon.synchronize do
        begin
          stat :joining
          # Re-check the predicate after every wake-up rather than trusting
          # a single signal.
          @join_cond.wait_until { @threads.empty? }
          stat :joined
        rescue Exception
          # Deliberately broad: this branch only prints diagnostics and
          # always re-raises the original exception.
          e = $!
          stat :joined
          $stderr.puts e
          $stderr.print "Queue contains #{@queue.size} items. Thread pool contains #{@threads.count} threads\n"
          $stderr.print "Current Thread #{Thread.current} status = #{Thread.current.status}\n"
          $stderr.puts e.backtrace.join("\n")
          @threads.each do |t|
            $stderr.print "Thread #{t} status = #{t.status}\n"
            # 1.8 doesn't support Thread#backtrace, and a thread that is no
            # longer alive reports a nil backtrace.
            backtrace = t.respond_to?(:backtrace) ? t.backtrace : nil
            $stderr.puts backtrace.join("\n") if backtrace
          end
          raise
        end
      end
    end

    # Enable the gathering of history events.
    def gather_history #:nodoc:
      @history_start_time = Time.now if @history_start_time.nil?
    end

    # Return an array of history events for the thread pool, ordered by
    # time. Each event's <tt>:time</tt> is expressed relative to the moment
    # history gathering was enabled.
    #
    # History gathering must be enabled to be able to see the events
    # (see #gather_history). Best to call this when the job is
    # complete (i.e. after ThreadPool#join is called).
    #
    # The returned hashes are copies, so the recorded events are left
    # untouched and this method may be called repeatedly.
    def history # :nodoc:
      events = @history_mon.synchronize { @history.dup }
      events.sort_by { |event| event[:time] }.map do |event|
        event.merge(:time => event[:time] - @history_start_time)
      end
    end

    # Return a hash of always collected statistics for the thread pool.
    def statistics # :nodoc:
      {
        :total_threads_in_play => @total_threads_in_play,
        :max_active_threads => @max_active_threads,
      }
    end

    private

    # Processes one item on the queue. Returns true if there was an
    # item to process, false if there was no item.
    def process_queue_item #:nodoc:
      return false if @queue.empty?

      # Even though we just asked if the queue was empty, it
      # still could have had an item which by this statement
      # is now gone. For this reason we pass true to Queue#deq
      # because we will sleep indefinitely if it is empty.
      promise = @queue.deq(true)
      stat :dequeued, :item_id => promise.object_id
      promise.work
      true
    rescue ThreadError # this means the queue is empty
      false
    end

    # Spawns a new worker thread unless the pool is already at its
    # maximum size. The worker keeps processing queue items until the
    # queue is empty, then removes itself from the pool and wakes up any
    # thread waiting in #join once the pool has no threads left.
    def start_thread # :nodoc:
      @threads_mon.synchronize do
        next unless @threads.count < @max_active_threads

        t = Thread.new do
          begin
            while @threads.count <= @max_active_threads
              break unless process_queue_item
            end
          ensure
            @threads_mon.synchronize do
              @threads.delete Thread.current
              stat :ended, :thread_count => @threads.count
              @join_cond.broadcast if @threads.empty?
            end
          end
        end
        @threads << t
        stat :spawned, :new_thread => t.object_id, :thread_count => @threads.count
        # Track the high-water mark of simultaneously registered threads.
        @total_threads_in_play = @threads.count if @threads.count > @total_threads_in_play
      end
    end

    # Records a history event. Does nothing unless history gathering has
    # been enabled (see #gather_history).
    def stat(event, data=nil) # :nodoc:
      return if @history_start_time.nil?
      info = {
        :event => event,
        :data => data,
        :time => Time.now,
        :thread => Thread.current.object_id,
      }
      @history_mon.synchronize { @history << info }
    end

    # for testing only

    def __queue__ # :nodoc:
      @queue
    end

    def __threads__ # :nodoc:
      @threads.dup
    end
  end

end