Data Management / DM-26526

Can't run RawIngestTask with processes != 1


Details

    • 6
    • AP F20-4 (September)
    • Alert Production
    • No

    Description

      Attempting to run RawIngestTask with multiple processes on DM-25806, after fixing a pickling problem with Gen3DatasetIngestTask, produces the following stack trace:

      Process ForkPoolWorker-1:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
          return _ForkingPickler.loads(res)
      TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
      Process ForkPoolWorker-2:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
          return _ForkingPickler.loads(res)
      TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
      Process ForkPoolWorker-3:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 354, in get
          return _ForkingPickler.loads(res)
      TypeError: __init__() takes from 1 to 2 positional arguments but 5 were given
      ^CProcess ForkPoolWorker-4:
      Process ForkPoolWorker-5:
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 733, in next
          item = self._items.popleft()
      IndexError: pop from an empty deque
       
      During handling of the above exception, another exception occurred:
       
      Traceback (most recent call last):
        File "/scratch/krzys001/ap_verify/bin/ingest_dataset.py", line 29, in <module>
          result = runIngestion()
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ap_verify.py", line 238, in runIngestion
          ingestDatasetGen3(args.dataset, workspace, processes=args.processes)
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 628, in ingestDatasetGen3
          ingester.run(processes=processes)
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 479, in run
          self._ensureRaws(processes=processes)
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 511, in _ensureRaws
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 351, in get
          with self._rlock:
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/synchronize.py", line 95, in __enter__
          return self._semlock.__enter__()
          self._ingestRaws(dataFiles, processes=processes)
      KeyboardInterrupt
        File "/scratch/krzys001/ap_verify/python/lsst/ap/verify/ingestion.py", line 538, in _ingestRaws
      Traceback (most recent call last):
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
          self.run()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/process.py", line 99, in run
          self._target(*self._args, **self._kwargs)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 110, in worker
          task = get()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/queues.py", line 352, in get
          res = self._reader.recv_bytes()
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 216, in recv_bytes
          buf = self._recv_bytes(maxlength)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 407, in _recv_bytes
          buf = self._recv(4)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/connection.py", line 379, in _recv
          chunk = read(handle, remaining)
      KeyboardInterrupt
          self.ingester.run(dataFiles, run=None, processes=processes)
        File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 443, in run
          exposureData = self.prep(files, pool=pool, processes=processes)
        File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 361, in prep
          exposureData: List[RawExposureData] = self.groupByExposure(fileData)
        File "/software/lsstsw/stack_20200515/stack/miniconda3-4.7.12-46b24e8/Linux64/obs_base/20.0.0-36-g2de6156+58b4951e8a/python/lsst/obs/base/ingest.py", line 280, in groupByExposure
          for f in files:
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/multiprocessing/pool.py", line 737, in next
          self._cond.wait(timeout)
        File "/software/lsstsw/stack_20200515/python/miniconda3-4.7.12/envs/lsst-scipipe/lib/python3.7/threading.py", line 296, in wait
          waiter.acquire()
      

      Discussion on #dm-middleware revealed that this functionality has never been tested. Investigate this bug in a pure-obs_base context and fix the problem. Unit tests for pickling support of RawIngestTask and related classes are a good starting point.
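Such a pickling unit test might look like the following sketch. The `IngestLikeTask` stand-in and all names here are hypothetical; a real test would construct `RawIngestTask` itself, with its actual config and butler, and compare state after the round trip.

```python
import pickle

# Hypothetical stand-in for a Task subclass; a real test would build
# RawIngestTask and compare its config and name after a round trip.
class IngestLikeTask:
    def __init__(self, config=None, *, name=""):
        self.config = config
        self.name = name

def check_pickle_roundtrip(task):
    """Assert that a task survives a pickle round trip intact."""
    clone = pickle.loads(pickle.dumps(task))
    assert type(clone) is type(task)
    assert clone.config == task.config
    assert clone.name == task.name
    return clone

clone = check_pickle_roundtrip(
    IngestLikeTask(config={"transfer": "symlink"}, name="ingest"))
```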


          Activity

            krzys Krzysztof Findeisen added a comment -

            Note that the above stack trace was generated with processes=2, so I'm not sure why there are 5 workers.

            krzys Krzysztof Findeisen added a comment -

            I've run into an implementation problem. Task serializes all its state (currently config, name, parent, and log) as a positional tuple. RawIngestTask requires that all of these (except config) be passed by keyword to the constructor, but the information about these arguments' identities is contained in Task. I can't think of an implementation of either __reduce__ or __setstate__ that can delegate to Task, but I'm loath to have a subclass depend on exactly what extra state Task defines.

            jbosch or tjenness, since you've been pushing for heavier use of keywords for a while, how do you usually resolve these kinds of API mismatches?
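A minimal, self-contained sketch of the mismatch described in this comment (the class names here are hypothetical stand-ins, not the real Task classes):

```python
import pickle

class Base:
    """Stand-in for Task: serializes its state as a positional tuple."""
    def __init__(self, config=None, name="", parent=None, log=None):
        self.config, self.name, self.parent, self.log = config, name, parent, log

    def __reduce__(self):
        # Unpickling will call type(self)(config, name, parent, log) positionally.
        return (type(self), (self.config, self.name, self.parent, self.log))

class KeywordTask(Base):
    """Stand-in for RawIngestTask: everything but config is keyword-only."""
    def __init__(self, config=None, *, name="", parent=None, log=None):
        super().__init__(config, name=name, parent=parent, log=log)

task = KeywordTask(config={}, name="ingest")
try:
    pickle.loads(pickle.dumps(task))
    failed = False
except TypeError:
    # e.g. "__init__() takes from 1 to 2 positional arguments but 5 were given"
    failed = True
```

Pickling succeeds, but unpickling replays the positional tuple into the keyword-only constructor and raises the same TypeError seen in the stack trace above.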
            tjenness Tim Jenness added a comment -

            Pickle fundamentally confuses me in its opaqueness. __getnewargs_ex__ seems to be the only method that explicitly deals with keyword arguments.

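For reference, __getnewargs_ex__ supplies an (args, kwargs) pair to __new__ during unpickling, so it only helps a class that does its setup in __new__ rather than __init__. A toy illustration with a hypothetical class:

```python
import pickle

class KwOnly:
    """Toy class whose construction happens in __new__ with a keyword-only arg."""
    def __new__(cls, *, name="default"):
        self = super().__new__(cls)
        self.name = name
        return self

    def __getnewargs_ex__(self):
        # Pickle (protocol 2+) passes these to __new__ when unpickling.
        return (), {"name": self.name}

clone = pickle.loads(pickle.dumps(KwOnly(name="raws")))
```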
            jbosch Jim Bosch added a comment -

            My preference is to use __reduce__, but with a private classmethod that takes only positional arguments as the callable.


            krzys Krzysztof Findeisen added a comment -

            jbosch, can you be more specific? How does the proposed factory convert positional arguments into keyword arguments without knowing what they are? Or are you saying that it's impossible to decouple this code from Task?
            tjenness Tim Jenness added a comment -

            He's suggesting that __reduce__ return a callable trampoline function, rather than the class itself; the trampoline takes positional arguments and then calls the real constructor with keyword arguments. This is a special hand-crafted trampoline function written specifically for unpickling this particular class.

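A sketch of the suggested pattern, using hypothetical stand-ins (the real fix would live on RawIngestTask and carry whatever state the real Task serializes):

```python
import pickle

class Task:
    """Hypothetical stand-in for the real base class."""
    def __init__(self, config=None, *, name=""):
        self.config = config
        self.name = name

class IngestTask(Task):
    def __reduce__(self):
        # Return a positional-only trampoline instead of the class itself,
        # so unpickling never calls __init__ with positional arguments.
        return (self._unpickle, (self.config, self.name))

    @classmethod
    def _unpickle(cls, config, name):
        # Forward the positional arguments to the real constructor as keywords.
        return cls(config, name=name)

restored = pickle.loads(pickle.dumps(IngestTask(config={"x": 1}, name="ingest")))
```

Because the trampoline is part of the subclass, the base class's positional serialization is bypassed entirely; the subclass does take on responsibility for knowing which state it needs to carry.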
            krzys Krzysztof Findeisen added a comment (edited) -

            This ticket ended up scope-creeping a bit to cover DefineVisitsTask as well, which had a structure and problems similar to RawIngestTask's.

            Unfortunately, most parallelism support cannot be checked through unit tests, because calling either RawIngestTask.run or DefineVisitsTask.run requires a Camera object, which the hypothetical "DummyCam" doesn't have. For now, I've used ap_verify to check that everything appears to work with HSC; parallel ingestion should become part of our integration tests once DM-25806 merges.

            jbosch Jim Bosch added a comment -

            Looks good!

            Any sense for whether the parallelization yields meaningful speedups?

            krzys Krzysztof Findeisen added a comment (edited) -

            Sorry, it's hard to tell. I tried scaling up to the full HiTS-2015 dataset, but that revealed that the bottleneck (in ap_verify) was importing all the non-ingested datasets. I'll have to look into that separately (DM-26662).


            People

              krzys Krzysztof Findeisen
              krzys Krzysztof Findeisen
              Jim Bosch
              Jim Bosch, Krzysztof Findeisen, Tim Jenness
              Votes: 0
              Watchers: 3


                Jenkins

                  No builds found.