1#
2# Copyright (C) 2001 John W. Small All Rights Reserved
3#
4# Author::        John W. Small
5# Documentation:: Gavin Sinclair
6# Licence::       Ruby License
7
8require "socket"
9require "thread"
10
11#
12# GServer implements a generic server, featuring thread pool management,
13# simple logging, and multi-server management.  See HttpServer in
14# <tt>xmlrpc/httpserver.rb</tt> in the Ruby standard library for an example of
15# GServer in action.
16#
17# Any kind of application-level server can be implemented using this class.
18# It accepts multiple simultaneous connections from clients, up to an optional
19# maximum number.  Several _services_ (i.e. one service per TCP port) can be
20# run simultaneously, and stopped at any time through the class method
21# <tt>GServer.stop(port)</tt>.  All the threading issues are handled, saving
22# you the effort.  All events are optionally logged, but you can provide your
23# own event handlers if you wish.
24#
25# == Example
26#
27# Using GServer is simple.  Below we implement a simple time server, run it,
28# query it, and shut it down.  Try this code in +irb+:
29#
30#   require 'gserver'
31#
32#   #
33#   # A server that returns the time in seconds since 1970.
34#   #
35#   class TimeServer < GServer
36#     def initialize(port=10001, *args)
37#       super(port, *args)
38#     end
39#     def serve(io)
40#       io.puts(Time.now.to_s)
41#     end
42#   end
43#
44#   # Run the server with logging enabled (it's a separate thread).
45#   server = TimeServer.new
46#   server.audit = true                  # Turn logging on.
47#   server.start
48#
49#   # *** Now point your browser to http://localhost:10001 to see it working ***
50#
51#   # See if it's still running.
52#   GServer.in_service?(10001)           # -> true
53#   server.stopped?                      # -> false
54#
55#   # Shut the server down gracefully.
56#   server.shutdown
57#
58#   # Alternatively, stop it immediately.
59#   GServer.stop(10001)
60#   # or, of course, "server.stop".
61#
62# All the business of accepting connections and exception handling is taken
63# care of.  All we have to do is implement the method that actually serves the
64# client.
65#
66# === Advanced
67#
68# As the example above shows, the way to use GServer is to subclass it to
69# create a specific server, overriding the +serve+ method.  You can override
70# other methods as well if you wish, perhaps to collect statistics, or emit
71# more detailed logging.
72#
73# * #connecting
74# * #disconnecting
75# * #starting
76# * #stopping
77#
78# The above methods are only called if auditing is enabled, via #audit=.
79#
80# You can also override #log and #error if, for example, you wish to use a
81# more sophisticated logging system.
82#
83class GServer
84
85  DEFAULT_HOST = "127.0.0.1"
86
87  def serve(io)
88  end
89
90  @@services = {}   # Hash of opened ports, i.e. services
91  @@servicesMutex = Mutex.new
92
93  # Stop the server running on the given port, bound to the given host
94  #
95  # +port+:: port, as a FixNum, of the server to stop
96  # +host+:: host on which to find the server to stop
97  def GServer.stop(port, host = DEFAULT_HOST)
98    @@servicesMutex.synchronize {
99      @@services[host][port].stop
100    }
101  end
102
103  # Check if a server is running on the given port and host
104  #
105  # +port+:: port, as a FixNum, of the server to check
106  # +host+:: host on which to find the server to check
107  #
108  # Returns true if a server is running on that port and host.
109  def GServer.in_service?(port, host = DEFAULT_HOST)
110    @@services.has_key?(host) and
111      @@services[host].has_key?(port)
112  end
113
114  # Stop the server
115  def stop
116    @connectionsMutex.synchronize  {
117      if @tcpServerThread
118        @tcpServerThread.raise "stop"
119      end
120    }
121  end
122
123  # Returns true if the server has stopped.
124  def stopped?
125    @tcpServerThread == nil
126  end
127
128  # Schedule a shutdown for the server
129  def shutdown
130    @shutdown = true
131  end
132
133  # Return the current number of connected clients
134  def connections
135    @connections.size
136  end
137
138  # Join with the server thread
139  def join
140    @tcpServerThread.join if @tcpServerThread
141  end
142
143  # Port on which to listen, as a FixNum
144  attr_reader :port
145  # Host on which to bind, as a String
146  attr_reader :host
147  # Maximum number of connections to accept at at ime, as a FixNum
148  attr_reader :maxConnections
149  # IO Device on which log messages should be written
150  attr_accessor :stdlog
151  # Set to true to cause the callbacks #connecting, #disconnecting, #starting,
152  # and #stopping to be called during the server's lifecycle
153  attr_accessor :audit
154  # Set to true to show more detailed logging
155  attr_accessor :debug
156
157  # Called when a client connects, if auditing is enabled.
158  #
159  # +client+:: a TCPSocket instances representing the client that connected
160  #
161  # Return true to allow this client to connect, false to prevent it.
162  def connecting(client)
163    addr = client.peeraddr
164    log("#{self.class.to_s} #{@host}:#{@port} client:#{addr[1]} " +
165        "#{addr[2]}<#{addr[3]}> connect")
166    true
167  end
168
169
170  # Called when a client disconnects, if audition is enabled.
171  #
172  # +clientPort+:: the port of the client that is connecting
173  def disconnecting(clientPort)
174    log("#{self.class.to_s} #{@host}:#{@port} " +
175      "client:#{clientPort} disconnect")
176  end
177
178  protected :connecting, :disconnecting
179
180  # Called when the server is starting up, if auditing is enabled.
181  def starting()
182    log("#{self.class.to_s} #{@host}:#{@port} start")
183  end
184
185  # Called when the server is shutting down, if auditing is enabled.
186  def stopping()
187    log("#{self.class.to_s} #{@host}:#{@port} stop")
188  end
189
190  protected :starting, :stopping
191
192  # Called if #debug is true whenever an unhandled exception is raised.
193  # This implementation simply logs the backtrace.
194  #
195  # +detail+:: The Exception that was caught
196  def error(detail)
197    log(detail.backtrace.join("\n"))
198  end
199
200  # Log a message to #stdlog, if it's defined.  This implementation
201  # outputs the timestamp and message to the log.
202  #
203  # +msg+:: the message to log
204  def log(msg)
205    if @stdlog
206      @stdlog.puts("[#{Time.new.ctime}] %s" % msg)
207      @stdlog.flush
208    end
209  end
210
211  protected :error, :log
212
213  # Create a new server
214  #
215  # +port+:: the port, as a FixNum, on which to listen.
216  # +host+:: the host to bind to
217  # +maxConnections+:: The maximum number of simultaneous connections to
218  #                    accept
219  # +stdlog+:: IO device on which to log messages
220  # +audit+:: if true, lifecycle callbacks will be called.  See #audit
221  # +debug+:: if true, error messages are logged.  See #debug
222  def initialize(port, host = DEFAULT_HOST, maxConnections = 4,
223    stdlog = $stderr, audit = false, debug = false)
224    @tcpServerThread = nil
225    @port = port
226    @host = host
227    @maxConnections = maxConnections
228    @connections = []
229    @connectionsMutex = Mutex.new
230    @connectionsCV = ConditionVariable.new
231    @stdlog = stdlog
232    @audit = audit
233    @debug = debug
234  end
235
236  # Start the server if it isn't already running
237  #
238  # +maxConnections+::
239  #   override +maxConnections+ given to the constructor.  A negative
240  #   value indicates that the value from the constructor should be used.
241  def start(maxConnections = -1)
242    raise "server is already running" if !stopped?
243    @shutdown = false
244    @maxConnections = maxConnections if maxConnections > 0
245    @@servicesMutex.synchronize  {
246      if GServer.in_service?(@port,@host)
247        raise "Port already in use: #{host}:#{@port}!"
248      end
249      @tcpServer = TCPServer.new(@host,@port)
250      @port = @tcpServer.addr[1]
251      @@services[@host] = {} unless @@services.has_key?(@host)
252      @@services[@host][@port] = self;
253    }
254    @tcpServerThread = Thread.new {
255      begin
256        starting if @audit
257        while !@shutdown
258          @connectionsMutex.synchronize  {
259             while @connections.size >= @maxConnections
260               @connectionsCV.wait(@connectionsMutex)
261             end
262          }
263          client = @tcpServer.accept
264          Thread.new(client)  { |myClient|
265            @connections << Thread.current
266            begin
267              myPort = myClient.peeraddr[1]
268              serve(myClient) if !@audit or connecting(myClient)
269            rescue => detail
270              error(detail) if @debug
271            ensure
272              begin
273                myClient.close
274              rescue
275              end
276              @connectionsMutex.synchronize {
277                @connections.delete(Thread.current)
278                @connectionsCV.signal
279              }
280              disconnecting(myPort) if @audit
281            end
282          }
283        end
284      rescue => detail
285        error(detail) if @debug
286      ensure
287        begin
288          @tcpServer.close
289        rescue
290        end
291        if @shutdown
292          @connectionsMutex.synchronize  {
293             while @connections.size > 0
294               @connectionsCV.wait(@connectionsMutex)
295             end
296          }
297        else
298          @connections.each { |c| c.raise "stop" }
299        end
300        @tcpServerThread = nil
301        @@servicesMutex.synchronize  {
302          @@services[@host].delete(@port)
303        }
304        stopping if @audit
305      end
306    }
307    self
308  end
309
310end
311