Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ export DIFFER_MAX_BODY_SIZE='10485760' # 10 MB
# Set how many diffs can be run in parallel.
# export DIFFER_PARALLELISM=10

# Once each worker process in the diff pool has done this many diffs, terminate
# it and start a new worker. If 0 or unset, there is no limit.
# (In effect, the whole worker pool is recycled after roughly
# MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM total diffs.)
# export MAX_DIFFS_PER_WORKER=10

# Instead of crashing when the process pool used for running diffs breaks,
# keep accepting requests and try to restart the pool.
# RESTART_BROKEN_DIFFER='true'
2 changes: 2 additions & 0 deletions docs/source/release-history.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ In Development

- Fix XML prolog detection in diff server. This could occasionally have inferred character encoding in an XML document that was inaccurate. (:issue:`209`)

- Add ``MAX_DIFFS_PER_WORKER`` environment variable for diff server configuration. When set to a positive integer, each worker process that runs actual diffs will be restarted after handling this many diffs. (The number of workers is controlled by the pre-existing ``DIFFER_PARALLELISM`` variable.) If ``0`` or unset, workers are only restarted if they crash. Setting this appropriately can help keep resource usage within limits and prevent eventual hangs or crashes. (:issue:`210`)


Version 0.1.7 (2025-10-06)
--------------------------
Expand Down
20 changes: 18 additions & 2 deletions web_monitoring_diff/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
sentry_sdk.integrations.logging.ignore_logger('tornado.access')

DIFFER_PARALLELISM = int(os.environ.get('DIFFER_PARALLELISM', 10))
MAX_DIFFS_PER_WORKER = max(int(os.environ.get('MAX_DIFFS_PER_WORKER', 0)), 0)
RESTART_BROKEN_DIFFER = os.environ.get('RESTART_BROKEN_DIFFER', 'False').strip().lower() == 'true'

# Map tokens in the REST API to functions in modules.
Expand Down Expand Up @@ -509,10 +510,18 @@ async def diff(self, func, a, b, params, tries=2):
Actually do a diff between two pieces of content, optionally retrying
if the process pool that executes the diff breaks.
"""
executor = self.get_diff_executor()
reset = False
if MAX_DIFFS_PER_WORKER and self.settings.get('remaining_diffs_for_executor', 0) <= 0:
reset = True
self.settings['remaining_diffs_for_executor'] = MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM
executor = self.get_diff_executor(reset=reset)

# executor = self.get_diff_executor()
loop = asyncio.get_running_loop()
for attempt in range(tries):
try:
if MAX_DIFFS_PER_WORKER:
self.settings['remaining_diffs_for_executor'] -= 1
return await loop.run_in_executor(
executor, functools.partial(caller, func, a, b, **params))
except concurrent.futures.process.BrokenProcessPool:
Expand All @@ -522,7 +531,14 @@ async def diff(self, func, a, b, params, tries=2):
# parallel diffs haven't already done it. If it's already
# been reset, then we can just go and use the new one.
old_executor, executor = executor, self.get_diff_executor()
if executor == old_executor:
if (
executor == old_executor or
(
MAX_DIFFS_PER_WORKER and
self.settings.get('remaining_diffs_for_executor', 0) <= 0
)
):
self.settings['remaining_diffs_for_executor'] = MAX_DIFFS_PER_WORKER * DIFFER_PARALLELISM
executor = self.get_diff_executor(reset=True)
else:
# If we shouldn't allow the server to keep rebuilding the
Expand Down
28 changes: 28 additions & 0 deletions web_monitoring_diff/tests/test_server_exc_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,34 @@ def get_executor(self, reset=False):
assert not mock_quit.called


@patch('web_monitoring_diff.server.server.DIFFER_PARALLELISM', 2)
@patch('web_monitoring_diff.server.server.MAX_DIFFS_PER_WORKER', 2)
@tornado.testing.gen_test
async def test_max_diffs_per_worker(self):
    """
    With MAX_DIFFS_PER_WORKER set, the diff executor should be replaced
    once enough diffs have run, and in-flight diffs should still succeed.
    """
    diff_url = ('/html_source_dmp?format=json&'
                f'a=file://{fixture_path("empty.txt")}&'
                f'b=file://{fixture_path("empty.txt")}')

    # The executor is created lazily, so do one request to create it.
    warmup = await self.fetch_async(diff_url)
    assert warmup.code == 200
    first_executor = self._app.settings.get('diff_executor')

    # Make more than the max number of requests before restarting the diff
    # executor. This needs to be done in parallel so we can make sure
    # in-progress diffs don't get lost when rebuilding the executor.
    responses = await asyncio.gather(
        *(self.fetch_async(diff_url) for _i in range(4)))
    for result in responses:
        assert result.code == 200

    # The pool should have been rebuilt at some point during the batch.
    assert self._app.settings.get('diff_executor') is not first_executor


def mock_diffing_method(c_body):
    """A no-op diff implementation used as a patch target in tests."""
    # Intentionally does nothing with the body and yields no result.
    return None

Expand Down