# Can't run RawIngestTask with processes != 1


#### Details

• Type: Bug
• Status: Done
• Resolution: Done
• Fix Version/s: None
• Component/s:
• Labels:
• Story Points: 6
• Sprint: AP F20-4 (September)
• Team:
• Urgent?: No

#### Description

Attempting to run RawIngestTask with multiple processes on DM-25806, after fixing a pickling problem with Gen3DatasetIngestTask, produces the following stack trace:

    Process ForkPoolWorker-1:
    Traceback (most recent call last):
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
        self.run()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
        self._target(*self._args, **self._kwargs)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
        task = get()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
        return _ForkingPickler.loads(res)
    TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
    Process ForkPoolWorker-2:
    Traceback (most recent call last):
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
        self.run()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
        self._target(*self._args, **self._kwargs)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
        task = get()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
        return _ForkingPickler.loads(res)
    TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
    Process ForkPoolWorker-3:
    Traceback (most recent call last):
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
        self.run()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
        self._target(*self._args, **self._kwargs)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
        task = get()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
        return _ForkingPickler.loads(res)
    TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
    ^CProcess ForkPoolWorker-4:
    Process ForkPoolWorker-5:
    Traceback (most recent call last):
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 733, in next
        item = self._items.popleft()
    IndexError: pop from an empty deque

    During handling of the above exception, another exception occurred:

    Traceback (most recent call last):
      File "/scratch/krzys001/ap_verify/bin/ingest_dataset.py", line 29, in <module>
        result = runIngestion()
      File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ap_verify.py", line 238, in runIngestion
        ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
      File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 628, in ingestDatasetGen3
        ingester.run(processes=processes)
      File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 479, in run
        self._ensureRaws(processes=processes)
      File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 511, in _ensureRaws
    Traceback (most recent call last):
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
        self.run()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
        self._target(*self._args, **self._kwargs)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
        task = get()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 351, in get
        with self._rlock:
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
        return self._semlock.__enter__()
        self._ingestRaws(dataFiles, processes=processes)
    KeyboardInterrupt
      File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 538, in _ingestRaws
    Traceback (most recent call last):
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
        self.run()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
        self._target(*self._args, **self._kwargs)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
        task = get()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 352, in get
        res = self._reader.recv_bytes()
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
        buf = self._recv_bytes(maxlength)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
        buf = self._recv(4)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
        chunk = read(handle, remaining)
    KeyboardInterrupt
        self.ingester.run(dataFiles, run=None, processes=processes)
      File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 443, in run
        exposureData = self.prep(files, pool=pool, processes=processes)
      File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 361, in prep
        exposureData: List[RawExposureData] = self.groupByExposure(fileData)
      File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 280, in groupByExposure
        for f in files:
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 737, in next
        self._cond.wait(timeout)
      File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/threading.py", line 296, in wait
        waiter.acquire()

Discussion on #dm-middleware revealed that this functionality has never been tested. Investigate this bug in a pure-obs_base context and fix the problem. Unit tests for pickling support of RawIngestTask and related classes are a good starting point.
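
For illustration, here is a minimal sketch of one way a `TypeError` like the one in the trace can arise during unpickling. It does not use the real `RawIngestTask`: `KeywordOnlyTask`, its attributes, and `round_trip_fails` are hypothetical stand-ins, and the assumption is that a `__reduce__` method replays the constructor call with positional arguments that the constructor only accepts as keywords. A pickling unit test could start from the same round-trip check.

```python
import pickle


class KeywordOnlyTask:
    """Hypothetical stand-in for a task with a mostly keyword-only constructor."""

    def __init__(self, config=None, *, name=None, parent=None, log=None):
        self.config = config
        self.name = name
        self.parent = parent
        self.log = log

    def __reduce__(self):
        # Assumed bug: four positional arguments are recorded for a
        # constructor that accepts at most one positional argument.
        return (self.__class__, (self.config, self.name, self.parent, self.log))


def round_trip_fails(obj):
    """Return True if pickling and then unpickling ``obj`` raises TypeError."""
    payload = pickle.dumps(obj)  # succeeds: __reduce__ only records the call
    try:
        pickle.loads(payload)  # fails: the recorded call is replayed positionally
    except TypeError:
        return True
    return False
```

In this sketch `pickle.dumps` succeeds and the failure only appears in `pickle.loads`, which matches the trace above, where the worker processes fail inside `_ForkingPickler.loads`.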

#### Activity

Krzysztof Findeisen added a comment -

Jim Bosch, can you be more specific? How does the proposed factory convert positional arguments into keyword arguments without knowing what they are? Or are you saying that it's impossible to decouple this code from Task?

Tim Jenness added a comment -

He's suggesting that `__reduce__` return a callable trampoline function, rather than the class, that takes positional arguments and then calls the real constructor with keyword arguments. This is a special hand-crafted trampoline function written specifically for unpickling this particular class.
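
A minimal sketch of the trampoline idea, under stated assumptions: `KeywordOnlyTask` and `_unpickle_task` are hypothetical names, not the actual obs_base code, and the constructor signature is invented for illustration. The point is only that the callable returned from `__reduce__` knows which positional slot maps to which keyword.

```python
import pickle


def _unpickle_task(cls, config, name, parent, log):
    """Trampoline (hypothetical): accept positional arguments from pickle
    and forward them to the real constructor as keywords."""
    return cls(config=config, name=name, parent=parent, log=log)


class KeywordOnlyTask:
    """Hypothetical stand-in for a task with a mostly keyword-only constructor."""

    def __init__(self, config=None, *, name=None, parent=None, log=None):
        self.config = config
        self.name = name
        self.parent = parent
        self.log = log

    def __reduce__(self):
        # Return the trampoline instead of the class; pickle will call
        # _unpickle_task(*args) when loading.
        return (
            _unpickle_task,
            (self.__class__, self.config, self.name, self.parent, self.log),
        )


original = KeywordOnlyTask(config={"processes": 4}, name="ingest")
clone = pickle.loads(pickle.dumps(original))
```

The trampoline has to live at module level so that pickle can find it by name when loading, which is also why it is specific to one class: the mapping from positions to keywords is hard-coded.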

Krzysztof Findeisen added a comment - - edited

This ticket ended up scope-creeping a bit to cover DefineVisitsTask as well, which had a similar structure and issues to RawIngestTask.

Unfortunately, most parallelism support cannot be checked through unit tests, because calling either RawIngestTask.run or DefineVisitsTask.run requires a Camera object, which the hypothetical "DummyCam" doesn't have. For now, I've used ap_verify to check that everything appears to work with HSC; parallel ingestion should become part of our integration tests once DM-25806 merges.

Jim Bosch added a comment -

Looks good!

Any sense for whether the parallelization yields meaningful speedups?

Krzysztof Findeisen added a comment - - edited

Sorry, it's hard to tell. I tried scaling up to the full HiTS-2015 dataset, but that revealed that the bottleneck (in ap_verify) was importing all the non-ingested datasets. I'll have to look into that separately (DM-26662).


#### People

Assignee: Krzysztof Findeisen
Reporter: Krzysztof Findeisen
Reviewers: Jim Bosch
Watchers: Jim Bosch, Krzysztof Findeisen, Tim Jenness