Details
Type: Story
Status: Done
Resolution: Done
Fix Version/s: None
Component/s: ctrl_mpexec
Labels:
Story Points: 4
Epic Link:
Sprint: DB_F20_06
Team: Data Access and Database
Urgent?: No
Description
`pipetask` uses `multiprocessing` to run tasks concurrently, and it appears that a crash (such as a SEGV) in a child process is not handled in a reasonable way. We need to understand what happens when a child dies unexpectedly and how to work around that.
Activity
Doing some testing and reading the multiprocessing code, it is clear that Pool cannot handle crashing processes. The Pool protocol depends on the worker process sending back a response; if the process is gone, no response ever arrives and Pool fails to realize that the job has failed. I don't think we can fix Pool; searching the Python issue tracker, I found bug reports about this from many years ago:
https://bugs.python.org/issue9205
https://bugs.python.org/issue22393
The latter has some "recent" activity from last year, so maybe in a couple of years it will get fixed.
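To illustrate the failure mode, here is a minimal hypothetical standalone repro (not pipetask code): the worker dies before sending its result back, so Pool silently loses the job, and `result.get()` would block forever without the timeout used here to make the hang visible.

```python
# Hypothetical repro sketch: a worker killed by a signal never reports
# back, and Pool never marks the job as failed.
import multiprocessing
import os
import signal


def crashing_worker(n):
    os.kill(os.getpid(), signal.SIGKILL)  # simulate a hard crash (SEGV-like)
    return n  # never reached


if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        result = pool.apply_async(crashing_worker, (42,))
        try:
            # Without the timeout this blocks forever: the pool respawns a
            # replacement worker, but the submitted job is silently lost.
            print(result.get(timeout=5))
        except multiprocessing.TimeoutError:
            print("job lost: the worker died and Pool never noticed")
```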
I think we have to switch to the low-level Process and manage the job queue ourselves in this case. We are not doing anything too complicated in pipetask (yet), so it should not be a big change.
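A minimal sketch of what such Process-based management could look like (names and structure are illustrative, not the actual pipetask implementation). The key point is that a negative exitcode means the child was killed by signal `-exitcode` (e.g. -11 for SIGSEGV), which is exactly the case Pool never sees.

```python
# Illustrative sketch: run jobs in plain multiprocessing.Process objects
# and detect crashed children via their exitcode.
import multiprocessing
import time


def run_jobs(jobs, max_workers=4):
    """jobs: list of (name, callable); returns list of (name, exitcode) failures."""
    pending = list(jobs)
    running = {}  # Process -> job name
    failures = []
    while pending or running:
        # Start children up to the concurrency limit.
        while pending and len(running) < max_workers:
            name, target = pending.pop(0)
            proc = multiprocessing.Process(target=target, name=name)
            proc.start()
            running[proc] = name
        # Reap finished children and record abnormal exits, including
        # negative exitcodes from signal-killed (crashed) processes.
        for proc in [p for p in running if not p.is_alive()]:
            proc.join()
            if proc.exitcode != 0:
                failures.append((running[proc], proc.exitcode))
            del running[proc]
        time.sleep(0.1)  # simple polling; good enough for a sketch
    return failures
```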
Writing our own pool system seems like a big step that we should discuss on Thursday.
Given how rare SEGVs are, I'm inclined to treat that case as a lower priority.
I think the priorities are:
- Run all possible quanta even if some of them fail
- Trap SIGTERM during butler puts (which is a separate ticket; a rough sketch follows this list).
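A rough sketch of that second item, under stated assumptions (a hypothetical helper, not code from the separate ticket): record SIGTERM while a butler put is in flight so the output dataset is not left half-written, then re-deliver the signal afterwards.

```python
# Hypothetical helper: defer SIGTERM across a critical section.
import os
import signal
from contextlib import contextmanager


@contextmanager
def defer_sigterm():
    received = []
    # Install a handler that only records the signal.
    previous = signal.signal(signal.SIGTERM, lambda signum, frame: received.append(signum))
    try:
        yield
    finally:
        signal.signal(signal.SIGTERM, previous)
        if received:
            # Re-deliver the signal now that the critical section is done.
            os.kill(os.getpid(), signal.SIGTERM)


# Usage idea:
#   with defer_sigterm():
#       butler.put(dataset, ref)
```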
Trying to work around Pool's behavior convinced me that Pool is hopeless and has to go. The worst thing about it is that you cannot stop a worker process, which is essential for handling timed-out jobs in our case. I have replaced the whole thing with a trivial implementation of process management using multiprocessing.Process for process execution. multiprocessing still has potential issues because it does many things behind the curtain that could mess things up (in particular, it defines a Process.terminate() method but says that its behavior is undefined). I think the right thing to do is to just fork(), but I'm more or less OK with multiprocessing for now, as we are not using any advanced features that could break. I added more unit tests to verify that the new code works as expected; waiting for Jenkins to pass (or break).
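The ability to stop a child is the key difference for the timed-out-job case. A hedged sketch of how a per-job timeout could be enforced with Process (illustrative names, not the actual implementation):

```python
# Illustrative sketch: enforce a per-job timeout, which Pool cannot do
# because it offers no way to stop a worker.
import multiprocessing


def run_with_timeout(target, args=(), timeout=60.0):
    """Run `target` in a child process; kill it if it exceeds `timeout` seconds."""
    proc = multiprocessing.Process(target=target, args=args)
    proc.start()
    proc.join(timeout)
    if proc.is_alive():
        # Process.terminate() sends SIGTERM on POSIX; follow up with a
        # join so we do not leave a zombie behind.
        proc.terminate()
        proc.join()
        return "timed out"
    return "crashed" if proc.exitcode != 0 else "ok"
```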
And of course Jenkins fails. I think the reason is that we carry a database file descriptor across fork(); the errors that I see look like:
[2020-08-20T17:52:02.132263Z] Process task-2:
[2020-08-20T17:52:02.132322Z] Traceback (most recent call last):
[2020-08-20T17:52:02.132327Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
[2020-08-20T17:52:02.132332Z]     cursor, statement, parameters, context
[2020-08-20T17:52:02.132336Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
[2020-08-20T17:52:02.132339Z]     cursor.execute(statement, parameters)
[2020-08-20T17:52:02.132343Z] sqlite3.DatabaseError: database disk image is malformed
[2020-08-20T17:52:02.132346Z]
[2020-08-20T17:52:02.132354Z] The above exception was the direct cause of the following exception:
[2020-08-20T17:52:02.132357Z]
[2020-08-20T17:52:02.132360Z] Traceback (most recent call last):
[2020-08-20T17:52:02.132363Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
[2020-08-20T17:52:02.132366Z]     self.run()
[2020-08-20T17:52:02.132370Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
[2020-08-20T17:52:02.132374Z]     self._target(*self._args, **self._kwargs)
[2020-08-20T17:52:02.132377Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/ctrl_mpexec/tickets.DM-26136-gf458fc373b+6587f8fd12/python/lsst/ctrl/mpexec/singleQuantumExecutor.py", line 86, in execute
[2020-08-20T17:52:02.132381Z]     self.runQuantum(task, quantum, taskDef, butler)
[2020-08-20T17:52:02.132384Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/ctrl_mpexec/tickets.DM-26136-gf458fc373b+6587f8fd12/python/lsst/ctrl/mpexec/singleQuantumExecutor.py", line 229, in runQuantum
[2020-08-20T17:52:02.132393Z]     task.runQuantum(butlerQC, inputRefs, outputRefs)
[2020-08-20T17:52:02.132396Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/ip_isr/20.0.0-9-gf3ab18e+fe8ba18e56/python/lsst/ip/isr/isrTask.py", line 900, in runQuantum
[2020-08-20T17:52:02.132399Z]     inputs = butlerQC.get(inputRefs)
[2020-08-20T17:52:02.132402Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/pipe_base/20.0.0-9-gabd0d4c+690d1dbe29/python/lsst/pipe/base/butlerQuantumContext.py", line 124, in get
[2020-08-20T17:52:02.132405Z]     val = self._get(ref)
[2020-08-20T17:52:02.132407Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/pipe_base/20.0.0-9-gabd0d4c+690d1dbe29/python/lsst/pipe/base/butlerQuantumContext.py", line 78, in _get
[2020-08-20T17:52:02.132410Z]     return butler.get(ref)
[2020-08-20T17:52:02.132413Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/_butler.py", line 757, in get
[2020-08-20T17:52:02.132416Z]     ref = self._findDatasetRef(datasetRefOrType, dataId, collections=collections, **kwds)
[2020-08-20T17:52:02.132420Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/_butler.py", line 552, in _findDatasetRef
[2020-08-20T17:52:02.132423Z]     ref = self.registry.findDataset(datasetType, dataId, collections=collections)
[2020-08-20T17:52:02.132426Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/registry/_registry.py", line 569, in findDataset
[2020-08-20T17:52:02.132429Z]     result = storage.find(collectionRecord, dataId)
[2020-08-20T17:52:02.132432Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py", line 91, in find
[2020-08-20T17:52:02.132435Z]     row = self._db.query(sql).fetchone()
[2020-08-20T17:52:02.132438Z]   File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/registry/interfaces/_database.py", line 1301, in query
[2020-08-20T17:52:02.132441Z]     return self._connection.execute(sql, *args, **kwds)
[2020-08-20T17:52:02.132443Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1014, in execute
[2020-08-20T17:52:02.132446Z]     return meth(self, multiparams, params)
[2020-08-20T17:52:02.132450Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
[2020-08-20T17:52:02.132453Z]     return connection._execute_clauseelement(self, multiparams, params)
[2020-08-20T17:52:02.132456Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1133, in _execute_clauseelement
[2020-08-20T17:52:02.132459Z]     distilled_params,
[2020-08-20T17:52:02.132469Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1318, in _execute_context
[2020-08-20T17:52:02.132472Z]     e, statement, parameters, cursor, context
[2020-08-20T17:52:02.132475Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1512, in _handle_dbapi_exception
[2020-08-20T17:52:02.132478Z]     sqlalchemy_exception, with_traceback=exc_info[2], from_=e
[2020-08-20T17:52:02.132481Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
[2020-08-20T17:52:02.132484Z]     raise exception
[2020-08-20T17:52:02.132486Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
[2020-08-20T17:52:02.132489Z]     cursor, statement, parameters, context
[2020-08-20T17:52:02.132493Z]   File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
[2020-08-20T17:52:02.132508Z]     cursor.execute(statement, parameters)
[2020-08-20T17:52:02.132511Z] sqlalchemy.exc.DatabaseError: (sqlite3.DatabaseError) database disk image is malformed
[2020-08-20T17:52:02.132514Z] [SQL: SELECT dataset.id AS id, dataset.run_id AS run_id
[2020-08-20T17:52:02.132517Z] FROM dataset JOIN dataset_collection_0068 ON dataset.id = dataset_collection_0068.dataset_id
[2020-08-20T17:52:02.132520Z] WHERE dataset.dataset_type_id = ? AND dataset.run_id = ? AND dataset_collection_0068.instrument = ? AND dataset_collection_0068.calibration_label = ? AND dataset_collection_0068.detector = ? AND dataset_collection_0068.collection_id = ?]
[2020-08-20T17:52:02.132524Z] [parameters: (5, 1, 'HSC', 'unbounded', 100, 1)]
[2020-08-20T17:52:02.132527Z] (Background on this error at: http://sqlalche.me/e/13/4xp6)
TLDR: sqlalchemy.exc.DatabaseError: (sqlite3.DatabaseError) database disk image is malformed
Interesting that it fails only on Linux but runs OK on OSX; I thought both should use the fork start method. Maybe BSD is better than Linux.
I think I can do a quick fix for that: copy the butler, just like it is done with Pool, which pickles all the stuff and sends it to the subprocess.
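The general pattern behind that fix, shown with plain sqlite3 rather than the actual butler code (paths and names here are hypothetical): never let a child inherit the parent's open database handle; create a fresh connection after the fork.

```python
# Generic sketch of the underlying issue: an open SQLite connection must
# not be carried across fork(), or parent and child can corrupt the
# database through the shared file descriptor.
import multiprocessing
import sqlite3

DB_PATH = "example.db"  # hypothetical path


def child_job():
    # Correct: open a fresh connection *inside* the child process,
    # analogous to re-instantiating the butler after the fork.
    conn = sqlite3.connect(DB_PATH)
    conn.execute("SELECT 1").fetchone()
    conn.close()


if __name__ == "__main__":
    parent_conn = sqlite3.connect(DB_PATH)  # fine for the parent's own use
    proc = multiprocessing.Process(target=child_job)
    proc.start()
    proc.join()
    parent_conn.close()
```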
That fix, re-instantiating the butler object, worked OK; Jenkins is happy now, going to make a PR.
This should be ready for review; one last Jenkins pass is running after I added a command line option. Not too much new code, and the unit tests are improved as well.
Nate Pease [X], I updated click/pipetask2 and added the new command line option there too, so don't be surprised.
Looks good to me.
Another improvement that people want is a "keep running" option: if one or more quanta fail, pipetask should still try to process the remaining quanta that don't depend on the failing ones.
Not really related to crashes, but it is related to how we handle errors.
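A sketch of that "keep running" idea under stated assumptions (the quantum graph is represented as a plain dict and the names are hypothetical, not the pipetask API): execute quanta in dependency order and skip any quantum whose upstream quantum failed.

```python
# Hypothetical sketch of "keep running": process every quantum whose
# upstream dependencies all succeeded, and cascade failures downstream.
def run_graph(quanta, dependencies, run_one):
    """
    quanta:        iterable of quantum ids in topological order
    dependencies:  dict mapping quantum id -> set of upstream quantum ids
    run_one:       callable executing a single quantum, returns True on success
    """
    failed = set()
    for qid in quanta:
        if dependencies.get(qid, set()) & failed:
            # An upstream quantum failed; this one cannot produce valid
            # outputs, so mark it failed too and move on.
            failed.add(qid)
            continue
        if not run_one(qid):
            failed.add(qid)
    return failed
```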