-
-
Notifications
You must be signed in to change notification settings - Fork 33.9k
Description
Bug report
Bug description:
When using ProcessPoolExecutor with any context (forkserver, spawn, or fork), the child processes will not use, inherit, or share any multiprocessing RLock when writing to or flushing the underlying stream. This can garble the stream. Below is a simple program that reproduces this behavior:
To execute it, do python3 -u ./bug.py --producer | python3 -u ./bug.py --consumer.
#!/usr/bin/env python3
import sys
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
import logging
import time
big_line = '*' * 10_000
def current_milli_time():
return time.time_ns() // 1_000_000
def single_producer():
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stdout)
try:
while True:
logging.log(logging.INFO, big_line)
except KeyboardInterrupt:
pass
except BrokenPipeError:
pass
def run_producer():
mp_context = mp.get_context("forkserver")
futures = []
max_workers = 8
try:
with ProcessPoolExecutor(max_workers, mp_context=mp_context) as executor:
for _ in range(max_workers):
fut = executor.submit(single_producer)
futures.append(fut)
for fut in futures:
fut.result()
except KeyboardInterrupt:
pass
except BrokenPipeError:
pass
def run_consumer():
bad_line_count = 0
ok_line_count = 0
total_lines = 0
start_time = current_milli_time()
try:
print(f'Line length is {len(big_line):_}', flush=True)
for line in sys.stdin:
line = line.strip()
total_lines += 1
all_stars = '*' * len(line)
if line == all_stars:
if len(line) == len(big_line):
ok_line_count += 1
else:
bad_line_count += 1
else:
print(f'Got unexpected line: {line}', flush=True)
sys.exit(1)
if total_lines % 10_000 == 0:
spent_ms = current_milli_time() - start_time
lines_per_ms = total_lines // spent_ms
msg = f'OK line count: {ok_line_count:_}, garbled line count: {bad_line_count:_}, lines/ms: {lines_per_ms:_}'
print(msg, flush=True)
print('Stdin closed', flush=True)
except BrokenPipeError:
pass
except KeyboardInterrupt:
print('Consumer received KeyboardInterrupt', flush=True)
finally:
try:
print('Consumer exiting', flush=True)
print(f'OK line count: {ok_line_count:_}', flush=True)
print(f'Bad line count: {bad_line_count:_}', flush=True)
except BrokenPipeError:
pass
if __name__ == "__main__":
mp.set_start_method("forkserver")
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stdout)
if '--producer' in sys.argv:
run_producer()
elif '--consumer' in sys.argv:
run_consumer()
else:
print('Unknown mode, exiting', flush=True)
        sys.exit(1)

Here is a repository containing this code, a manual fix that passes a multiprocessing.RLock to all children (which solves the problem), and two helper scripts: https://github.com/ivarref/py-logging
I've verified that this issue is present on 3.13.*, 3.14.* and the main branch.
I think that a multiprocessing.RLock should be shared between the parent process and all sub-processes for logging to stdout and stderr. This should be done automatically.
Thanks and kind regards.
CPython versions tested on:
CPython main branch
Operating systems tested on:
Linux, macOS