diff --git a/.env.example b/.env.example index c0729f0..b1b01b0 100644 --- a/.env.example +++ b/.env.example @@ -34,6 +34,12 @@ export DIFFER_MAX_BODY_SIZE='10485760' # 10 MB # Set how many diffs can be run in parallel. # export DIFFER_PARALLELISM=10 +# Once each worker process in the diff pool has done this many diffs, terminate +# it and start a new worker. If 0 or unset, there is no limit. +# (Think about this as restarting the worker pool after +# MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM diffs) +# export MAX_DIFFS_PER_WORKER=10 + # Instead of crashing when the process pool used for running diffs breaks, # keep accepting requests and try to restart the pool. # RESTART_BROKEN_DIFFER='true' diff --git a/docs/source/release-history.rst b/docs/source/release-history.rst index 2995324..f6842db 100644 --- a/docs/source/release-history.rst +++ b/docs/source/release-history.rst @@ -7,6 +7,8 @@ In Development - Fix XML prolog detection in diff server. This could occasionally have inferred character encoding in an XML document that was inaccurate. (:issue:`209`) +- Add ``MAX_DIFFS_PER_WORKER`` environment variable for diff server configuration. When set to a positive integer, a worker process that handles running the actual diff will be restarted after running this many diffs (the number of workers can be controlled with ``DIFFER_PARALLELISM``, which is not new). If ``0`` or not set, workers will only be restarted if they crash. Setting this appropriately can help keep resources within limits and prevent eventual hangs or crashes. (:issue:`210`) + Version 0.1.7 (2025-10-06) -------------------------- diff --git a/web_monitoring_diff/server/server.py b/web_monitoring_diff/server/server.py index 5e1e362..8f94a30 100644 --- a/web_monitoring_diff/server/server.py +++ b/web_monitoring_diff/server/server.py @@ -36,6 +36,7 @@ sentry_sdk.integrations.logging.ignore_logger('tornado.access') DIFFER_PARALLELISM = int(os.environ.get('DIFFER_PARALLELISM', 10)) +MAX_DIFFS_PER_WORKER = max(int(os.environ.get('MAX_DIFFS_PER_WORKER', 0)), 0) RESTART_BROKEN_DIFFER = os.environ.get('RESTART_BROKEN_DIFFER', 'False').strip().lower() == 'true' # Map tokens in the REST API to functions in modules. @@ -509,10 +510,18 @@ async def diff(self, func, a, b, params, tries=2): Actually do a diff between two pieces of content, optionally retrying if the process pool that executes the diff breaks. """ - executor = self.get_diff_executor() + reset = False + if MAX_DIFFS_PER_WORKER and self.settings.get('remaining_diffs_for_executor', 0) <= 0: + reset = True + self.settings['remaining_diffs_for_executor'] = MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM + executor = self.get_diff_executor(reset=reset) + + # executor = self.get_diff_executor() loop = asyncio.get_running_loop() for attempt in range(tries): try: + if MAX_DIFFS_PER_WORKER: + self.settings['remaining_diffs_for_executor'] -= 1 return await loop.run_in_executor( executor, functools.partial(caller, func, a, b, **params)) except concurrent.futures.process.BrokenProcessPool: @@ -522,7 +531,14 @@ async def diff(self, func, a, b, params, tries=2): # parallel diffs haven't already done it. If it's already # been reset, then we can just go and use the new one. old_executor, executor = executor, self.get_diff_executor() - if executor == old_executor: + if ( + executor == old_executor or + ( + MAX_DIFFS_PER_WORKER and + self.settings.get('remaining_diffs_for_executor', 0) <= 0 + ) + ): + self.settings['remaining_diffs_for_executor'] = MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM executor = self.get_diff_executor(reset=True) else: # If we shouldn't allow the server to keep rebuilding the diff --git a/web_monitoring_diff/tests/test_server_exc_handling.py b/web_monitoring_diff/tests/test_server_exc_handling.py index 8a706db..6c7b446 100644 --- a/web_monitoring_diff/tests/test_server_exc_handling.py +++ b/web_monitoring_diff/tests/test_server_exc_handling.py @@ -558,6 +558,34 @@ def get_executor(self, reset=False): assert not mock_quit.called + @patch('web_monitoring_diff.server.server.DIFFER_PARALLELISM', 2) + @patch('web_monitoring_diff.server.server.MAX_DIFFS_PER_WORKER', 2) + @tornado.testing.gen_test + async def test_max_diffs_per_worker(self): + # The executor is created lazily, so do one request to create it. + response = await self.fetch_async('/html_source_dmp?format=json&' + f'a=file://{fixture_path("empty.txt")}&' + f'b=file://{fixture_path("empty.txt")}') + assert response.code == 200 + original_executor = self._app.settings.get('diff_executor') + + # Make more than the max number of requests before restarting the diff + # executor. This needs to be done in parallel so we can make sure + # in-progress diffs don't get lost when rebuilding the executor. + requests = [ + self.fetch_async('/html_source_dmp?format=json&' + f'a=file://{fixture_path("empty.txt")}&' + f'b=file://{fixture_path("empty.txt")}') + for _i in range(4) + ] + responses = await asyncio.gather(*requests) + for response in responses: + assert response.code == 200 + + executor = self._app.settings.get('diff_executor') + assert original_executor is not executor + + def mock_diffing_method(c_body): return