Data Management / DM-26526

Can't run RawIngestTask with processes != 1

    Details

      Description

      Attempting to run RawIngestTask with multiple processes on DM-25806, after fixing a pickling problem with Gen3DatasetIngestTask, produces the following stack trace:

      Process ForkPoolWorker-1:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
          return _ForkingPickler.loads(res)
      TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
      Process ForkPoolWorker-2:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
          return _ForkingPickler.loads(res)
      TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
      Process ForkPoolWorker-3:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
          return _ForkingPickler.loads(res)
      TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
      ^CProcess ForkPoolWorker-4:
      Process ForkPoolWorker-5:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 351, in get
          with self._rlock:
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
          return self._semlock.__enter__()
      KeyboardInterrupt
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 352, in get
          res = self._reader.recv_bytes()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
          buf = self._recv_bytes(maxlength)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
          buf = self._recv(4)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
          chunk = read(handle, remaining)
      KeyboardInterrupt
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 733, in next
          item = self._items.popleft()
      IndexError: pop from an empty deque

      During handling of the above exception, another exception occurred:

      Traceback (most recent call last):
        File "/scratch/krzys001/ap_verify/bin/ingest_dataset.py", line 29, in <module>
          result = runIngestion()
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ap_verify.py", line 238, in runIngestion
          ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 628, in ingestDatasetGen3
          ingester.run(processes=processes)
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 479, in run
          self._ensureRaws(processes=processes)
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 511, in _ensureRaws
          self._ingestRaws(dataFiles, processes=processes)
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 538, in _ingestRaws
          self.ingester.run(dataFiles, run=None, processes=processes)
        File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 443, in run
          exposureData = self.prep(files, pool=pool, processes=processes)
        File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 361, in prep
          exposureData: List[RawExposureData] = self.groupByExposure(fileData)
        File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 280, in groupByExposure
          for f in files:
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 737, in next
          self._cond.wait(timeout)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/threading.py", line 296, in wait
          waiter.acquire()
      

      Discussion on #dm-middleware revealed that this functionality has never been tested. Investigate this bug in a pure-obs_base context and fix the problem. Unit tests for pickling support of RawIngestTask and related classes are a good starting point.
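One way to get exactly the worker error above is for an object's __reduce__ to tell pickle to rebuild it by calling the class with more positional arguments than a subclass constructor accepts. The sketch below reproduces that under stated assumptions: HypotheticalTask and HypotheticalIngestTask are illustrative stand-ins, not the pipe_base or obs_base classes, and the four-argument __reduce__ is assumed for the demonstration. Pickling succeeds; only unpickling (which is what the pool workers do in queues.get) fails.

```python
import pickle


class HypotheticalTask:
    """Hypothetical Task-like base class (not lsst.pipe.base.Task).

    Its __reduce__ rebuilds the object by calling the class with four
    positional arguments, which only works if every subclass keeps the
    same positional constructor signature.
    """

    def __init__(self, config=None, name=None, parentTask=None, log=None):
        self.config = config
        self._name = name
        self._parentTask = parentTask

    def __reduce__(self):
        # Unpickling will call: type(self)(config, name, parentTask, None)
        return (self.__class__, (self.config, self._name, self._parentTask, None))


class HypotheticalIngestTask(HypotheticalTask):
    """Hypothetical subclass whose constructor takes a keyword-only argument."""

    def __init__(self, config=None, *, butler, **kwargs):
        super().__init__(config, **kwargs)
        self.butler = butler


task = HypotheticalIngestTask(config={"a": 1}, butler="fake-butler")
data = pickle.dumps(task)  # pickling itself succeeds
try:
    pickle.loads(data)  # calls HypotheticalIngestTask({"a": 1}, None, None, None)
except TypeError as err:
    # The message ends with "takes from 1 to 2 positional arguments but 5 were given"
    print(err)
```

The "5" counts self plus the four values in the reduce tuple, while "from 1 to 2" reflects self and the optional config, the only positional parameters the subclass declares.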

            Activity

            krzys Krzysztof Findeisen added a comment:

            Jim Bosch, can you be more specific? How does the proposed factory convert positional arguments into keyword arguments without knowing what they are? Or are you saying that it's impossible to decouple this code from Task?

            tjenness Tim Jenness added a comment:

            He's suggesting that __reduce__ return a callable trampoline function, rather than the class itself, that takes positional arguments and then calls the real constructor with keyword arguments. This is a hand-crafted trampoline function written specifically for unpickling this particular class.
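A minimal sketch of that suggestion, assuming a hypothetical HypotheticalIngestTask with keyword-only constructor arguments (the class, the trampoline name, and the butler argument are placeholders, not the obs_base API). It also answers the question of how the trampoline knows what the positional arguments are: the same class's __reduce__ chooses their order, so the two are written together.

```python
import pickle


def _unpickleHypotheticalIngestTask(config, name, parentTask, butler):
    # Hand-crafted trampoline for one class: pickle calls it with the
    # positional tuple built in __reduce__ below, and it forwards each value
    # to the real constructor by keyword. Must be module-level so pickle
    # can find it by name.
    return HypotheticalIngestTask(config=config, name=name,
                                  parentTask=parentTask, butler=butler)


class HypotheticalIngestTask:
    """Hypothetical task whose constructor takes keyword-only arguments."""

    def __init__(self, config=None, *, butler, name=None, parentTask=None):
        self.config = config
        self.butler = butler  # assumed picklable for this sketch
        self._name = name
        self._parentTask = parentTask

    def __reduce__(self):
        # Return the trampoline instead of the class; the tuple order must
        # match the trampoline's parameter list.
        return (_unpickleHypotheticalIngestTask,
                (self.config, self._name, self._parentTask, self.butler))


task = HypotheticalIngestTask(config={"a": 1}, butler="fake-butler", name="ingest")
restored = pickle.loads(pickle.dumps(task))
print(type(restored).__name__, restored.config, restored.butler)
# HypotheticalIngestTask {'a': 1} fake-butler
```

The (callable, args) form of __reduce__ only supplies positional arguments, which is why an adapter is needed at all; the cost is that the trampoline must be kept in sync with the constructor by hand.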

            krzys Krzysztof Findeisen added a comment (edited):

            This ticket ended up scope-creeping a bit to cover DefineVisitsTask as well, which had a similar structure and issues to RawIngestTask.

            Unfortunately, most parallelism support cannot be checked through unit tests, because calling either RawIngestTask.run or DefineVisitsTask.run requires a Camera object, which the hypothetical "DummyCam" doesn't have. For now, I've used ap_verify to check that everything appears to work with HSC; parallel ingestion should become part of our integration tests once DM-25806 merges.
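Even without a Camera, the serialization step that the pool depends on can be tested in isolation: multiprocessing pickles the callable it sends to workers, and pickling a bound method also pickles its instance. A sketch of such a unit test, using a hypothetical task class with a trampoline-style __reduce__ (this is not the actual obs_base test suite, and extractMetadata here is a placeholder method); it can be run with pytest or python -m unittest.

```python
import pickle
import unittest


def _makeHypotheticalTask(config, butler):
    """Hypothetical unpickling trampoline: positional in, keywords out."""
    return HypotheticalIngestTask(config=config, butler=butler)


class HypotheticalIngestTask:
    """Hypothetical stand-in for a task that must cross process boundaries."""

    def __init__(self, config=None, *, butler):
        self.config = config
        self.butler = butler

    def __reduce__(self):
        return (_makeHypotheticalTask, (self.config, self.butler))

    def extractMetadata(self, filename):
        # Placeholder for per-file work that a pool would run in a worker.
        return (self.butler, filename)


class PicklingTestCase(unittest.TestCase):
    def setUp(self):
        self.task = HypotheticalIngestTask(config={"x": 1}, butler="fake-butler")

    def testRoundTrip(self):
        restored = pickle.loads(pickle.dumps(self.task))
        self.assertIsInstance(restored, HypotheticalIngestTask)
        self.assertEqual(restored.config, self.task.config)
        self.assertEqual(restored.butler, self.task.butler)

    def testBoundMethod(self):
        # A pool given self.task.extractMetadata pickles the bound method,
        # which pickles the task instance through its __reduce__.
        method = pickle.loads(pickle.dumps(self.task.extractMetadata))
        self.assertEqual(method("a.fits"), ("fake-butler", "a.fits"))
```

Tests like these do not prove that run works in parallel, but they catch the class of failure in the original traceback without needing real data.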

            jbosch Jim Bosch added a comment:

            Looks good!

            Any sense for whether the parallelization yields meaningful speedups?

            krzys Krzysztof Findeisen added a comment (edited):

            Sorry, it's hard to tell. I tried scaling up to the full HiTS-2015 dataset, but that revealed that the bottleneck (in ap_verify) was importing all the non-ingested datasets. I'll have to look into that separately (DM-26662).

              People

              • Assignee:
                krzys Krzysztof Findeisen
                Reporter:
                krzys Krzysztof Findeisen
                Reviewers:
                Jim Bosch
                Watchers:
                Jim Bosch, Krzysztof Findeisen, Tim Jenness
              • Votes:
                0
