1require 'monitor' 2require 'thread' 3require 'drb/drb' 4require 'rinda/rinda' 5require 'enumerator' 6require 'forwardable' 7 8module Rinda 9 10 ## 11 # A TupleEntry is a Tuple (i.e. a possible entry in some Tuplespace) 12 # together with expiry and cancellation data. 13 14 class TupleEntry 15 16 include DRbUndumped 17 18 attr_accessor :expires 19 20 ## 21 # Creates a TupleEntry based on +ary+ with an optional renewer or expiry 22 # time +sec+. 23 # 24 # A renewer must implement the +renew+ method which returns a Numeric, 25 # nil, or true to indicate when the tuple has expired. 26 27 def initialize(ary, sec=nil) 28 @cancel = false 29 @expires = nil 30 @tuple = make_tuple(ary) 31 @renewer = nil 32 renew(sec) 33 end 34 35 ## 36 # Marks this TupleEntry as canceled. 37 38 def cancel 39 @cancel = true 40 end 41 42 ## 43 # A TupleEntry is dead when it is canceled or expired. 44 45 def alive? 46 !canceled? && !expired? 47 end 48 49 ## 50 # Return the object which makes up the tuple itself: the Array 51 # or Hash. 52 53 def value; @tuple.value; end 54 55 ## 56 # Returns the canceled status. 57 58 def canceled?; @cancel; end 59 60 ## 61 # Has this tuple expired? (true/false). 62 # 63 # A tuple has expired when its expiry timer based on the +sec+ argument to 64 # #initialize runs out. 65 66 def expired? 67 return true unless @expires 68 return false if @expires > Time.now 69 return true if @renewer.nil? 70 renew(@renewer) 71 return true unless @expires 72 return @expires < Time.now 73 end 74 75 ## 76 # Reset the expiry time according to +sec_or_renewer+. 77 # 78 # +nil+:: it is set to expire in the far future. 79 # +false+:: it has expired. 80 # Numeric:: it will expire in that many seconds. 81 # 82 # Otherwise the argument refers to some kind of renewer object 83 # which will reset its expiry time. 84 85 def renew(sec_or_renewer) 86 sec, @renewer = get_renewer(sec_or_renewer) 87 @expires = make_expires(sec) 88 end 89 90 ## 91 # Returns an expiry Time based on +sec+ which can be one of: 92 # Numeric:: +sec+ seconds into the future 93 # +true+:: the expiry time is the start of 1970 (i.e. expired) 94 # +nil+:: it is Tue Jan 19 03:14:07 GMT Standard Time 2038 (i.e. when 95 # UNIX clocks will die) 96 97 def make_expires(sec=nil) 98 case sec 99 when Numeric 100 Time.now + sec 101 when true 102 Time.at(1) 103 when nil 104 Time.at(2**31-1) 105 end 106 end 107 108 ## 109 # Retrieves +key+ from the tuple. 110 111 def [](key) 112 @tuple[key] 113 end 114 115 ## 116 # Fetches +key+ from the tuple. 117 118 def fetch(key) 119 @tuple.fetch(key) 120 end 121 122 ## 123 # The size of the tuple. 124 125 def size 126 @tuple.size 127 end 128 129 ## 130 # Creates a Rinda::Tuple for +ary+. 131 132 def make_tuple(ary) 133 Rinda::Tuple.new(ary) 134 end 135 136 private 137 138 ## 139 # Returns a valid argument to make_expires and the renewer or nil. 140 # 141 # Given +true+, +nil+, or Numeric, returns that value and +nil+ (no actual 142 # renewer). Otherwise it returns an expiry value from calling +it.renew+ 143 # and the renewer. 144 145 def get_renewer(it) 146 case it 147 when Numeric, true, nil 148 return it, nil 149 else 150 begin 151 return it.renew, it 152 rescue Exception 153 return it, nil 154 end 155 end 156 end 157 158 end 159 160 ## 161 # A TemplateEntry is a Template together with expiry and cancellation data. 162 163 class TemplateEntry < TupleEntry 164 ## 165 # Matches this TemplateEntry against +tuple+. See Template#match for 166 # details on how a Template matches a Tuple. 167 168 def match(tuple) 169 @tuple.match(tuple) 170 end 171 172 alias === match 173 174 def make_tuple(ary) # :nodoc: 175 Rinda::Template.new(ary) 176 end 177 178 end 179 180 ## 181 # <i>Documentation?</i> 182 183 class WaitTemplateEntry < TemplateEntry 184 185 attr_reader :found 186 187 def initialize(place, ary, expires=nil) 188 super(ary, expires) 189 @place = place 190 @cond = place.new_cond 191 @found = nil 192 end 193 194 def cancel 195 super 196 signal 197 end 198 199 def wait 200 @cond.wait 201 end 202 203 def read(tuple) 204 @found = tuple 205 signal 206 end 207 208 def signal 209 @place.synchronize do 210 @cond.signal 211 end 212 end 213 214 end 215 216 ## 217 # A NotifyTemplateEntry is returned by TupleSpace#notify and is notified of 218 # TupleSpace changes. You may receive either your subscribed event or the 219 # 'close' event when iterating over notifications. 220 # 221 # See TupleSpace#notify_event for valid notification types. 222 # 223 # == Example 224 # 225 # ts = Rinda::TupleSpace.new 226 # observer = ts.notify 'write', [nil] 227 # 228 # Thread.start do 229 # observer.each { |t| p t } 230 # end 231 # 232 # 3.times { |i| ts.write [i] } 233 # 234 # Outputs: 235 # 236 # ['write', [0]] 237 # ['write', [1]] 238 # ['write', [2]] 239 240 class NotifyTemplateEntry < TemplateEntry 241 242 ## 243 # Creates a new NotifyTemplateEntry that watches +place+ for +event+s that 244 # match +tuple+. 245 246 def initialize(place, event, tuple, expires=nil) 247 ary = [event, Rinda::Template.new(tuple)] 248 super(ary, expires) 249 @queue = Queue.new 250 @done = false 251 end 252 253 ## 254 # Called by TupleSpace to notify this NotifyTemplateEntry of a new event. 255 256 def notify(ev) 257 @queue.push(ev) 258 end 259 260 ## 261 # Retrieves a notification. Raises RequestExpiredError when this 262 # NotifyTemplateEntry expires. 263 264 def pop 265 raise RequestExpiredError if @done 266 it = @queue.pop 267 @done = true if it[0] == 'close' 268 return it 269 end 270 271 ## 272 # Yields event/tuple pairs until this NotifyTemplateEntry expires. 273 274 def each # :yields: event, tuple 275 while !@done 276 it = pop 277 yield(it) 278 end 279 rescue 280 ensure 281 cancel 282 end 283 284 end 285 286 ## 287 # TupleBag is an unordered collection of tuples. It is the basis 288 # of Tuplespace. 289 290 class TupleBag 291 class TupleBin 292 extend Forwardable 293 def_delegators '@bin', :find_all, :delete_if, :each, :empty? 294 295 def initialize 296 @bin = [] 297 end 298 299 def add(tuple) 300 @bin.push(tuple) 301 end 302 303 def delete(tuple) 304 idx = @bin.rindex(tuple) 305 @bin.delete_at(idx) if idx 306 end 307 308 def find 309 @bin.reverse_each do |x| 310 return x if yield(x) 311 end 312 nil 313 end 314 end 315 316 def initialize # :nodoc: 317 @hash = {} 318 @enum = enum_for(:each_entry) 319 end 320 321 ## 322 # +true+ if the TupleBag to see if it has any expired entries. 323 324 def has_expires? 325 @enum.find do |tuple| 326 tuple.expires 327 end 328 end 329 330 ## 331 # Add +tuple+ to the TupleBag. 332 333 def push(tuple) 334 key = bin_key(tuple) 335 @hash[key] ||= TupleBin.new 336 @hash[key].add(tuple) 337 end 338 339 ## 340 # Removes +tuple+ from the TupleBag. 341 342 def delete(tuple) 343 key = bin_key(tuple) 344 bin = @hash[key] 345 return nil unless bin 346 bin.delete(tuple) 347 @hash.delete(key) if bin.empty? 348 tuple 349 end 350 351 ## 352 # Finds all live tuples that match +template+. 353 def find_all(template) 354 bin_for_find(template).find_all do |tuple| 355 tuple.alive? && template.match(tuple) 356 end 357 end 358 359 ## 360 # Finds a live tuple that matches +template+. 361 362 def find(template) 363 bin_for_find(template).find do |tuple| 364 tuple.alive? && template.match(tuple) 365 end 366 end 367 368 ## 369 # Finds all tuples in the TupleBag which when treated as templates, match 370 # +tuple+ and are alive. 371 372 def find_all_template(tuple) 373 @enum.find_all do |template| 374 template.alive? && template.match(tuple) 375 end 376 end 377 378 ## 379 # Delete tuples which dead tuples from the TupleBag, returning the deleted 380 # tuples. 381 382 def delete_unless_alive 383 deleted = [] 384 @hash.each do |key, bin| 385 bin.delete_if do |tuple| 386 if tuple.alive? 387 false 388 else 389 deleted.push(tuple) 390 true 391 end 392 end 393 end 394 deleted 395 end 396 397 private 398 def each_entry(&blk) 399 @hash.each do |k, v| 400 v.each(&blk) 401 end 402 end 403 404 def bin_key(tuple) 405 head = tuple[0] 406 if head.class == Symbol 407 return head 408 else 409 false 410 end 411 end 412 413 def bin_for_find(template) 414 key = bin_key(template) 415 key ? @hash.fetch(key, []) : @enum 416 end 417 end 418 419 ## 420 # The Tuplespace manages access to the tuples it contains, 421 # ensuring mutual exclusion requirements are met. 422 # 423 # The +sec+ option for the write, take, move, read and notify methods may 424 # either be a number of seconds or a Renewer object. 425 426 class TupleSpace 427 428 include DRbUndumped 429 include MonitorMixin 430 431 ## 432 # Creates a new TupleSpace. +period+ is used to control how often to look 433 # for dead tuples after modifications to the TupleSpace. 434 # 435 # If no dead tuples are found +period+ seconds after the last 436 # modification, the TupleSpace will stop looking for dead tuples. 437 438 def initialize(period=60) 439 super() 440 @bag = TupleBag.new 441 @read_waiter = TupleBag.new 442 @take_waiter = TupleBag.new 443 @notify_waiter = TupleBag.new 444 @period = period 445 @keeper = nil 446 end 447 448 ## 449 # Adds +tuple+ 450 451 def write(tuple, sec=nil) 452 entry = create_entry(tuple, sec) 453 synchronize do 454 if entry.expired? 455 @read_waiter.find_all_template(entry).each do |template| 456 template.read(tuple) 457 end 458 notify_event('write', entry.value) 459 notify_event('delete', entry.value) 460 else 461 @bag.push(entry) 462 start_keeper if entry.expires 463 @read_waiter.find_all_template(entry).each do |template| 464 template.read(tuple) 465 end 466 @take_waiter.find_all_template(entry).each do |template| 467 template.signal 468 end 469 notify_event('write', entry.value) 470 end 471 end 472 entry 473 end 474 475 ## 476 # Removes +tuple+ 477 478 def take(tuple, sec=nil, &block) 479 move(nil, tuple, sec, &block) 480 end 481 482 ## 483 # Moves +tuple+ to +port+. 484 485 def move(port, tuple, sec=nil) 486 template = WaitTemplateEntry.new(self, tuple, sec) 487 yield(template) if block_given? 488 synchronize do 489 entry = @bag.find(template) 490 if entry 491 port.push(entry.value) if port 492 @bag.delete(entry) 493 notify_event('take', entry.value) 494 return entry.value 495 end 496 raise RequestExpiredError if template.expired? 497 498 begin 499 @take_waiter.push(template) 500 start_keeper if template.expires 501 while true 502 raise RequestCanceledError if template.canceled? 503 raise RequestExpiredError if template.expired? 504 entry = @bag.find(template) 505 if entry 506 port.push(entry.value) if port 507 @bag.delete(entry) 508 notify_event('take', entry.value) 509 return entry.value 510 end 511 template.wait 512 end 513 ensure 514 @take_waiter.delete(template) 515 end 516 end 517 end 518 519 ## 520 # Reads +tuple+, but does not remove it. 521 522 def read(tuple, sec=nil) 523 template = WaitTemplateEntry.new(self, tuple, sec) 524 yield(template) if block_given? 525 synchronize do 526 entry = @bag.find(template) 527 return entry.value if entry 528 raise RequestExpiredError if template.expired? 529 530 begin 531 @read_waiter.push(template) 532 start_keeper if template.expires 533 template.wait 534 raise RequestCanceledError if template.canceled? 535 raise RequestExpiredError if template.expired? 536 return template.found 537 ensure 538 @read_waiter.delete(template) 539 end 540 end 541 end 542 543 ## 544 # Returns all tuples matching +tuple+. Does not remove the found tuples. 545 546 def read_all(tuple) 547 template = WaitTemplateEntry.new(self, tuple, nil) 548 synchronize do 549 entry = @bag.find_all(template) 550 entry.collect do |e| 551 e.value 552 end 553 end 554 end 555 556 ## 557 # Registers for notifications of +event+. Returns a NotifyTemplateEntry. 558 # See NotifyTemplateEntry for examples of how to listen for notifications. 559 # 560 # +event+ can be: 561 # 'write':: A tuple was added 562 # 'take':: A tuple was taken or moved 563 # 'delete':: A tuple was lost after being overwritten or expiring 564 # 565 # The TupleSpace will also notify you of the 'close' event when the 566 # NotifyTemplateEntry has expired. 567 568 def notify(event, tuple, sec=nil) 569 template = NotifyTemplateEntry.new(self, event, tuple, sec) 570 synchronize do 571 @notify_waiter.push(template) 572 end 573 template 574 end 575 576 private 577 578 def create_entry(tuple, sec) 579 TupleEntry.new(tuple, sec) 580 end 581 582 ## 583 # Removes dead tuples. 584 585 def keep_clean 586 synchronize do 587 @read_waiter.delete_unless_alive.each do |e| 588 e.signal 589 end 590 @take_waiter.delete_unless_alive.each do |e| 591 e.signal 592 end 593 @notify_waiter.delete_unless_alive.each do |e| 594 e.notify(['close']) 595 end 596 @bag.delete_unless_alive.each do |e| 597 notify_event('delete', e.value) 598 end 599 end 600 end 601 602 ## 603 # Notifies all registered listeners for +event+ of a status change of 604 # +tuple+. 605 606 def notify_event(event, tuple) 607 ev = [event, tuple] 608 @notify_waiter.find_all_template(ev).each do |template| 609 template.notify(ev) 610 end 611 end 612 613 ## 614 # Creates a thread that scans the tuplespace for expired tuples. 615 616 def start_keeper 617 return if @keeper && @keeper.alive? 618 @keeper = Thread.new do 619 while true 620 sleep(@period) 621 synchronize do 622 break unless need_keeper? 623 keep_clean 624 end 625 end 626 end 627 end 628 629 ## 630 # Checks the tuplespace to see if it needs cleaning. 631 632 def need_keeper? 633 return true if @bag.has_expires? 634 return true if @read_waiter.has_expires? 635 return true if @take_waiter.has_expires? 636 return true if @notify_waiter.has_expires? 637 end 638 639 end 640 641end 642 643