1#
2#               thread.rb - thread support classes
3#                       by Yukihiro Matsumoto <matz@netlab.co.jp>
4#
5# Copyright (C) 2001  Yukihiro Matsumoto
6# Copyright (C) 2000  Network Applied Communication Laboratory, Inc.
7# Copyright (C) 2000  Information-technology Promotion Agency, Japan
8#
9
10unless defined? Thread
11  raise "Thread not available for this ruby interpreter"
12end
13
14unless defined? ThreadError
15  class ThreadError < StandardError
16  end
17end
18
19if $DEBUG
20  Thread.abort_on_exception = true
21end
22
23#
24# ConditionVariable objects augment class Mutex. Using condition variables,
25# it is possible to suspend while in the middle of a critical section until a
26# resource becomes available.
27#
28# Example:
29#
30#   require 'thread'
31#
32#   mutex = Mutex.new
33#   resource = ConditionVariable.new
34#
35#   a = Thread.new {
36#     mutex.synchronize {
37#       # Thread 'a' now needs the resource
38#       resource.wait(mutex)
39#       # 'a' can now have the resource
40#     }
41#   }
42#
43#   b = Thread.new {
44#     mutex.synchronize {
45#       # Thread 'b' has finished using the resource
46#       resource.signal
47#     }
48#   }
49#
50class ConditionVariable
51  #
52  # Creates a new ConditionVariable
53  #
54  def initialize
55    @waiters = {}
56    @waiters_mutex = Mutex.new
57  end
58
59  #
60  # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup.
61  #
62  # If +timeout+ is given, this method returns after +timeout+ seconds passed,
63  # even if no other thread doesn't signal.
64  #
65  def wait(mutex, timeout=nil)
66    Thread.handle_interrupt(StandardError => :never) do
67      begin
68        Thread.handle_interrupt(StandardError => :on_blocking) do
69          @waiters_mutex.synchronize do
70            @waiters[Thread.current] = true
71          end
72          mutex.sleep timeout
73        end
74      ensure
75        @waiters_mutex.synchronize do
76          @waiters.delete(Thread.current)
77        end
78      end
79    end
80    self
81  end
82
83  #
84  # Wakes up the first thread in line waiting for this lock.
85  #
86  def signal
87    Thread.handle_interrupt(StandardError => :on_blocking) do
88      begin
89        t, _ = @waiters_mutex.synchronize { @waiters.shift }
90        t.run if t
91      rescue ThreadError
92        retry # t was already dead?
93      end
94    end
95    self
96  end
97
98  #
99  # Wakes up all threads waiting for this lock.
100  #
101  def broadcast
102    Thread.handle_interrupt(StandardError => :on_blocking) do
103      threads = nil
104      @waiters_mutex.synchronize do
105        threads = @waiters.keys
106        @waiters.clear
107      end
108      for t in threads
109        begin
110          t.run
111        rescue ThreadError
112        end
113      end
114    end
115    self
116  end
117end
118
119#
120# This class provides a way to synchronize communication between threads.
121#
122# Example:
123#
124#   require 'thread'
125#
126#   queue = Queue.new
127#
128#   producer = Thread.new do
129#     5.times do |i|
130#       sleep rand(i) # simulate expense
131#       queue << i
132#       puts "#{i} produced"
133#     end
134#   end
135#
136#   consumer = Thread.new do
137#     5.times do |i|
138#       value = queue.pop
139#       sleep rand(i/2) # simulate expense
140#       puts "consumed #{value}"
141#     end
142#   end
143#
144#   consumer.join
145#
146class Queue
147  #
148  # Creates a new queue.
149  #
150  def initialize
151    @que = []
152    @que.taint          # enable tainted communication
153    @num_waiting = 0
154    self.taint
155    @mutex = Mutex.new
156    @cond = ConditionVariable.new
157  end
158
159  #
160  # Pushes +obj+ to the queue.
161  #
162  def push(obj)
163    Thread.handle_interrupt(StandardError => :on_blocking) do
164      @mutex.synchronize do
165        @que.push obj
166        @cond.signal
167      end
168    end
169  end
170
171  #
172  # Alias of push
173  #
174  alias << push
175
176  #
177  # Alias of push
178  #
179  alias enq push
180
181  #
182  # Retrieves data from the queue.  If the queue is empty, the calling thread is
183  # suspended until data is pushed onto the queue.  If +non_block+ is true, the
184  # thread isn't suspended, and an exception is raised.
185  #
186  def pop(non_block=false)
187    Thread.handle_interrupt(StandardError => :on_blocking) do
188      @mutex.synchronize do
189        while true
190          if @que.empty?
191            if non_block
192              raise ThreadError, "queue empty"
193            else
194              begin
195                @num_waiting += 1
196                @cond.wait @mutex
197              ensure
198                @num_waiting -= 1
199              end
200            end
201          else
202            return @que.shift
203          end
204        end
205      end
206    end
207  end
208
209  #
210  # Alias of pop
211  #
212  alias shift pop
213
214  #
215  # Alias of pop
216  #
217  alias deq pop
218
219  #
220  # Returns +true+ if the queue is empty.
221  #
222  def empty?
223    @que.empty?
224  end
225
226  #
227  # Removes all objects from the queue.
228  #
229  def clear
230    @que.clear
231  end
232
233  #
234  # Returns the length of the queue.
235  #
236  def length
237    @que.length
238  end
239
240  #
241  # Alias of length.
242  #
243  alias size length
244
245  #
246  # Returns the number of threads waiting on the queue.
247  #
248  def num_waiting
249    @num_waiting
250  end
251end
252
253#
254# This class represents queues of specified size capacity.  The push operation
255# may be blocked if the capacity is full.
256#
257# See Queue for an example of how a SizedQueue works.
258#
259class SizedQueue < Queue
260  #
261  # Creates a fixed-length queue with a maximum size of +max+.
262  #
263  def initialize(max)
264    raise ArgumentError, "queue size must be positive" unless max > 0
265    @max = max
266    @enque_cond = ConditionVariable.new
267    @num_enqueue_waiting = 0
268    super()
269  end
270
271  #
272  # Returns the maximum size of the queue.
273  #
274  def max
275    @max
276  end
277
278  #
279  # Sets the maximum size of the queue.
280  #
281  def max=(max)
282    raise ArgumentError, "queue size must be positive" unless max > 0
283
284    @mutex.synchronize do
285      if max <= @max
286        @max = max
287      else
288        diff = max - @max
289        @max = max
290        diff.times do
291          @enque_cond.signal
292        end
293      end
294    end
295    max
296  end
297
298  #
299  # Pushes +obj+ to the queue.  If there is no space left in the queue, waits
300  # until space becomes available.
301  #
302  def push(obj)
303    Thread.handle_interrupt(RuntimeError => :on_blocking) do
304      @mutex.synchronize do
305        while true
306          break if @que.length < @max
307          @num_enqueue_waiting += 1
308          begin
309            @enque_cond.wait @mutex
310          ensure
311            @num_enqueue_waiting -= 1
312          end
313        end
314
315        @que.push obj
316        @cond.signal
317      end
318    end
319  end
320
321  #
322  # Removes all objects from the queue.
323  #
324  def clear
325    super
326    @mutex.synchronize do
327      @max.times do
328        @enque_cond.signal
329      end
330    end
331  end
332
333  #
334  # Alias of push
335  #
336  alias << push
337
338  #
339  # Alias of push
340  #
341  alias enq push
342
343  #
344  # Retrieves data from the queue and runs a waiting thread, if any.
345  #
346  def pop(*args)
347    retval = super
348    @mutex.synchronize do
349      if @que.length < @max
350        @enque_cond.signal
351      end
352    end
353    retval
354  end
355
356  #
357  # Alias of pop
358  #
359  alias shift pop
360
361  #
362  # Alias of pop
363  #
364  alias deq pop
365
366  #
367  # Returns the number of threads waiting on the queue.
368  #
369  def num_waiting
370    @num_waiting + @num_enqueue_waiting
371  end
372end
373
374# Documentation comments:
375#  - How do you make RDoc inherit documentation from superclass?
376