"""Source code for hupper.worker."""
from _thread import interrupt_main
from importlib.util import source_from_cache
import os
import signal
import site
import sys
import sysconfig
import threading
import time
import traceback
from . import ipc
from .interfaces import IReloaderProxy
from .utils import resolve_spec
class WatchSysModules(threading.Thread):
    """Background thread that polls ``sys.modules`` for imported modules.

    Every ``poll_interval`` seconds any newly discovered module source
    files are reported to ``callback`` as a list of absolute paths.
    """

    # seconds between scans of sys.modules
    poll_interval = 1
    # when True, files under stdlib/site-packages prefixes are not reported
    ignore_system_paths = True

    def __init__(self, callback):
        super(WatchSysModules, self).__init__()
        self.paths = set()
        self.callback = callback
        self.lock = threading.Lock()
        self.stopped = False
        self.system_paths = get_system_paths()

    def run(self):
        # poll until stop() flips the flag
        while not self.stopped:
            self.update_paths()
            time.sleep(self.poll_interval)

    def stop(self):
        self.stopped = True

    def update_paths(self):
        """Check sys.modules for paths to add to our path set."""
        discovered = []
        with self.lock:
            for candidate in expand_source_paths(iter_module_paths()):
                if candidate in self.paths:
                    continue
                self.paths.add(candidate)
                discovered.append(candidate)
        if discovered:
            self.watch_paths(discovered)

    def search_traceback(self, tb):
        """Inspect a traceback for new paths to add to our path set."""
        discovered = []
        with self.lock:
            for frame in traceback.extract_tb(tb):
                # frame[0] is the filename (FrameSummary is tuple-like)
                candidate = os.path.abspath(frame[0])
                if candidate in self.paths:
                    continue
                self.paths.add(candidate)
                discovered.append(candidate)
        if discovered:
            self.watch_paths(discovered)

    def watch_paths(self, paths):
        # optionally drop anything under a system prefix before notifying
        # the callback
        if self.ignore_system_paths:
            paths = [p for p in paths if not self.in_system_paths(p)]
        if paths:
            self.callback(paths)

    def in_system_paths(self, path):
        # use realpath to only ignore files that live in a system path
        # versus a symlink which lives elsewhere
        real = os.path.realpath(path)
        return any(real.startswith(prefix) for prefix in self.system_paths)
def get_py_path(path):
    """Return the source (``*.py``) path for a compiled (``*.pyc``) path."""
    try:
        source = source_from_cache(path)
    except ValueError:
        # a lone *.pyc sitting next to its source rather than inside
        # __pycache__ -- just strip the trailing "c"
        source = path[:-1]
    return source
def get_site_packages():  # pragma: no cover
    """Best-effort list of site-packages directories.

    Falls back to distutils when ``site.getsitepackages`` is unavailable
    (old virtualenvs do not ship an implementation, see
    https://github.com/pypa/virtualenv/issues/355) and returns an empty
    list if even that fails -- path discovery must never crash.
    """
    try:
        result = site.getsitepackages()
        if site.ENABLE_USER_SITE:
            result.append(site.getusersitepackages())
        return result
    except Exception:
        pass
    try:
        from distutils.sysconfig import get_python_lib
        return [get_python_lib()]
    # just in case, don't fail here, it's not worth it
    except Exception:
        return []
def get_system_paths():
    """Return site-packages plus the stdlib/platlib/purelib install paths."""
    paths = get_site_packages()
    extra = (
        sysconfig.get_path(name)
        for name in {'stdlib', 'platstdlib', 'platlib', 'purelib'}
    )
    paths.extend(path for path in extra if path is not None)
    return paths
def expand_source_paths(paths):
    """Convert pyc files into their source equivalents."""
    for path in paths:
        if not path.endswith(('.pyc', '.pyo')):
            yield path
            continue
        # only track the source path if we can find it to avoid double-reloads
        # when the source and the compiled path change because on some
        # platforms they are not changed at the same time
        source = get_py_path(path)
        yield source if os.path.exists(source) else path
def iter_module_paths(modules=None):
    """Yield the absolute file path behind every imported module.

    :param modules: optional iterable of module objects; defaults to a
        snapshot of ``sys.modules``.
    """
    modules = modules or list(sys.modules.values())
    for mod in modules:
        try:
            # some modules raise on attribute access (lazy loaders), so an
            # ImportError must be tolerated here as well
            filename = mod.__file__
        except (AttributeError, ImportError):  # pragma: no cover
            continue
        if filename is None:
            continue
        abs_path = os.path.abspath(filename)
        # skip builtins / namespace packages without a real backing file
        if os.path.isfile(abs_path):
            yield abs_path
