#
# Copyright (C) 2001 John W. Small All Rights Reserved
#
# Author::        John W. Small
# Documentation:: Gavin Sinclair
# Licence::       Ruby License

require "socket"
require "thread"

#
# GServer implements a generic server, featuring thread pool management,
# simple logging, and multi-server management.  See HttpServer in
# <tt>xmlrpc/httpserver.rb</tt> in the Ruby standard library for an example of
# GServer in action.
#
# Any kind of application-level server can be implemented using this class.
# It accepts multiple simultaneous connections from clients, up to an optional
# maximum number.  Several _services_ (i.e. one service per TCP port) can be
# run simultaneously, and stopped at any time through the class method
# <tt>GServer.stop(port)</tt>.  All the threading issues are handled, saving
# you the effort.  All events are optionally logged, but you can provide your
# own event handlers if you wish.
#
# == Example
#
# Using GServer is simple.  Below we implement a simple time server, run it,
# query it, and shut it down.  Try this code in +irb+:
#
#   require 'gserver'
#
#   #
#   # A server that returns the time in seconds since 1970.
#   #
#   class TimeServer < GServer
#     def initialize(port=10001, *args)
#       super(port, *args)
#     end
#     def serve(io)
#       io.puts(Time.now.to_s)
#     end
#   end
#
#   # Run the server with logging enabled (it's a separate thread).
#   server = TimeServer.new
#   server.audit = true                  # Turn logging on.
#   server.start
#
#   # *** Now point your browser to http://localhost:10001 to see it working ***
#
#   # See if it's still running.
#   GServer.in_service?(10001)           # -> true
#   server.stopped?                      # -> false
#
#   # Shut the server down gracefully.
#   server.shutdown
#
#   # Alternatively, stop it immediately.
#   GServer.stop(10001)
#   # or, of course, "server.stop".
#
# All the business of accepting connections and exception handling is taken
# care of.  All we have to do is implement the method that actually serves the
# client.
#
# === Advanced
#
# As the example above shows, the way to use GServer is to subclass it to
# create a specific server, overriding the +serve+ method.  You can override
# other methods as well if you wish, perhaps to collect statistics, or emit
# more detailed logging.
#
# * #connecting
# * #disconnecting
# * #starting
# * #stopping
#
# The above methods are only called if auditing is enabled, via #audit=.
#
# You can also override #log and #error if, for example, you wish to use a
# more sophisticated logging system.
#
class GServer

  DEFAULT_HOST = "127.0.0.1"

  # Handle one client connection.  The default implementation does nothing;
  # subclasses override it.  The socket is closed by the server once this
  # method returns or raises.
  #
  # +io+:: the socket returned by TCPServer#accept for this client
  def serve(io)
  end

  @@services = {}   # Hash of opened ports, i.e. services: host => { port => server }
  @@servicesMutex = Mutex.new

  # Stop the server running on the given port, bound to the given host.
  # Does nothing if no server is registered for that host and port.
  #
  # +port+:: port, as an Integer, of the server to stop
  # +host+:: host on which to find the server to stop
  def GServer.stop(port, host = DEFAULT_HOST)
    @@servicesMutex.synchronize {
      servers_on_host = @@services[host]
      server = servers_on_host && servers_on_host[port]
      server.stop if server
    }
  end

  # Check if a server is running on the given port and host
  #
  # +port+:: port, as an Integer, of the server to check
  # +host+:: host on which to find the server to check
  #
  # Returns true if a server is running on that port and host.
  def GServer.in_service?(port, host = DEFAULT_HOST)
    @@services.key?(host) && @@services[host].key?(port)
  end

  # Stop the server immediately by raising "stop" in the accepting thread.
  # Does nothing if the server is not running.
  def stop
    @connectionsMutex.synchronize {
      @tcpServerThread.raise "stop" if @tcpServerThread
    }
  end

  # Returns true if the server has stopped.
  def stopped?
    @tcpServerThread.nil?
  end

  # Schedule a shutdown for the server.  The flag is tested at the top of the
  # accept loop, so a server that is blocked waiting for a client only notices
  # it once that wait ends.
  def shutdown
    @shutdown = true
  end

  # Return the current number of connected clients
  def connections
    @connections.size
  end

  # Join with the server thread.  Returns immediately if the server is not
  # running.
  def join
    @tcpServerThread.join if @tcpServerThread
  end

  # Port on which to listen, as an Integer
  attr_reader :port
  # Host on which to bind, as a String
  attr_reader :host
  # Maximum number of connections to accept at a time, as an Integer
  attr_reader :maxConnections
  # IO Device on which log messages should be written
  attr_accessor :stdlog
  # Set to true to cause the callbacks #connecting, #disconnecting, #starting,
  # and #stopping to be called during the server's lifecycle
  attr_accessor :audit
  # Set to true to show more detailed logging
  attr_accessor :debug

  # Called when a client connects, if auditing is enabled.
  #
  # +client+:: a TCPSocket instance representing the client that connected
  #
  # Return true to allow this client to connect, false to prevent it.
  def connecting(client)
    addr = client.peeraddr
    log("#{self.class.to_s} #{@host}:#{@port} client:#{addr[1]} " +
        "#{addr[2]}<#{addr[3]}> connect")
    true
  end


  # Called when a client disconnects, if auditing is enabled.
  #
  # +clientPort+:: the port of the client that is disconnecting
  def disconnecting(clientPort)
    log("#{self.class.to_s} #{@host}:#{@port} " +
        "client:#{clientPort} disconnect")
  end

  protected :connecting, :disconnecting

  # Called when the server is starting up, if auditing is enabled.
  def starting
    log("#{self.class.to_s} #{@host}:#{@port} start")
  end

  # Called when the server is shutting down, if auditing is enabled.
  def stopping
    log("#{self.class.to_s} #{@host}:#{@port} stop")
  end

  protected :starting, :stopping

  # Called if #debug is true whenever an unhandled exception is raised.
  # This implementation simply logs the backtrace.
  #
  # +detail+:: The Exception that was caught
  def error(detail)
    # An exception that was never raised has no backtrace (nil).
    log((detail.backtrace || []).join("\n"))
  end

  # Log a message to #stdlog, if it's defined.  This implementation
  # outputs the timestamp and message to the log.
  #
  # +msg+:: the message to log
  def log(msg)
    return unless @stdlog

    # Interpolate rather than use String#%, which would treat an Array
    # message as a list of format arguments and drop all but the first.
    @stdlog.puts("[#{Time.new.ctime}] #{msg}")
    @stdlog.flush
  end

  protected :error, :log

  # Create a new server
  #
  # +port+:: the port, as an Integer, on which to listen.
  # +host+:: the host to bind to
  # +maxConnections+:: The maximum number of simultaneous connections to
  #                    accept
  # +stdlog+:: IO device on which to log messages
  # +audit+:: if true, lifecycle callbacks will be called.  See #audit
  # +debug+:: if true, error messages are logged.  See #debug
  def initialize(port, host = DEFAULT_HOST, maxConnections = 4,
                 stdlog = $stderr, audit = false, debug = false)
    @tcpServerThread = nil
    @port = port
    @host = host
    @maxConnections = maxConnections
    @connections = []
    @connectionsMutex = Mutex.new
    @connectionsCV = ConditionVariable.new
    @stdlog = stdlog
    @audit = audit
    @debug = debug
  end

  # Start the server if it isn't already running
  #
  # +maxConnections+::
  #   override +maxConnections+ given to the constructor.  A negative
  #   value indicates that the value from the constructor should be used.
  #
  # Raises a RuntimeError if this server is already running or if another
  # GServer is registered on the same host and port.  Returns +self+.
  def start(maxConnections = -1)
    raise "server is already running" unless stopped?
    @shutdown = false
    @maxConnections = maxConnections if maxConnections > 0
    @@servicesMutex.synchronize {
      if GServer.in_service?(@port, @host)
        raise "Port already in use: #{host}:#{@port}!"
      end
      @tcpServer = TCPServer.new(@host, @port)
      # Read the port back from the socket and register under that value.
      @port = @tcpServer.addr[1]
      @@services[@host] = {} unless @@services.key?(@host)
      @@services[@host][@port] = self
    }
    @tcpServerThread = Thread.new {
      begin
        starting if @audit
        until @shutdown
          # Block until there is room for another connection.
          @connectionsMutex.synchronize {
            while @connections.size >= @maxConnections
              @connectionsCV.wait(@connectionsMutex)
            end
          }
          client = @tcpServer.accept
          # Register the handler from the accepting thread, under the mutex,
          # so the capacity check above always sees it.  The handler's own
          # cleanup takes the same mutex and therefore cannot run before the
          # registration has happened.
          @connectionsMutex.synchronize {
            handler = Thread.new(client) { |myClient|
              begin
                myPort = myClient.peeraddr[1]
                serve(myClient) if !@audit || connecting(myClient)
              rescue => detail
                error(detail) if @debug
              ensure
                begin
                  myClient.close
                rescue
                  # Best effort: the client may already have closed the socket.
                end
                @connectionsMutex.synchronize {
                  @connections.delete(Thread.current)
                  @connectionsCV.signal
                }
                disconnecting(myPort) if @audit
              end
            }
            @connections << handler
          }
        end
      rescue => detail
        error(detail) if @debug
      ensure
        begin
          @tcpServer.close
        rescue
          # Best effort: the listening socket may already be closed.
        end
        if @shutdown
          # Graceful shutdown: wait for the active handlers to finish.
          @connectionsMutex.synchronize {
            while @connections.size > 0
              @connectionsCV.wait(@connectionsMutex)
            end
          }
        else
          # Forced stop: interrupt a snapshot of the handlers.  They remove
          # themselves from @connections as they unwind, so iterating the
          # live array could skip entries.
          active = @connectionsMutex.synchronize { @connections.dup }
          active.each { |c| c.raise "stop" }
        end
        @tcpServerThread = nil
        @@servicesMutex.synchronize {
          @@services[@host].delete(@port)
        }
        stopping if @audit
      end
    }
    self
  end

end