Source code for hupper.reloader

from collections import deque
from contextlib import contextmanager
import fnmatch
from glob import glob
import os
import re
import signal
import sys
import threading
import time

from .ipc import ProcessGroup, close_fd
from .logger import DefaultLogger, SilentLogger
from .utils import (
    WIN,
    default,
    is_stream_interactive,
    is_watchdog_supported,
    is_watchman_supported,
    resolve_spec,
)
from .worker import Worker, get_reloader, is_active

if WIN:
    from . import winapi


class FileMonitorProxy(object):
    """
    Wrap an :class:`hupper.interfaces.IFileMonitor` into an object that
    exposes a thread-safe interface back to the reloader to detect
    when it should reload.

    """

    monitor = None

    def __init__(self, callback, logger, ignore_files=None):
        self.callback = callback
        self.logger = logger
        self.changed_paths = set()
        self.ignore_files = [
            re.compile(fnmatch.translate(x)) for x in set(ignore_files or [])
        ]
        self.lock = threading.Lock()
        self.is_changed = False

    def add_path(self, path):
        # if the glob does not match any files then go ahead and pass
        # the pattern to the monitor anyway incase it is just a file that
        # is currently missing
        for p in glob(path, recursive=True) or [path]:
            if not any(x.match(p) for x in self.ignore_files):
                self.monitor.add_path(p)

    def start(self):
        self.monitor.start()

    def stop(self):
        self.monitor.stop()
        self.monitor.join()

    def file_changed(self, path):
        with self.lock:
            if path not in self.changed_paths:
                self.logger.info('{} changed; reloading ...'.format(path))
                self.changed_paths.add(path)

                if not self.is_changed:
                    self.is_changed = True
                    self.callback(self.changed_paths)

    def clear_changes(self):
        with self.lock:
            self.changed_paths = set()
            self.is_changed = False


class ControlSignal:
    byte = lambda x: chr(x).encode('ascii')

    SIGINT = byte(1)
    SIGHUP = byte(2)
    SIGTERM = byte(3)
    SIGCHLD = byte(4)
    FILE_CHANGED = byte(10)
    WORKER_COMMAND = byte(11)

    del byte


class WorkerResult:
    # exit - do not reload
    EXIT = 'exit'

    # reload immediately
    RELOAD = 'reload'

    # wait for changes before reloading
    WAIT = 'wait'


