File tree Expand file tree Collapse file tree 1 file changed +7
-3
lines changed
Expand file tree Collapse file tree 1 file changed +7
-3
lines changed Original file line number Diff line number Diff line change 22
33from __future__ import annotations
44
5+ import multiprocessing as mp
56import os
67from concurrent .futures import ProcessPoolExecutor
78from itertools import repeat
@@ -46,15 +47,18 @@ def parse_index_headers(
4647 trace_ranges = []
4748 for idx in range (n_blocks ):
4849 start , stop = idx * block_size , (idx + 1 ) * block_size
49- if stop > trace_count :
50- stop = trace_count
50+ stop = min (stop , trace_count )
5151
5252 trace_ranges .append ((start , stop ))
5353
54+ # On Unix, async reads via s3fs/fsspec can deadlock in worker processes
55+ # started with the default 'fork' method, so use 'spawn' instead. It is
56+ # slower, but necessary on cloud stores; 'spawn' is already the default on Windows.
5457 num_workers = min (n_blocks , NUM_CPUS )
58+ context = mp .get_context ("spawn" )
5559
5660 tqdm_kw = dict (unit = "block" , dynamic_ncols = True )
57- with ProcessPoolExecutor (num_workers ) as executor :
61+ with ProcessPoolExecutor (num_workers , mp_context = context ) as executor :
5862 # pool.imap is lazy
5963 lazy_work = executor .map (
6064 header_scan_worker , # fn
You can’t perform that action at this time.
0 commit comments