1# 2# Note: Rinda::Ring API is unstable. 3# 4require 'drb/drb' 5require 'rinda/rinda' 6require 'thread' 7 8module Rinda 9 10 ## 11 # The default port Ring discovery will use. 12 13 Ring_PORT = 7647 14 15 ## 16 # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts. 17 # Service location uses the following steps: 18 # 19 # 1. A RingServer begins listening on the broadcast UDP address. 20 # 2. A RingFinger sends a UDP packet containing the DRb URI where it will 21 # listen for a reply. 22 # 3. The RingServer receives the UDP packet and connects back to the 23 # provided DRb URI with the DRb service. 24 25 class RingServer 26 27 include DRbUndumped 28 29 ## 30 # Advertises +ts+ on the UDP broadcast address at +port+. 31 32 def initialize(ts, port=Ring_PORT) 33 @ts = ts 34 @soc = UDPSocket.open 35 @soc.bind('', port) 36 @w_service = write_service 37 @r_service = reply_service 38 end 39 40 ## 41 # Creates a thread that picks up UDP packets and passes them to do_write 42 # for decoding. 43 44 def write_service 45 Thread.new do 46 loop do 47 msg = @soc.recv(1024) 48 do_write(msg) 49 end 50 end 51 end 52 53 ## 54 # Extracts the response URI from +msg+ and adds it to TupleSpace where it 55 # will be picked up by +reply_service+ for notification. 56 57 def do_write(msg) 58 Thread.new do 59 begin 60 tuple, sec = Marshal.load(msg) 61 @ts.write(tuple, sec) 62 rescue 63 end 64 end 65 end 66 67 ## 68 # Creates a thread that notifies waiting clients from the TupleSpace. 69 70 def reply_service 71 Thread.new do 72 loop do 73 do_reply 74 end 75 end 76 end 77 78 ## 79 # Pulls lookup tuples out of the TupleSpace and sends their DRb object the 80 # address of the local TupleSpace. 81 82 def do_reply 83 tuple = @ts.take([:lookup_ring, DRbObject]) 84 Thread.new { tuple[1].call(@ts) rescue nil} 85 rescue 86 end 87 88 end 89 90 ## 91 # RingFinger is used by RingServer clients to discover the RingServer's 92 # TupleSpace. Typically, all a client needs to do is call 93 # RingFinger.primary to retrieve the remote TupleSpace, which it can then 94 # begin using. 95 96 class RingFinger 97 98 @@broadcast_list = ['<broadcast>', 'localhost'] 99 100 @@finger = nil 101 102 ## 103 # Creates a singleton RingFinger and looks for a RingServer. Returns the 104 # created RingFinger. 105 106 def self.finger 107 unless @@finger 108 @@finger = self.new 109 @@finger.lookup_ring_any 110 end 111 @@finger 112 end 113 114 ## 115 # Returns the first advertised TupleSpace. 116 117 def self.primary 118 finger.primary 119 end 120 121 ## 122 # Contains all discovered TupleSpaces except for the primary. 123 124 def self.to_a 125 finger.to_a 126 end 127 128 ## 129 # The list of addresses where RingFinger will send query packets. 130 131 attr_accessor :broadcast_list 132 133 ## 134 # The port that RingFinger will send query packets to. 135 136 attr_accessor :port 137 138 ## 139 # Contain the first advertised TupleSpace after lookup_ring_any is called. 140 141 attr_accessor :primary 142 143 ## 144 # Creates a new RingFinger that will look for RingServers at +port+ on 145 # the addresses in +broadcast_list+. 146 147 def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT) 148 @broadcast_list = broadcast_list || ['localhost'] 149 @port = port 150 @primary = nil 151 @rings = [] 152 end 153 154 ## 155 # Contains all discovered TupleSpaces except for the primary. 156 157 def to_a 158 @rings 159 end 160 161 ## 162 # Iterates over all discovered TupleSpaces starting with the primary. 163 164 def each 165 lookup_ring_any unless @primary 166 return unless @primary 167 yield(@primary) 168 @rings.each { |x| yield(x) } 169 end 170 171 ## 172 # Looks up RingServers waiting +timeout+ seconds. RingServers will be 173 # given +block+ as a callback, which will be called with the remote 174 # TupleSpace. 175 176 def lookup_ring(timeout=5, &block) 177 return lookup_ring_any(timeout) unless block_given? 178 179 msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout]) 180 @broadcast_list.each do |it| 181 soc = UDPSocket.open 182 begin 183 soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true) 184 soc.send(msg, 0, it, @port) 185 rescue 186 nil 187 ensure 188 soc.close 189 end 190 end 191 sleep(timeout) 192 end 193 194 ## 195 # Returns the first found remote TupleSpace. Any further recovered 196 # TupleSpaces can be found by calling +to_a+. 197 198 def lookup_ring_any(timeout=5) 199 queue = Queue.new 200 201 Thread.new do 202 self.lookup_ring(timeout) do |ts| 203 queue.push(ts) 204 end 205 queue.push(nil) 206 end 207 208 @primary = queue.pop 209 raise('RingNotFound') if @primary.nil? 210 211 Thread.new do 212 while it = queue.pop 213 @rings.push(it) 214 end 215 end 216 217 @primary 218 end 219 220 end 221 222 ## 223 # RingProvider uses a RingServer advertised TupleSpace as a name service. 224 # TupleSpace clients can register themselves with the remote TupleSpace and 225 # look up other provided services via the remote TupleSpace. 226 # 227 # Services are registered with a tuple of the format [:name, klass, 228 # DRbObject, description]. 229 230 class RingProvider 231 232 ## 233 # Creates a RingProvider that will provide a +klass+ service running on 234 # +front+, with a +description+. +renewer+ is optional. 235 236 def initialize(klass, front, desc, renewer = nil) 237 @tuple = [:name, klass, front, desc] 238 @renewer = renewer || Rinda::SimpleRenewer.new 239 end 240 241 ## 242 # Advertises this service on the primary remote TupleSpace. 243 244 def provide 245 ts = Rinda::RingFinger.primary 246 ts.write(@tuple, @renewer) 247 end 248 249 end 250 251end 252 253if __FILE__ == $0 254 DRb.start_service 255 case ARGV.shift 256 when 's' 257 require 'rinda/tuplespace' 258 ts = Rinda::TupleSpace.new 259 Rinda::RingServer.new(ts) 260 $stdin.gets 261 when 'w' 262 finger = Rinda::RingFinger.new(nil) 263 finger.lookup_ring do |ts2| 264 p ts2 265 ts2.write([:hello, :world]) 266 end 267 when 'r' 268 finger = Rinda::RingFinger.new(nil) 269 finger.lookup_ring do |ts2| 270 p ts2 271 p ts2.take([nil, nil]) 272 end 273 end 274end 275 276