From e9e41a4cf5d829b4177897fa47f3ef1814173064 Mon Sep 17 00:00:00 2001 From: Rob Brackett Date: Fri, 12 Dec 2025 13:51:43 -0800 Subject: [PATCH 1/4] Limit number of diffs per worker process in server This adds a new `MAX_DIFFS_PER_WORKER` environment variable that limits the number of diffs performed by a single worker. After a worker process performs this many diffs, it is shut down and replaced with a fresh process. This is an ugly first cut and needs a lot of cleaning up. It also doesn't really account for things per worker -- it just restarts the pool after DIFFER_PARALLELISM * MAX_DIFFS_PER_WORKER diffs. But newer versions of Python have an API that does this right and which we should eventually switch to when compatible. Fixes #202. --- .env.example | 6 ++++ web_monitoring_diff/server/server.py | 21 ++++++++++++-- .../tests/test_server_exc_handling.py | 28 +++++++++++++++++++ 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/.env.example b/.env.example index c0729f0..b1b01b0 100644 --- a/.env.example +++ b/.env.example @@ -34,6 +34,12 @@ export DIFFER_MAX_BODY_SIZE='10485760' # 10 MB # Set how many diffs can be run in parallel. # export DIFFER_PARALLELISM=10 +# Once each worker process in the diff pool has done this many diffs, terminate +# it and start a new worker. If 0 or unset, there is no limit. +# (Think about this as restarting the worker pool after +# MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM diffs) +# export MAX_DIFFS_PER_WORKER=10 + # Instead of crashing when the process pool used for running diffs breaks, # keep accepting requests and try to restart the pool. 
# RESTART_BROKEN_DIFFER='true' diff --git a/web_monitoring_diff/server/server.py b/web_monitoring_diff/server/server.py index 5e1e362..80eda98 100644 --- a/web_monitoring_diff/server/server.py +++ b/web_monitoring_diff/server/server.py @@ -36,6 +36,7 @@ sentry_sdk.integrations.logging.ignore_logger('tornado.access') DIFFER_PARALLELISM = int(os.environ.get('DIFFER_PARALLELISM', 10)) +MAX_DIFFS_PER_WORKER = max(int(os.environ.get('MAX_DIFFS_PER_WORKER', 0)), 0) RESTART_BROKEN_DIFFER = os.environ.get('RESTART_BROKEN_DIFFER', 'False').strip().lower() == 'true' # Map tokens in the REST API to functions in modules. @@ -509,10 +510,19 @@ async def diff(self, func, a, b, params, tries=2): Actually do a diff between two pieces of content, optionally retrying if the process pool that executes the diff breaks. """ - executor = self.get_diff_executor() + reset = False + if MAX_DIFFS_PER_WORKER and self.settings.get('remaining_diffs_for_executor', 0) <= 0: + reset = True + self.settings['remaining_diffs_for_executor'] = MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM + print(f'REMAINING:: {self.settings["remaining_diffs_for_executor"]}') + executor = self.get_diff_executor(reset=reset) + + # executor = self.get_diff_executor() loop = asyncio.get_running_loop() for attempt in range(tries): try: + if MAX_DIFFS_PER_WORKER: + self.settings['remaining_diffs_for_executor'] -= 1 return await loop.run_in_executor( executor, functools.partial(caller, func, a, b, **params)) except concurrent.futures.process.BrokenProcessPool: @@ -522,7 +532,14 @@ async def diff(self, func, a, b, params, tries=2): # parallel diffs haven't already done it. If it's already # been reset, then we can just go and use the new one. 
old_executor, executor = executor, self.get_diff_executor() - if executor == old_executor: + if ( + executor == old_executor or + ( + MAX_DIFFS_PER_WORKER and + self.settings.get('remaining_diffs_for_executor', 0) <= 0 + ) + ): + self.settings['remaining_diffs_for_executor'] = MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM executor = self.get_diff_executor(reset=True) else: # If we shouldn't allow the server to keep rebuilding the diff --git a/web_monitoring_diff/tests/test_server_exc_handling.py b/web_monitoring_diff/tests/test_server_exc_handling.py index 8a706db..66c3fad 100644 --- a/web_monitoring_diff/tests/test_server_exc_handling.py +++ b/web_monitoring_diff/tests/test_server_exc_handling.py @@ -558,6 +558,34 @@ def get_executor(self, reset=False): assert not mock_quit.called + @tornado.testing.gen_test + @patch('web_monitoring_diff.server.server.DIFFER_PARALLELISM', 2) + @patch('web_monitoring_diff.server.server.MAX_DIFFS_PER_WORKER', 2) + async def test_max_diffs_per_worker(self): + # The executor is created lazily, so do one request to create it. + response = await self.fetch_async('/html_source_dmp?format=json&' + f'a=file://{fixture_path("empty.txt")}&' + f'b=file://{fixture_path("empty.txt")}') + assert response.code == 200 + original_executor = self._app.settings.get('diff_executor') + + # Make more than the max number of requests before restarting the diff + # executor. This needs to be done in parallel so we can make sure + # in-progress diffs don't get lost when rebuilding the executor. 
+ requests = [ + self.fetch_async('/html_source_dmp?format=json&' + f'a=file://{fixture_path("empty.txt")}&' + f'b=file://{fixture_path("empty.txt")}') + for _i in range(4) + ] + responses = await asyncio.gather(*requests) + for response in responses: + assert response.code == 200 + + executor = self._app.settings.get('diff_executor') + assert original_executor is not executor + + def mock_diffing_method(c_body): return From a1e99ec853c17c2a14d238268b42533df4042236 Mon Sep 17 00:00:00 2001 From: Rob Brackett Date: Fri, 12 Dec 2025 14:08:28 -0800 Subject: [PATCH 2/4] Stub release notes --- docs/source/release-history.rst | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/source/release-history.rst b/docs/source/release-history.rst index 2995324..f6842db 100644 --- a/docs/source/release-history.rst +++ b/docs/source/release-history.rst @@ -7,6 +7,8 @@ In Development - Fix XML prolog detection in diff server. This could occasionally have inferred character encoding in an XML document that was inaccurate. (:issue:`209`) +- Add ``MAX_DIFFS_PER_WORKER`` environment variable for diff server configuration. When set to a positive integer, a worker process that handles running the actual diff will be restarted after running this many diffs (the number of workers can be controlled with ``DIFFER_PARALLELISM``, which is not new). Note that diffs are currently counted across the whole pool rather than per worker, so in practice the entire worker pool is restarted after ``MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM`` diffs. If ``0`` or not set, workers will only be restarted if they crash. Setting this appropriately can help keep resources within limits and prevent eventual hangs or crashes. 
(:issue:`210`) + Version 0.1.7 (2025-10-06) -------------------------- From 61ebba3b17f7dc64edd5f3fd160b6a1c91b5c619 Mon Sep 17 00:00:00 2001 From: Rob Brackett Date: Fri, 12 Dec 2025 14:13:45 -0800 Subject: [PATCH 3/4] Might have gotten these decorators backward --- web_monitoring_diff/tests/test_server_exc_handling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/web_monitoring_diff/tests/test_server_exc_handling.py b/web_monitoring_diff/tests/test_server_exc_handling.py index 66c3fad..6c7b446 100644 --- a/web_monitoring_diff/tests/test_server_exc_handling.py +++ b/web_monitoring_diff/tests/test_server_exc_handling.py @@ -558,9 +558,9 @@ def get_executor(self, reset=False): assert not mock_quit.called - @tornado.testing.gen_test @patch('web_monitoring_diff.server.server.DIFFER_PARALLELISM', 2) @patch('web_monitoring_diff.server.server.MAX_DIFFS_PER_WORKER', 2) + @tornado.testing.gen_test async def test_max_diffs_per_worker(self): # The executor is created lazily, so do one request to create it. response = await self.fetch_async('/html_source_dmp?format=json&' From 5a09fdc54304a10b0317e9975c6e3968dc677074 Mon Sep 17 00:00:00 2001 From: Rob Brackett Date: Thu, 18 Dec 2025 11:11:52 -0800 Subject: [PATCH 4/4] Delete stray print statement --- web_monitoring_diff/server/server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/web_monitoring_diff/server/server.py b/web_monitoring_diff/server/server.py index 80eda98..8f94a30 100644 --- a/web_monitoring_diff/server/server.py +++ b/web_monitoring_diff/server/server.py @@ -514,7 +514,6 @@ async def diff(self, func, a, b, params, tries=2): if MAX_DIFFS_PER_WORKER and self.settings.get('remaining_diffs_for_executor', 0) <= 0: reset = True self.settings['remaining_diffs_for_executor'] = MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM - print(f'REMAINING:: {self.settings["remaining_diffs_for_executor"]}') executor = self.get_diff_executor(reset=reset) # executor = self.get_diff_executor()