class Worker(object):
    """A helper object for managing a worker process lifecycle."""

    def __init__(self, spec, args=None, kwargs=None):
        super(Worker, self).__init__()
        self.worker_spec = spec
        self.worker_args = args
        self.worker_kwargs = kwargs
        # the parent keeps ``pipe``; ``_child_pipe`` is handed to the child
        self.pipe, self._child_pipe = ipc.Pipe()
        self.pid = None
        self.process = None
        self.exitcode = None
        self.stdin_termios = None

    def start(self, on_packet=None):
        """Spawn the worker process and start listening on the pipe."""
        # capture terminal settings so they can be restored in join()
        self.stdin_termios = ipc.snapshot_termios(sys.stdin)
        spawn_kwargs = {
            'spec': self.worker_spec,
            'spec_args': self.worker_args,
            'spec_kwargs': self.worker_kwargs,
            'pipe': self._child_pipe,
        }
        self.process = ipc.spawn(
            __name__ + '.worker_main',
            kwargs=spawn_kwargs,
            pass_fds=[self._child_pipe.r_fd, self._child_pipe.w_fd],
        )
        self.pid = self.process.pid
        # activate the pipe after forking
        self.pipe.activate(on_packet)
        # kill the child side of the pipe after forking as the child is now
        # responsible for it
        self._child_pipe.close()

    @property
    def is_alive(self):
        """``True`` while the spawned process exists and has not exited."""
        if self.exitcode is not None:
            return False
        if not self.process:
            return False
        return ipc.wait(self.process, timeout=0) is None

    def kill(self, soft=False):
        """Terminate the worker; ``soft`` requests a graceful shutdown."""
        return ipc.kill(self.process, soft=soft)

    def wait(self, timeout=None):
        """Block until the worker exits and return its exit code."""
        return ipc.wait(self.process, timeout=timeout)

    def join(self):
        """Reap the worker and release the parent-side resources."""
        self.exitcode = self.wait()
        if self.stdin_termios:
            ipc.restore_termios(sys.stdin, self.stdin_termios)
        if self.pipe:
            try:
                self.pipe.close()
            except Exception:  # pragma: no cover
                pass
            finally:
                self.pipe = None
# proxy back to the parent reloader; assigned by worker_main() when the
# current process is being monitored, left as None otherwise
_reloader_proxy = None
def get_reloader():
    """
    Get a reference to the current :class:`hupper.interfaces.IReloaderProxy`.

    Raises a ``RuntimeError`` if the current process is not actively being
    monitored by a parent process.
    """
    proxy = _reloader_proxy
    if proxy is None:
        raise RuntimeError('process is not controlled by hupper')
    return proxy
def is_active():
    """
    Return ``True`` if the current process being monitored by a parent process.
    """
    active = _reloader_proxy is not None
    return active
class ReloaderProxy(IReloaderProxy):
    """Worker-side proxy that forwards commands to the parent reloader."""

    def __init__(self, pipe):
        self.pipe = pipe

    def watch_files(self, files):
        # normalize to absolute paths before shipping across the pipe
        absolute = [os.path.abspath(f) for f in files]
        self.pipe.send(('watch_files', absolute))

    def trigger_reload(self):
        self.pipe.send(('reload',))

    def graceful_shutdown(self):
        self.pipe.send(('graceful_shutdown',))
def watch_control_pipe(pipe):
    """Activate *pipe* so that a hangup (``None`` packet) stops the worker."""
    def on_packet(packet):
        if packet is not None:
            return
        # the parent closed the pipe -- raise KeyboardInterrupt in the
        # worker's main thread so it shuts down
        interrupt_main()

    pipe.activate(on_packet)
def worker_main(spec, pipe, spec_args=None, spec_kwargs=None):
    """Entry point of the spawned worker process.

    Resolves and invokes the callable described by ``spec`` while a
    background ``WatchSysModules`` thread reports imported file paths back
    to the parent reloader over ``pipe``.

    :param spec: spec string resolved via ``resolve_spec`` to the callable
        that runs the user's application.
    :param pipe: the child end of the control pipe to the parent process.
    :param spec_args: positional args for the callable, defaults to ``[]``.
    :param spec_kwargs: keyword args for the callable, defaults to ``{}``.
    """
    if spec_args is None:
        spec_args = []
    if spec_kwargs is None:
        spec_kwargs = {}
    # activate the pipe after forking
    watch_control_pipe(pipe)
    # SIGHUP is not supported on windows
    if hasattr(signal, 'SIGHUP'):
        signal.signal(signal.SIGHUP, signal.SIG_IGN)
    # disable pyc files for project code because it can cause timestamp
    # issues in which files are reloaded twice
    sys.dont_write_bytecode = True
    # expose the proxy so get_reloader() / is_active() work in this process
    global _reloader_proxy
    _reloader_proxy = ReloaderProxy(pipe)
    poller = WatchSysModules(_reloader_proxy.watch_files)
    poller.daemon = True
    poller.start()
    # import the worker path before polling sys.modules
    func = resolve_spec(spec)
    # start the worker
    try:
        func(*spec_args, **spec_kwargs)
    except BaseException: # catch any error
        try:
            # add files from the traceback before crashing
            poller.search_traceback(sys.exc_info()[2])
        except Exception: # pragma: no cover
            pass
        raise
    finally:
        try:
            # attempt to send imported paths to the reloader process prior to
            # closing
            poller.update_paths()
            poller.stop()
            poller.join()
        except Exception: # pragma: no cover
            pass