# Improve handling of crashes in pipetask


#### Details

• Type: Story
• Status: Done
• Resolution: Done
• Fix Version/s: None
• Component/s:
• Labels:
• Story Points:
4
• Sprint:
DB_F20_06
• Team:
Data Access and Database
• Urgent?:
No

#### Description

pipetask uses multiprocessing to run tasks concurrently, and it appears that a crash (like a SEGV) in a child process is not handled in a reasonable way. We need to understand what happens when a child dies unexpectedly and how to work around it.
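A minimal, standalone reproduction of the problem (illustrative code, not pipetask itself; the `crash` worker and the 5-second timeout are assumptions): a `Pool` worker that dies hard never delivers a result, so the parent only finds out through its own timeout.

```python
import multiprocessing
import os
import signal


def crash():
    # Simulate a hard crash (SEGV-like): the worker dies without
    # ever sending a result back to the Pool.
    os.kill(os.getpid(), signal.SIGKILL)


def demo():
    ctx = multiprocessing.get_context("fork")
    with ctx.Pool(processes=1) as pool:
        result = pool.apply_async(crash)
        try:
            result.get(timeout=5)
        except multiprocessing.TimeoutError:
            return "pool never noticed the dead worker"
        except Exception as exc:  # newer Pythons may report the breakage instead
            return f"pool raised: {exc!r}"
    return "unexpected success"


if __name__ == "__main__":
    print(demo())
```

On Python 3.7 (the version in the traceback below) this hits the `TimeoutError` branch: the Pool quietly replaces the dead worker and the job's result simply never arrives.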

#### Activity

Andy Salnikov added a comment -

Another improvement that people want is a "keep running" option: if one or more quanta fail, pipetask should still try to process the remaining quanta that do not depend on the failing ones.

Not really related to crashes, but it is related to how we handle errors.

Andy Salnikov added a comment -

Doing some testing and reading the multiprocessing code, it is clear that Pool cannot handle crashing processes. The Pool protocol depends on the worker process sending back a response; if the process is gone, no response ever arrives and Pool never realizes that the job has failed. I don't think we can fix Pool. Searching the Python issue tracker, I found bug reports about this from many years ago:

https://bugs.python.org/issue9205

https://bugs.python.org/issue22393

The latter has some "recent" activity from last year, so maybe in a couple of years it will get fixed.

I think we have to switch to the low-level Process class and manage the job queue ourselves. We are not doing anything too complicated in pipetask (yet), so it should not be a big change.
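A sketch of what that could look like (the names and structure here are mine, not the actual pipetask change): start each job in its own `multiprocessing.Process`, poll for completion, and use `exitcode` to detect children killed by a signal — a negative exitcode is the signal number, which is exactly the case Pool cannot see.

```python
import multiprocessing
import os
import signal
import time


def good_job():
    pass  # normal work would go here


def crashing_job():
    os.kill(os.getpid(), signal.SIGKILL)  # simulate a SEGV


def run_all(jobs, poll_interval=0.05):
    """Run each callable in its own Process; report crashes via exitcode.

    `jobs` maps a job name to a zero-argument callable.  A negative
    exitcode means the child was killed by that signal number.
    """
    procs = {name: multiprocessing.Process(target=fn) for name, fn in jobs.items()}
    for p in procs.values():
        p.start()
    status = {}
    while procs:
        for name, p in list(procs.items()):
            if not p.is_alive():
                p.join()
                status[name] = "ok" if p.exitcode == 0 else f"crashed (exitcode {p.exitcode})"
                del procs[name]
        time.sleep(poll_interval)
    return status
```

With this structure a SIGKILL'd worker shows up immediately as `crashed (exitcode -9)` instead of hanging the whole run.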

Tim Jenness added a comment - - edited

Writing our own pool system seems like a big step that we should discuss on Thursday.

Given how rare SEGVs are, I'm inclined to treat that case as a lower priority.

I think the priorities are:

• Run all possible quanta even if some are failing
• Trap SIGTERM during butler puts (which is a separate ticket).
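The first bullet amounts to dependency-aware scheduling over the quantum graph. A generic sketch of the idea (not ctrl_mpexec code; `execute_graph`, `deps`, and `run` are illustrative names): process nodes in topological order and mark anything downstream of a failure as skipped, instead of aborting the whole run.

```python
from collections import deque


def execute_graph(deps, run):
    """Run a DAG of jobs, skipping descendants of failures.

    deps: dict mapping node -> set of prerequisite nodes (all nodes are keys).
    run:  callable(node) -> bool, True on success.
    Returns dict node -> "ok" | "failed" | "skipped".
    """
    children = {n: set() for n in deps}
    indeg = {n: len(pre) for n, pre in deps.items()}
    for n, pre in deps.items():
        for p in pre:
            children[p].add(n)
    ready = deque(n for n, d in indeg.items() if d == 0)
    status = {}
    while ready:
        n = ready.popleft()
        if any(status.get(p) != "ok" for p in deps[n]):
            status[n] = "skipped"  # a prerequisite failed or was skipped
        else:
            status[n] = "ok" if run(n) else "failed"
        for c in children[n]:
            indeg[c] -= 1
            if indeg[c] == 0:
                ready.append(c)
    return status
```

Independent branches of the graph still complete even when one quantum fails; only its descendants are skipped.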
Andy Salnikov added a comment -

Trying to work around Pool's behavior convinces me that Pool is hopeless and has to go. The worst thing about it is that you cannot stop a running job, which is essential for handling timed-out jobs in our case. I have replaced the whole thing with a trivial process-management implementation that uses multiprocessing.Process for process execution. multiprocessing still has potential issues because it does many things behind the curtain that could mess things up (in particular, it defines a Process.terminate() method but says that its behavior is undefined). I think the right thing to do is to just fork(), but I'm more or less OK with multiprocessing for now, as we are not using any advanced features that could break. I added more unit tests to verify that the new code works as expected; waiting for Jenkins to pass (or break).
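For the timed-out case specifically, the Process-based approach allows something Pool never exposes for a single job. Roughly (a sketch with assumed names, not the actual implementation):

```python
import multiprocessing
import time


def run_with_timeout(target, timeout):
    """Run target in a child process; kill it if it exceeds timeout seconds."""
    p = multiprocessing.Process(target=target)
    p.start()
    p.join(timeout)
    if p.is_alive():
        # This per-job kill is the operation Pool does not provide.
        # Note the multiprocessing docs warn about terminate() leaving
        # shared resources (locks, queues) in a broken state.
        p.terminate()
        p.join()
        return "timed out"
    return "finished" if p.exitcode == 0 else "failed"


def slow():
    time.sleep(60)
```

Here `p.join(timeout)` returns as soon as the child exits, so fast jobs pay no penalty; only a still-running child gets terminated.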

Andy Salnikov added a comment -

And of course Jenkins fails. I think the reason is that we carry a database file descriptor across fork(); the errors that I see look like:

```
Process task-2:
Traceback (most recent call last):
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
    cursor, statement, parameters, context
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
sqlite3.DatabaseError: database disk image is malformed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/ctrl_mpexec/tickets.DM-26136-gf458fc373b+6587f8fd12/python/lsst/ctrl/mpexec/singleQuantumExecutor.py", line 86, in execute
    self.runQuantum(task, quantum, taskDef, butler)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/ctrl_mpexec/tickets.DM-26136-gf458fc373b+6587f8fd12/python/lsst/ctrl/mpexec/singleQuantumExecutor.py", line 229, in runQuantum
    task.runQuantum(butlerQC, inputRefs, outputRefs)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/ip_isr/20.0.0-9-gf3ab18e+fe8ba18e56/python/lsst/ip/isr/isrTask.py", line 900, in runQuantum
    inputs = butlerQC.get(inputRefs)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/pipe_base/20.0.0-9-gabd0d4c+690d1dbe29/python/lsst/pipe/base/butlerQuantumContext.py", line 124, in get
    val = self._get(ref)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/pipe_base/20.0.0-9-gabd0d4c+690d1dbe29/python/lsst/pipe/base/butlerQuantumContext.py", line 78, in _get
    return butler.get(ref)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/_butler.py", line 757, in get
    ref = self._findDatasetRef(datasetRefOrType, dataId, collections=collections, **kwds)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/_butler.py", line 552, in _findDatasetRef
    ref = self.registry.findDataset(datasetType, dataId, collections=collections)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/registry/_registry.py", line 569, in findDataset
    result = storage.find(collectionRecord, dataId)
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/registry/datasets/byDimensions/_storage.py", line 91, in find
    row = self._db.query(sql).fetchone()
  File "/j/ws/stack-os-matrix/adacff179f/lsstsw/stack/cb4e2dc/Linux64/daf_butler/19.0.0-141-gc1f0a489+fb98bf9d97/python/lsst/daf/butler/registry/interfaces/_database.py", line 1301, in query
    return self._connection.execute(sql, *args, **kwds)
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1014, in execute
    return meth(self, multiparams, params)
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 298, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1133, in _execute_clauseelement
    distilled_params,
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1318, in _execute_context
    e, statement, parameters, cursor, context
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1512, in _handle_dbapi_exception
    sqlalchemy_exception, with_traceback=exc_info[2], from_=e
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 178, in raise_
    raise exception
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1278, in _execute_context
    cursor, statement, parameters, context
  File "/j/ws/stack-os-matrix/adacff179f/home/.conda/envs/lsst-scipipe-cb4e2dc/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.DatabaseError: (sqlite3.DatabaseError) database disk image is malformed
[SQL: SELECT dataset.id AS id, dataset.run_id AS run_id
FROM dataset JOIN dataset_collection_0068 ON dataset.id = dataset_collection_0068.dataset_id
WHERE dataset.dataset_type_id = ? AND dataset.run_id = ? AND dataset_collection_0068.instrument = ? AND dataset_collection_0068.calibration_label = ? AND dataset_collection_0068.detector = ? AND dataset_collection_0068.collection_id = ?]
[parameters: (5, 1, 'HSC', 'unbounded', 100, 1)]
(Background on this error at: http://sqlalche.me/e/13/4xp6)
```

TLDR: sqlalchemy.exc.DatabaseError: (sqlite3.DatabaseError) database disk image is malformed

Interesting that it fails only on Linux but runs OK on OSX. I thought both should use the fork start method; maybe BSD is better than Linux.

I think I can do a quick fix for that: copy the butler, just like it is done with Pool, which pickles all the stuff and sends it to the subprocess.
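The underlying pattern, shown here with raw sqlite3 rather than butler code (the table and helper names are illustrative): never let a child reuse a SQLite connection opened before fork(); re-open from the path inside the child.

```python
import multiprocessing
import os
import sqlite3
import tempfile


def count_rows(db_path, queue):
    # Re-create the connection in the child.  Inheriting the parent's
    # open connection across fork() is what can corrupt the database file.
    conn = sqlite3.connect(db_path)
    queue.put(conn.execute("SELECT count(*) FROM data").fetchone()[0])
    conn.close()


def demo():
    fd, path = tempfile.mkstemp(suffix=".sqlite3")
    os.close(fd)
    try:
        conn = sqlite3.connect(path)
        conn.execute("CREATE TABLE data (x INTEGER)")
        conn.executemany("INSERT INTO data VALUES (?)", [(i,) for i in range(3)])
        conn.commit()
        conn.close()  # parent is done with its connection before forking
        queue = multiprocessing.Queue()
        p = multiprocessing.Process(target=count_rows, args=(path, queue))
        p.start()
        result = queue.get(timeout=10)
        p.join()
        return result
    finally:
        os.unlink(path)
```

Only the path crosses the process boundary; each process owns its own file descriptor, which is the same idea as re-instantiating the butler in the subprocess.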

Andy Salnikov added a comment -

That fix with re-instantiating the butler object worked OK and Jenkins is happy now; going to make a PR.

Andy Salnikov added a comment -

This should be ready for review; one last Jenkins pass is running after I added a command line option. Not too much new code, and the unit test gets improved.

Nate Pease [X], I updated click/pipetask2 and added the new command line option there too, so don't be surprised.

Tim Jenness added a comment -

Looks good to me.


#### People

Assignee:
Andy Salnikov
Reporter:
Andy Salnikov
Reviewers:
Tim Jenness
Watchers:
Andy Salnikov, Tim Jenness