[docs] class Reloader(object): """ A wrapper class around a file monitor which will handle changes by restarting a new worker process. """ def __init__( self, worker_path, monitor_factory, logger, reload_interval=1, shutdown_interval=1, worker_args=None, worker_kwargs=None, ignore_files=None, ): self.worker_path = worker_path self.worker_args = worker_args self.worker_kwargs = worker_kwargs self.ignore_files = ignore_files self.monitor_factory = monitor_factory self.reload_interval = reload_interval self.shutdown_interval = shutdown_interval self.logger = logger self.monitor = None self.process_group = ProcessGroup()
[docs] def run(self): """ Execute the reloader forever, blocking the current thread. This will invoke ``sys.exit`` with the return code from the subprocess. If interrupted before the process starts then it'll exit with ``-1``. """ exitcode = -1 with self._setup_runtime(): while True: result, exitcode = self._run_worker() if result == WorkerResult.EXIT: break start = time.time() if result == WorkerResult.WAIT: result, _ = self._wait_for_changes() if result == WorkerResult.EXIT: break dt = self.reload_interval - (time.time() - start) if dt > 0: time.sleep(dt) sys.exit(exitcode)
[docs] def run_once(self): """ Execute the worker once. This method will return after the worker exits. Returns the exit code from the worker process. """ with self._setup_runtime(): _, exitcode = self._run_worker() return exitcode
def _run_worker(self): worker = Worker( self.worker_path, args=self.worker_args, kwargs=self.worker_kwargs ) return _run_worker(self, worker) def _wait_for_changes(self): worker = Worker(__name__ + '.wait_main') return _run_worker( self, worker, logger=SilentLogger(), shutdown_interval=0, ) @contextmanager def _setup_runtime(self): with self._start_control(): with self._start_monitor(): with self._capture_signals(): yield @contextmanager def _start_control(self): self.control_r, self.control_w = os.pipe() try: yield finally: close_fd(self.control_w) close_fd(self.control_r) self.control_r = self.control_w = None def _control_proxy(self, signal): return lambda *args: os.write(self.control_w, signal) @contextmanager def _start_monitor(self): proxy = FileMonitorProxy( self._control_proxy(ControlSignal.FILE_CHANGED), self.logger, self.ignore_files, ) proxy.monitor = self.monitor_factory( proxy.file_changed, interval=self.reload_interval, logger=self.logger, ) self.monitor = proxy self.monitor.start() try: yield finally: self.monitor = None proxy.stop() _signals = { 'SIGINT': ControlSignal.SIGINT, 'SIGHUP': ControlSignal.SIGHUP, 'SIGTERM': ControlSignal.SIGTERM, 'SIGCHLD': ControlSignal.SIGCHLD, } @contextmanager def _capture_signals(self): undo_handlers = [] try: for signame, control in self._signals.items(): signum = getattr(signal, signame, None) if signum is None: self.logger.debug( 'Skipping unsupported signal={}'.format(signame) ) continue handler = self._control_proxy(control) if WIN and signame == 'SIGINT': undo = winapi.AddConsoleCtrlHandler(handler) undo_handlers.append(undo) handler = signal.SIG_IGN psig = signal.signal(signum, handler) undo_handlers.append( lambda s=signum, p=psig: signal.signal(s, p) ) yield finally: for undo in reversed(undo_handlers): undo()
def _run_worker(self, worker, logger=None, shutdown_interval=None): if logger is None: logger = self.logger if shutdown_interval is None: shutdown_interval = self.shutdown_interval packets = deque() def handle_packet(packet): packets.append(packet) os.write(self.control_w, ControlSignal.WORKER_COMMAND) self.monitor.clear_changes() worker.start(handle_packet) result = WorkerResult.WAIT soft_kill = True logger.info('Starting monitor for PID %s.' % worker.pid) try: # register the worker with the process group self.process_group.add_child(worker.pid) while True: # process all packets before moving on to signals to avoid # missing any files that need to be watched if packets: cmd = packets.popleft() if cmd is None: if worker.is_alive: # the worker socket has died but the process is still # alive (somehow) so wait a brief period to see if it # dies on its own - if it does die then we want to # treat it as a crash and wait for changes before # reloading, if it doesn't die then we want to force # reload the app immediately because it probably # didn't die due to some file changes time.sleep(1) if worker.is_alive: logger.info( 'Worker pipe died unexpectedly, triggering a ' 'reload.' ) result = WorkerResult.RELOAD break os.write(self.control_w, ControlSignal.SIGCHLD) continue logger.debug('Received worker command "{}".'.format(cmd[0])) if cmd[0] == 'reload': result = WorkerResult.RELOAD break elif cmd[0] == 'watch_files': for path in cmd[1]: self.monitor.add_path(path) elif cmd[0] == 'graceful_shutdown': os.write(self.control_w, ControlSignal.SIGTERM) else: # pragma: no cover raise RuntimeError('received unknown control signal', cmd) # done handling the packet, continue to the next one # do not fall through here because it will block continue signal = os.read(self.control_r, 1) if not signal: logger.error('Control pipe died unexpectedly.') result = WorkerResult.EXIT break elif signal == ControlSignal.SIGINT: logger.info('Received SIGINT, waiting for server to exit ...') result = WorkerResult.EXIT # normally a SIGINT is sent automatically to the process # group and we want to avoid forwarding both a SIGINT and a # SIGTERM at the same time # # in the off chance that the SIGINT is not sent, we'll # just terminate after waiting shutdown_interval soft_kill = False break elif signal == ControlSignal.SIGHUP: logger.info('Received SIGHUP, triggering a reload.') result = WorkerResult.RELOAD break elif signal == ControlSignal.SIGTERM: logger.info('Received SIGTERM, triggering a shutdown.') result = WorkerResult.EXIT break elif signal == ControlSignal.FILE_CHANGED: if self.monitor.is_changed: result = WorkerResult.RELOAD break elif signal == ControlSignal.SIGCHLD: if not worker.is_alive: break if worker.is_alive and shutdown_interval: if soft_kill: logger.info('Gracefully killing the server.') worker.kill(soft=True) worker.wait(shutdown_interval) finally: if worker.is_alive: logger.info('Server did not exit, forcefully killing.') worker.kill() worker.join() else: worker.join() logger.debug('Server exited with code %d.' % worker.exitcode) return result, worker.exitcode def wait_main(): try: reloader = get_reloader() if is_stream_interactive(sys.stdin): input('Press ENTER or change a file to reload.\n') reloader.trigger_reload() else: # just block while we wait for a file to change print('Waiting for a file to change before reload.') while True: time.sleep(10) except KeyboardInterrupt: pass def find_default_monitor_factory(logger): spec = os.getenv('HUPPER_DEFAULT_MONITOR') if spec: monitor_factory = resolve_spec(spec) logger.debug('File monitor backend: ' + spec) elif is_watchman_supported(): from .watchman import WatchmanFileMonitor as monitor_factory logger.debug('File monitor backend: watchman') elif is_watchdog_supported(): from .watchdog import WatchdogFileMonitor as monitor_factory logger.debug('File monitor backend: watchdog') else: from .polling import PollingFileMonitor as monitor_factory logger.debug('File monitor backend: polling') return monitor_factory
[docs] def start_reloader( worker_path, reload_interval=1, shutdown_interval=default, verbose=1, logger=None, monitor_factory=None, worker_args=None, worker_kwargs=None, ignore_files=None, ): """ Start a monitor and then fork a worker process which starts by executing the importable function at ``worker_path``. If this function is called from a worker process that is already being monitored then it will return a reference to the current :class:`hupper.interfaces.IReloaderProxy` which can be used to communicate with the monitor. ``worker_path`` must be a dotted string pointing to a globally importable function that will be executed to start the worker. An example could be ``myapp.cli.main``. In most cases it will point at the same function that is invoking ``start_reloader`` in the first place. ``reload_interval`` is a value in seconds and will be used to throttle restarts. Default is ``1``. ``shutdown_interval`` is a value in seconds and will be used to trigger a graceful shutdown of the server. Set to ``None`` to disable the graceful shutdown. Default is the same as ``reload_interval``. ``verbose`` controls the output. Set to ``0`` to turn off any logging of activity and turn up to ``2`` for extra output. Default is ``1``. ``logger``, if supplied, supersedes ``verbose`` and should be an object implementing :class:`hupper.interfaces.ILogger`. ``monitor_factory`` is an instance of :class:`hupper.interfaces.IFileMonitorFactory`. If left unspecified, this will try to create a :class:`hupper.watchdog.WatchdogFileMonitor` if `watchdog <https://pypi.org/project/watchdog/>`_ is installed and will fallback to the less efficient :class:`hupper.polling.PollingFileMonitor` otherwise. If ``monitor_factory`` is ``None`` it can be overridden by the ``HUPPER_DEFAULT_MONITOR`` environment variable. It should be a dotted python path pointing at an object implementing :class:`hupper.interfaces.IFileMonitorFactory`. ``ignore_files`` if provided must be an iterable of shell-style patterns to ignore. """ if is_active(): return get_reloader() if logger is None: logger = DefaultLogger(verbose) if monitor_factory is None: monitor_factory = find_default_monitor_factory(logger) if shutdown_interval is default: shutdown_interval = reload_interval if reload_interval <= 0: raise ValueError( 'reload_interval must be greater than 0 to avoid spinning' ) reloader = Reloader( worker_path=worker_path, worker_args=worker_args, worker_kwargs=worker_kwargs, reload_interval=reload_interval, shutdown_interval=shutdown_interval, monitor_factory=monitor_factory, logger=logger, ignore_files=ignore_files, ) return reloader.run()