1#
2# Note: Rinda::Ring API is unstable.
3#
4require 'drb/drb'
5require 'rinda/rinda'
6require 'thread'
7
8module Rinda
9
10  ##
11  # The default port Ring discovery will use.
12
13  Ring_PORT = 7647
14
15  ##
16  # A RingServer allows a Rinda::TupleSpace to be located via UDP broadcasts.
17  # Service location uses the following steps:
18  #
19  # 1. A RingServer begins listening on the broadcast UDP address.
20  # 2. A RingFinger sends a UDP packet containing the DRb URI where it will
21  #    listen for a reply.
22  # 3. The RingServer receives the UDP packet and connects back to the
23  #    provided DRb URI with the DRb service.
24
25  class RingServer
26
27    include DRbUndumped
28
29    ##
30    # Advertises +ts+ on the UDP broadcast address at +port+.
31
32    def initialize(ts, port=Ring_PORT)
33      @ts = ts
34      @soc = UDPSocket.open
35      @soc.bind('', port)
36      @w_service = write_service
37      @r_service = reply_service
38    end
39
40    ##
41    # Creates a thread that picks up UDP packets and passes them to do_write
42    # for decoding.
43
44    def write_service
45      Thread.new do
46        loop do
47          msg = @soc.recv(1024)
48          do_write(msg)
49        end
50      end
51    end
52
53    ##
54    # Extracts the response URI from +msg+ and adds it to TupleSpace where it
55    # will be picked up by +reply_service+ for notification.
56
57    def do_write(msg)
58      Thread.new do
59        begin
60          tuple, sec = Marshal.load(msg)
61          @ts.write(tuple, sec)
62        rescue
63        end
64      end
65    end
66
67    ##
68    # Creates a thread that notifies waiting clients from the TupleSpace.
69
70    def reply_service
71      Thread.new do
72        loop do
73          do_reply
74        end
75      end
76    end
77
78    ##
79    # Pulls lookup tuples out of the TupleSpace and sends their DRb object the
80    # address of the local TupleSpace.
81
82    def do_reply
83      tuple = @ts.take([:lookup_ring, DRbObject])
84      Thread.new { tuple[1].call(@ts) rescue nil}
85    rescue
86    end
87
88  end
89
90  ##
91  # RingFinger is used by RingServer clients to discover the RingServer's
92  # TupleSpace.  Typically, all a client needs to do is call
93  # RingFinger.primary to retrieve the remote TupleSpace, which it can then
94  # begin using.
95
96  class RingFinger
97
98    @@broadcast_list = ['<broadcast>', 'localhost']
99
100    @@finger = nil
101
102    ##
103    # Creates a singleton RingFinger and looks for a RingServer.  Returns the
104    # created RingFinger.
105
106    def self.finger
107      unless @@finger
108        @@finger = self.new
109        @@finger.lookup_ring_any
110      end
111      @@finger
112    end
113
114    ##
115    # Returns the first advertised TupleSpace.
116
117    def self.primary
118      finger.primary
119    end
120
121    ##
122    # Contains all discovered TupleSpaces except for the primary.
123
124    def self.to_a
125      finger.to_a
126    end
127
128    ##
129    # The list of addresses where RingFinger will send query packets.
130
131    attr_accessor :broadcast_list
132
133    ##
134    # The port that RingFinger will send query packets to.
135
136    attr_accessor :port
137
138    ##
139    # Contain the first advertised TupleSpace after lookup_ring_any is called.
140
141    attr_accessor :primary
142
143    ##
144    # Creates a new RingFinger that will look for RingServers at +port+ on
145    # the addresses in +broadcast_list+.
146
147    def initialize(broadcast_list=@@broadcast_list, port=Ring_PORT)
148      @broadcast_list = broadcast_list || ['localhost']
149      @port = port
150      @primary = nil
151      @rings = []
152    end
153
154    ##
155    # Contains all discovered TupleSpaces except for the primary.
156
157    def to_a
158      @rings
159    end
160
161    ##
162    # Iterates over all discovered TupleSpaces starting with the primary.
163
164    def each
165      lookup_ring_any unless @primary
166      return unless @primary
167      yield(@primary)
168      @rings.each { |x| yield(x) }
169    end
170
171    ##
172    # Looks up RingServers waiting +timeout+ seconds.  RingServers will be
173    # given +block+ as a callback, which will be called with the remote
174    # TupleSpace.
175
176    def lookup_ring(timeout=5, &block)
177      return lookup_ring_any(timeout) unless block_given?
178
179      msg = Marshal.dump([[:lookup_ring, DRbObject.new(block)], timeout])
180      @broadcast_list.each do |it|
181        soc = UDPSocket.open
182        begin
183          soc.setsockopt(Socket::SOL_SOCKET, Socket::SO_BROADCAST, true)
184          soc.send(msg, 0, it, @port)
185        rescue
186          nil
187        ensure
188          soc.close
189        end
190      end
191      sleep(timeout)
192    end
193
194    ##
195    # Returns the first found remote TupleSpace.  Any further recovered
196    # TupleSpaces can be found by calling +to_a+.
197
198    def lookup_ring_any(timeout=5)
199      queue = Queue.new
200
201      Thread.new do
202        self.lookup_ring(timeout) do |ts|
203          queue.push(ts)
204        end
205        queue.push(nil)
206      end
207
208      @primary = queue.pop
209      raise('RingNotFound') if @primary.nil?
210
211      Thread.new do
212        while it = queue.pop
213          @rings.push(it)
214        end
215      end
216
217      @primary
218    end
219
220  end
221
222  ##
223  # RingProvider uses a RingServer advertised TupleSpace as a name service.
224  # TupleSpace clients can register themselves with the remote TupleSpace and
225  # look up other provided services via the remote TupleSpace.
226  #
227  # Services are registered with a tuple of the format [:name, klass,
228  # DRbObject, description].
229
230  class RingProvider
231
232    ##
233    # Creates a RingProvider that will provide a +klass+ service running on
234    # +front+, with a +description+.  +renewer+ is optional.
235
236    def initialize(klass, front, desc, renewer = nil)
237      @tuple = [:name, klass, front, desc]
238      @renewer = renewer || Rinda::SimpleRenewer.new
239    end
240
241    ##
242    # Advertises this service on the primary remote TupleSpace.
243
244    def provide
245      ts = Rinda::RingFinger.primary
246      ts.write(@tuple, @renewer)
247    end
248
249  end
250
251end
252
253if __FILE__ == $0
254  DRb.start_service
255  case ARGV.shift
256  when 's'
257    require 'rinda/tuplespace'
258    ts = Rinda::TupleSpace.new
259    Rinda::RingServer.new(ts)
260    $stdin.gets
261  when 'w'
262    finger = Rinda::RingFinger.new(nil)
263    finger.lookup_ring do |ts2|
264      p ts2
265      ts2.write([:hello, :world])
266    end
267  when 'r'
268    finger = Rinda::RingFinger.new(nil)
269    finger.lookup_ring do |ts2|
270      p ts2
271      p ts2.take([nil, nil])
272    end
273  end
274end
275
276