1# 2# thread.rb - thread support classes 3# by Yukihiro Matsumoto <matz@netlab.co.jp> 4# 5# Copyright (C) 2001 Yukihiro Matsumoto 6# Copyright (C) 2000 Network Applied Communication Laboratory, Inc. 7# Copyright (C) 2000 Information-technology Promotion Agency, Japan 8# 9 10unless defined? Thread 11 raise "Thread not available for this ruby interpreter" 12end 13 14unless defined? ThreadError 15 class ThreadError < StandardError 16 end 17end 18 19if $DEBUG 20 Thread.abort_on_exception = true 21end 22 23# 24# ConditionVariable objects augment class Mutex. Using condition variables, 25# it is possible to suspend while in the middle of a critical section until a 26# resource becomes available. 27# 28# Example: 29# 30# require 'thread' 31# 32# mutex = Mutex.new 33# resource = ConditionVariable.new 34# 35# a = Thread.new { 36# mutex.synchronize { 37# # Thread 'a' now needs the resource 38# resource.wait(mutex) 39# # 'a' can now have the resource 40# } 41# } 42# 43# b = Thread.new { 44# mutex.synchronize { 45# # Thread 'b' has finished using the resource 46# resource.signal 47# } 48# } 49# 50class ConditionVariable 51 # 52 # Creates a new ConditionVariable 53 # 54 def initialize 55 @waiters = {} 56 @waiters_mutex = Mutex.new 57 end 58 59 # 60 # Releases the lock held in +mutex+ and waits; reacquires the lock on wakeup. 61 # 62 # If +timeout+ is given, this method returns after +timeout+ seconds passed, 63 # even if no other thread doesn't signal. 64 # 65 def wait(mutex, timeout=nil) 66 Thread.handle_interrupt(StandardError => :never) do 67 begin 68 Thread.handle_interrupt(StandardError => :on_blocking) do 69 @waiters_mutex.synchronize do 70 @waiters[Thread.current] = true 71 end 72 mutex.sleep timeout 73 end 74 ensure 75 @waiters_mutex.synchronize do 76 @waiters.delete(Thread.current) 77 end 78 end 79 end 80 self 81 end 82 83 # 84 # Wakes up the first thread in line waiting for this lock. 85 # 86 def signal 87 Thread.handle_interrupt(StandardError => :on_blocking) do 88 begin 89 t, _ = @waiters_mutex.synchronize { @waiters.shift } 90 t.run if t 91 rescue ThreadError 92 retry # t was already dead? 93 end 94 end 95 self 96 end 97 98 # 99 # Wakes up all threads waiting for this lock. 100 # 101 def broadcast 102 Thread.handle_interrupt(StandardError => :on_blocking) do 103 threads = nil 104 @waiters_mutex.synchronize do 105 threads = @waiters.keys 106 @waiters.clear 107 end 108 for t in threads 109 begin 110 t.run 111 rescue ThreadError 112 end 113 end 114 end 115 self 116 end 117end 118 119# 120# This class provides a way to synchronize communication between threads. 121# 122# Example: 123# 124# require 'thread' 125# 126# queue = Queue.new 127# 128# producer = Thread.new do 129# 5.times do |i| 130# sleep rand(i) # simulate expense 131# queue << i 132# puts "#{i} produced" 133# end 134# end 135# 136# consumer = Thread.new do 137# 5.times do |i| 138# value = queue.pop 139# sleep rand(i/2) # simulate expense 140# puts "consumed #{value}" 141# end 142# end 143# 144# consumer.join 145# 146class Queue 147 # 148 # Creates a new queue. 149 # 150 def initialize 151 @que = [] 152 @que.taint # enable tainted communication 153 @num_waiting = 0 154 self.taint 155 @mutex = Mutex.new 156 @cond = ConditionVariable.new 157 end 158 159 # 160 # Pushes +obj+ to the queue. 161 # 162 def push(obj) 163 Thread.handle_interrupt(StandardError => :on_blocking) do 164 @mutex.synchronize do 165 @que.push obj 166 @cond.signal 167 end 168 end 169 end 170 171 # 172 # Alias of push 173 # 174 alias << push 175 176 # 177 # Alias of push 178 # 179 alias enq push 180 181 # 182 # Retrieves data from the queue. If the queue is empty, the calling thread is 183 # suspended until data is pushed onto the queue. If +non_block+ is true, the 184 # thread isn't suspended, and an exception is raised. 185 # 186 def pop(non_block=false) 187 Thread.handle_interrupt(StandardError => :on_blocking) do 188 @mutex.synchronize do 189 while true 190 if @que.empty? 191 if non_block 192 raise ThreadError, "queue empty" 193 else 194 begin 195 @num_waiting += 1 196 @cond.wait @mutex 197 ensure 198 @num_waiting -= 1 199 end 200 end 201 else 202 return @que.shift 203 end 204 end 205 end 206 end 207 end 208 209 # 210 # Alias of pop 211 # 212 alias shift pop 213 214 # 215 # Alias of pop 216 # 217 alias deq pop 218 219 # 220 # Returns +true+ if the queue is empty. 221 # 222 def empty? 223 @que.empty? 224 end 225 226 # 227 # Removes all objects from the queue. 228 # 229 def clear 230 @que.clear 231 end 232 233 # 234 # Returns the length of the queue. 235 # 236 def length 237 @que.length 238 end 239 240 # 241 # Alias of length. 242 # 243 alias size length 244 245 # 246 # Returns the number of threads waiting on the queue. 247 # 248 def num_waiting 249 @num_waiting 250 end 251end 252 253# 254# This class represents queues of specified size capacity. The push operation 255# may be blocked if the capacity is full. 256# 257# See Queue for an example of how a SizedQueue works. 258# 259class SizedQueue < Queue 260 # 261 # Creates a fixed-length queue with a maximum size of +max+. 262 # 263 def initialize(max) 264 raise ArgumentError, "queue size must be positive" unless max > 0 265 @max = max 266 @enque_cond = ConditionVariable.new 267 @num_enqueue_waiting = 0 268 super() 269 end 270 271 # 272 # Returns the maximum size of the queue. 273 # 274 def max 275 @max 276 end 277 278 # 279 # Sets the maximum size of the queue. 280 # 281 def max=(max) 282 raise ArgumentError, "queue size must be positive" unless max > 0 283 284 @mutex.synchronize do 285 if max <= @max 286 @max = max 287 else 288 diff = max - @max 289 @max = max 290 diff.times do 291 @enque_cond.signal 292 end 293 end 294 end 295 max 296 end 297 298 # 299 # Pushes +obj+ to the queue. If there is no space left in the queue, waits 300 # until space becomes available. 301 # 302 def push(obj) 303 Thread.handle_interrupt(RuntimeError => :on_blocking) do 304 @mutex.synchronize do 305 while true 306 break if @que.length < @max 307 @num_enqueue_waiting += 1 308 begin 309 @enque_cond.wait @mutex 310 ensure 311 @num_enqueue_waiting -= 1 312 end 313 end 314 315 @que.push obj 316 @cond.signal 317 end 318 end 319 end 320 321 # 322 # Removes all objects from the queue. 323 # 324 def clear 325 super 326 @mutex.synchronize do 327 @max.times do 328 @enque_cond.signal 329 end 330 end 331 end 332 333 # 334 # Alias of push 335 # 336 alias << push 337 338 # 339 # Alias of push 340 # 341 alias enq push 342 343 # 344 # Retrieves data from the queue and runs a waiting thread, if any. 345 # 346 def pop(*args) 347 retval = super 348 @mutex.synchronize do 349 if @que.length < @max 350 @enque_cond.signal 351 end 352 end 353 retval 354 end 355 356 # 357 # Alias of pop 358 # 359 alias shift pop 360 361 # 362 # Alias of pop 363 # 364 alias deq pop 365 366 # 367 # Returns the number of threads waiting on the queue. 368 # 369 def num_waiting 370 @num_waiting + @num_enqueue_waiting 371 end 372end 373 374# Documentation comments: 375# - How do you make RDoc inherit documentation from superclass? 376