Merge commit '4459824cc8196ad78fe9258b6c560ad46fe4cd52'

* commit '4459824cc8196ad78fe9258b6c560ad46fe4cd52':
  websocket: do not exit at the middle of process
  websocket: restore signals after processing
  websocket: support SIGTERM as exit signal
This commit is contained in:
Peter Åstrand (astrand) 2013-11-27 14:49:54 +01:00
commit cbf05f84fe
1 changed files with 114 additions and 92 deletions

View File

@ -466,7 +466,7 @@ class WebSocketRequestHandler(SimpleHTTPRequestHandler):
try: try:
self.new_websocket_client() self.new_websocket_client()
except self.CClose: except self.CClose, WebSocketServer.Terminate:
# Close the client # Close the client
_, exc, _ = sys.exc_info() _, exc, _ = sys.exc_info()
self.send_close(exc.args[0], exc.args[1]) self.send_close(exc.args[0], exc.args[1])
@ -530,6 +530,9 @@ class WebSocketServer(object):
class EClose(Exception): class EClose(Exception):
pass pass
class Terminate(Exception):
pass
def __init__(self, RequestHandlerClass, listen_host='', def __init__(self, RequestHandlerClass, listen_host='',
listen_port=None, source_is_ipv6=False, listen_port=None, source_is_ipv6=False,
verbose=False, cert='', key='', ssl_only=None, verbose=False, cert='', key='', ssl_only=None,
@ -654,8 +657,7 @@ class WebSocketServer(object):
if os.fork() > 0: os._exit(0) # Parent exits if os.fork() > 0: os._exit(0) # Parent exits
# Signal handling # Signal handling
def terminate(a,b): os._exit(0) signal.signal(signal.SIGTERM, signal.SIG_IGN)
signal.signal(signal.SIGTERM, terminate)
signal.signal(signal.SIGINT, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN)
# Close open files # Close open files
@ -778,6 +780,9 @@ class WebSocketServer(object):
#self.vmsg("Running poll()") #self.vmsg("Running poll()")
pass pass
def terminate(self):
raise self.Terminate()
def fallback_SIGCHLD(self, sig, stack): def fallback_SIGCHLD(self, sig, stack):
# Reap zombies when using os.fork() (python 2.4) # Reap zombies when using os.fork() (python 2.4)
self.vmsg("Got SIGCHLD, reaping zombies") self.vmsg("Got SIGCHLD, reaping zombies")
@ -791,7 +796,11 @@ class WebSocketServer(object):
def do_SIGINT(self, sig, stack): def do_SIGINT(self, sig, stack):
self.msg("Got SIGINT, exiting") self.msg("Got SIGINT, exiting")
sys.exit(0) self.terminate()
def do_SIGTERM(self, sig, stack):
self.msg("Got SIGTERM, exiting")
self.terminate()
def top_new_client(self, startsock, address): def top_new_client(self, startsock, address):
""" Do something with a WebSockets client connection. """ """ Do something with a WebSockets client connection. """
@ -805,6 +814,8 @@ class WebSocketServer(object):
# Connection was not a WebSockets connection # Connection was not a WebSockets connection
if exc.args[0]: if exc.args[0]:
self.msg("%s: %s" % (address[0], exc.args[0])) self.msg("%s: %s" % (address[0], exc.args[0]))
except WebSocketServer.Terminate:
raise
except Exception: except Exception:
_, exc, _ = sys.exc_info() _, exc, _ = sys.exc_info()
self.msg("handler exception: %s" % str(exc)) self.msg("handler exception: %s" % str(exc))
@ -831,108 +842,119 @@ class WebSocketServer(object):
self.started() # Some things need to happen after daemonizing self.started() # Some things need to happen after daemonizing
# Allow override of SIGINT # Allow override of signals
original_signals = {
signal.SIGINT: signal.getsignal(signal.SIGINT),
signal.SIGTERM: signal.getsignal(signal.SIGTERM),
signal.SIGCHLD: signal.getsignal(signal.SIGCHLD),
}
signal.signal(signal.SIGINT, self.do_SIGINT) signal.signal(signal.SIGINT, self.do_SIGINT)
signal.signal(signal.SIGTERM, self.do_SIGTERM)
signal.signal(signal.SIGCHLD, self.fallback_SIGCHLD) signal.signal(signal.SIGCHLD, self.fallback_SIGCHLD)
last_active_time = self.launch_time last_active_time = self.launch_time
while True: try:
try: while True:
try: try:
startsock = None try:
pid = err = 0 startsock = None
child_count = 0 pid = err = 0
child_count = 0
if multiprocessing: if multiprocessing:
# Collect zombie child processes # Collect zombie child processes
child_count = len(multiprocessing.active_children()) child_count = len(multiprocessing.active_children())
time_elapsed = time.time() - self.launch_time time_elapsed = time.time() - self.launch_time
if self.timeout and time_elapsed > self.timeout: if self.timeout and time_elapsed > self.timeout:
self.msg('listener exit due to --timeout %s' self.msg('listener exit due to --timeout %s'
% self.timeout) % self.timeout)
break
if self.idle_timeout:
idle_time = 0
if child_count == 0:
idle_time = time.time() - last_active_time
else:
idle_time = 0
last_active_time = time.time()
if idle_time > self.idle_timeout and child_count == 0:
self.msg('listener exit due to --idle-timeout %s'
% self.idle_timeout)
break break
try: if self.idle_timeout:
self.poll() idle_time = 0
if child_count == 0:
idle_time = time.time() - last_active_time
else:
idle_time = 0
last_active_time = time.time()
ready = select.select([lsock], [], [], 1)[0] if idle_time > self.idle_timeout and child_count == 0:
if lsock in ready: self.msg('listener exit due to --idle-timeout %s'
startsock, address = lsock.accept() % self.idle_timeout)
break
try:
self.poll()
ready = select.select([lsock], [], [], 1)[0]
if lsock in ready:
startsock, address = lsock.accept()
else:
continue
except self.Terminate:
raise
except Exception:
_, exc, _ = sys.exc_info()
if hasattr(exc, 'errno'):
err = exc.errno
elif hasattr(exc, 'args'):
err = exc.args[0]
else:
err = exc[0]
if err == errno.EINTR:
self.vmsg("Ignoring interrupted syscall")
continue
else:
raise
if self.run_once:
# Run in same process if run_once
self.top_new_client(startsock, address)
if self.ws_connection :
self.msg('%s: exiting due to --run-once'
% address[0])
break
elif multiprocessing:
self.vmsg('%s: new handler Process' % address[0])
p = multiprocessing.Process(
target=self.top_new_client,
args=(startsock, address))
p.start()
# child will not return
else: else:
continue # python 2.4
self.vmsg('%s: forking handler' % address[0])
pid = os.fork()
if pid == 0:
# child handler process
self.top_new_client(startsock, address)
break # child process exits
# parent process
self.handler_id += 1
except (self.Terminate, SystemExit, KeyboardInterrupt):
_, exc, _ = sys.exc_info()
print("In exit")
break
except Exception: except Exception:
_, exc, _ = sys.exc_info() _, exc, _ = sys.exc_info()
if hasattr(exc, 'errno'): self.msg("handler exception: %s" % str(exc))
err = exc.errno if self.verbose:
elif hasattr(exc, 'args'): self.msg(traceback.format_exc())
err = exc.args[0]
else:
err = exc[0]
if err == errno.EINTR:
self.vmsg("Ignoring interrupted syscall")
continue
else:
raise
if self.run_once:
# Run in same process if run_once
self.top_new_client(startsock, address)
if self.ws_connection :
self.msg('%s: exiting due to --run-once'
% address[0])
break
elif multiprocessing:
self.vmsg('%s: new handler Process' % address[0])
p = multiprocessing.Process(
target=self.top_new_client,
args=(startsock, address))
p.start()
# child will not return
else:
# python 2.4
self.vmsg('%s: forking handler' % address[0])
pid = os.fork()
if pid == 0:
# child handler process
self.top_new_client(startsock, address)
break # child process exits
# parent process finally:
self.handler_id += 1 if startsock:
startsock.close()
finally:
# Close listen port
self.vmsg("Closing socket listening at %s:%s"
% (self.listen_host, self.listen_port))
lsock.close()
except KeyboardInterrupt: # Restore signals
_, exc, _ = sys.exc_info() for sig, func in original_signals.items():
print("In KeyboardInterrupt") signal.signal(sig, func)
pass
except SystemExit:
_, exc, _ = sys.exc_info()
print("In SystemExit")
break
except Exception:
_, exc, _ = sys.exc_info()
self.msg("handler exception: %s" % str(exc))
if self.verbose:
self.msg(traceback.format_exc())
finally:
if startsock:
startsock.close()
# Close listen port
self.vmsg("Closing socket listening at %s:%s"
% (self.listen_host, self.listen_port))
lsock.close()