# Re-factoring of ctrl_mpexec


## Details

• Type: Story
• Status: Done
• Resolution: Done
• Fix Version/s: None
• Component/s:
• Labels:
None
• Story Points:
4
• Sprint:
BG3_F18_11, BG3_S19_01
• Team:
Data Access and Database

## Description

This is the second stage of the implementation of RFC-554. pipe_supertask has now been split into two packages; the contents of pipe_base should be mostly OK, but the modules in ctrl_mpexec need serious re-factoring to make them more reusable for other potential clients.

## Activity

Andy Salnikov added a comment -

There is basically one thing left in this package that can be refactored: the CmdLineFwk class. Its current responsibilities are:

1. parse the command line (using the cmdLineParser module)
2. for the "list" subcommand, dump a list of all tasks
3. make a pipeline using PipelineBuilder, converting the sequence of command line actions into calls on a PipelineBuilder instance
• optionally save the pipeline in a pickle file and/or make a DOT file for it
4. either load a QGraph from a file or build it from the pipeline using GraphBuilder
• optionally save the QGraph and make a DOT file for it
5. make a Butler instance
6. run the whole QGraph
1. updates the output collection with datasets from input collections
2. writes "InitOutputDatasetTypes" to butler for all tasks
3. iterates over Quanta and executes each in turn (no sorting is done; the expectation is that the QGraph is ordered)
1. this can be done in a subprocess; all needed objects are copied from the main process (pickled)
2. optionally updates input Quantum ref IDs from butler (for intermediate Quanta)
3. calls task.runQuantum()
4. after return, saves the Quantum to butler (for provenance)
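Step 6.3 above relies on the QGraph already being topologically ordered. As a hedged illustration (the names here are hypothetical, not the real ctrl_mpexec API), that invariant could be checked like this:

```python
# Illustrative sketch only: step 6.3 assumes the QuantumGraph is already
# topologically ordered, so a pre-requisite quantum always appears before
# any quantum that consumes its outputs.

def is_topologically_ordered(quanta, prerequisites):
    """`quanta` is an ordered sequence of quantum IDs; `prerequisites`
    maps each quantum ID to the IDs it depends on."""
    seen = set()
    for qid in quanta:
        if not set(prerequisites.get(qid, ())) <= seen:
            return False
        seen.add(qid)
    return True

# quantum 2 depends on 0 and 1: order [0, 1, 2] is valid, [2, 0, 1] is not
deps = {2: [0, 1]}
print(is_topologically_ordered([0, 1, 2], deps))  # True
print(is_topologically_ordered([2, 0, 1], deps))  # False
```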
Andy Salnikov added a comment - edited

One obvious candidate for factoring out of CmdLineFwk is the piece of code that executes a single Quantum; it should be universally reusable (possibly after some customization) by any other future activators. This code currently lives in a single method (_executePipelineTask()), which takes these items as input:

• configuration object for the task (with all overrides applied)
• quantum - single Quantum instance
• butler

(all parameters are passed as a single tuple, but only because of multiprocessing)

Currently this method returns whatever task.runQuantum() returns, which is a Struct instance, but this return value is probably of no use to anyone. (Edited: task.runQuantum() does not actually return anything.)

A possible new interface for this functionality is a class whose constructor takes a butler and a task factory, with a single method that takes (taskClass, config, quantum). The method should probably call a few customization points to do the three items from the list above: update the Quantum, call task.runQuantum(), and store the Quantum in the butler. Something like:

```python
class SingleQuantumExecutor:

    def __init__(self, butler, taskFactory):
        ...

    def execute(self, taskClass, config, quantum):
        self.setupLogging(taskClass, config, quantum)
        self.updateQuantumInputs(quantum)
        task = self.makeTask(taskClass, config)
        result = self.runQuantum(task, quantum)
        self.saveQuantum(quantum)
        return result

    # all 5 customization methods here with default implementation
```
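To make the template-method idea concrete, here is a self-contained, runnable sketch with toy default implementations of the customization points. The butler, taskFactory, and ToyTask names are illustrative stand-ins, not the real lsst.pipe.base or ctrl_mpexec API:

```python
# Runnable sketch of the template-method pattern described above;
# all behavior here is a toy stand-in for the real quantum execution.

class SingleQuantumExecutor:
    def __init__(self, butler, taskFactory):
        self.butler = butler
        self.taskFactory = taskFactory
        self.log = []  # record the call order for illustration

    def execute(self, taskClass, config, quantum):
        self.setupLogging(taskClass, config, quantum)
        self.updateQuantumInputs(quantum)
        task = self.makeTask(taskClass, config)
        result = self.runQuantum(task, quantum)
        self.saveQuantum(quantum)
        return result

    # default implementations of the customization points
    def setupLogging(self, taskClass, config, quantum):
        self.log.append("setup")

    def updateQuantumInputs(self, quantum):
        self.log.append("update")

    def makeTask(self, taskClass, config):
        return taskClass(config)

    def runQuantum(self, task, quantum):
        self.log.append("run")
        return task.runQuantum(quantum)

    def saveQuantum(self, quantum):
        self.log.append("save")


class ToyTask:
    """Stand-in for a real PipelineTask."""
    def __init__(self, config):
        self.config = config

    def runQuantum(self, quantum):
        return quantum * 2  # stand-in for real work


executor = SingleQuantumExecutor(butler=None, taskFactory=None)
print(executor.execute(ToyTask, config={}, quantum=21))  # 42
print(executor.log)  # ['setup', 'update', 'run', 'save']
```

A subclass would override any of the five customization points (for example, updateQuantumInputs) without touching the overall execute() flow.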

Andy Salnikov added a comment -

An executor for a single Quantum was added in 8388452. One level above that would be an executor for the whole QuantumGraph; to make that reusable it has to support different execution models, e.g.:

• immediate in-process execution of all tasks (no multiprocessing)
• immediate parallel execution of the tasks on the same host but in subprocesses (what current multiprocess mode does)
• (preparation for) delayed execution on a batch system, e.g. HTCondor

This GraphExecutor needs to be parameterized via a client-provided class which supports the corresponding execution model; the responsibility of that class would be scheduling the execution of individual Quanta. For correct scheduling, GraphExecutor has to provide enough information to that class, which in addition to the Quantum itself with its inputs/outputs should also include the dependencies between Quanta. To specify the pre-requisites for a given Quantum we could provide a list of other Quanta, but that is not very useful because Quantum is not hashable. It may be better to just enumerate all Quanta in a graph and pass the indices of the pre-requisite Quanta instead.
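The index-based pre-requisite idea could look like the following sketch (FakeQuantum and index_prerequisites are hypothetical names used only for illustration):

```python
# Sketch of the indexing idea: since Quantum is not hashable, use id()
# to map each quantum object to its enumeration position, then express
# pre-requisites as lists of integer indices.

def index_prerequisites(quanta, depends_on):
    """`quanta` is an ordered sequence of (possibly unhashable) quantum
    objects; `depends_on(q)` returns the quanta q directly depends on."""
    index_by_id = {id(q): i for i, q in enumerate(quanta)}
    return [[index_by_id[id(dep)] for dep in depends_on(q)] for q in quanta]


class FakeQuantum:
    """Toy stand-in for lsst.daf.butler.Quantum."""
    __hash__ = None  # mimic an unhashable Quantum

    def __init__(self, deps=()):
        self.deps = list(deps)


a = FakeQuantum()
b = FakeQuantum()
c = FakeQuantum([a, b])
print(index_prerequisites([a, b, c], lambda q: q.deps))  # [[], [], [0, 1]]
```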

I think my mental model for the whole thing is:

• QuantumGraphExecutor takes a QGraph and a QuantumExecutor as input (plus a bunch of other needed things - butler, collection names, etc.). QuantumExecutor is the thing that implements the specific execution model.
• QuantumGraphExecutor optionally updates the registry, doing items 6.1 and 6.2 from the list above. This may need to be optional in a delayed model; not clear to me yet.
• QuantumGraphExecutor then enumerates all Quanta in a graph and iteratively calls QuantumExecutor for each Quantum, passing the Quantum, the index of this Quantum, the indices of its pre-requisites, and any other needed info (butler, etc.)
• QuantumExecutor may need to know when it receives the last Quantum in a graph, so QuantumGraphExecutor should call some special method for that.
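The driving loop described in the bullets above might be sketched as follows (CountingExecutor and run_graph are illustrative names, not the real interface):

```python
# Toy sketch of the QuantumGraphExecutor driving loop: enumerate quanta,
# call the per-quantum executor for each, then signal completion.

class CountingExecutor:
    """Toy QuantumExecutor implementing the per-quantum interface."""
    def __init__(self):
        self.executed = []
        self.finished = False

    def execute(self, quantum, taskDef, quantumId, preRequisites):
        self.executed.append(quantumId)

    def finish(self):
        self.finished = True


def run_graph(quanta_with_prereqs, quantum_executor):
    """`quanta_with_prereqs` is a sequence of (quantum, taskDef,
    prerequisite-index-list) triples, assumed topologically ordered."""
    for quantumId, (quantum, taskDef, prereqs) in enumerate(quanta_with_prereqs):
        quantum_executor.execute(quantum, taskDef, quantumId, prereqs)
    quantum_executor.finish()  # tell the executor the last quantum was seen


ex = CountingExecutor()
run_graph([("q0", "t", []), ("q1", "t", [0]), ("q2", "t", [0, 1])], ex)
print(ex.executed, ex.finished)  # [0, 1, 2] True
```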

Given that, the QuantumExecutor interface may look like:

```python
class QuantumExecutor:

    def execute(self, quantum, taskDef, quantumId, preRequisites):
        """
        Parameters
        ----------
        quantum : lsst.daf.butler.Quantum
        taskDef : lsst.pipe.base.TaskDef
        quantumId : int
        preRequisites : list of int
        """

    def finish(self):
        """Method called after the last execute()."""
```

I'm not very happy with this interface though; there is probably a lot of state that needs to be carried between calls to execute(), so the logic will be very messy. It may be better to invert the logic a bit and pass an iterator into execute() which returns all the needed data. Maybe this will work better:

```python
# helper class with the same attributes as passed to execute() above
QuantumData = namedtuple("QuantumData", "quantum, taskDef, quantumId, preRequisites")


class QuantumExecutor:

    def execute(self, iterable, butler, ...):
        """
        Parameters
        ----------
        iterable : iterable of QuantumData
            It is guaranteed that a Quantum will be seen before it can appear
            on any pre-requisite list (the first item will have empty
            pre-requisites).
        """
```

I think I like this approach better. QuantumGraphExecutor will just need to instantiate an iterator over Quanta (this still has to be implemented) and pass it down to QuantumExecutor.
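A minimal runnable sketch of this iterator-based variant, under the assumption that the graph executor yields QuantumData records in dependency order (all names besides QuantumData are illustrative):

```python
from collections import namedtuple

# Sketch of the iterator-based variant: the graph side yields QuantumData
# records in dependency order, and the QuantumExecutor consumes the whole
# stream in a single execute() call.

QuantumData = namedtuple("QuantumData", "quantum, taskDef, quantumId, preRequisites")


def quantum_data_iter(quanta_with_prereqs):
    """Yield QuantumData records; assumes the input is already
    topologically ordered, so each item appears before any
    pre-requisite list that mentions it."""
    for i, (quantum, taskDef, prereqs) in enumerate(quanta_with_prereqs):
        yield QuantumData(quantum, taskDef, i, prereqs)


class InProcessExecutor:
    """Toy QuantumExecutor: processes everything immediately, in order."""
    def execute(self, iterable, butler=None):
        return [qd.quantumId for qd in iterable]


executor = InProcessExecutor()
ids = executor.execute(quantum_data_iter([("q0", "t", []), ("q1", "t", [0])]))
print(ids)  # [0, 1]
```

A multiprocessing executor could instead buffer the QuantumData records and dispatch each one to a worker once its pre-requisite IDs have completed.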

Another option for this design is to make that single execute() method a method of QuantumGraphExecutor itself; classes implementing different execution models would just inherit from it and customize it as much as needed. pipetask would probably use just a single model (mpexec), though potentially we could add different ones if that makes sense.

Andy Salnikov added a comment - edited

I think I'm more or less happy with the current round of refactoring - ctrl_mpexec should now be more reusable for other types of activators. Any further potential improvements would need input from people implementing new activators. Jim Bosch, I hope it won't take much of your time, but let me know if you are too busy for review. This is not urgent, and after this I think we can close the RFC (after adding ctrl_mpexec to lsst_distrib).

Andy Salnikov added a comment -

Jim Bosch, do you want to look more at my changes or is it OK to start merging them?

Jim Bosch added a comment -

Feel free to start merging.

Andy Salnikov added a comment -

Thanks for the review! Merged both packages.


## People

• Assignee:
Andy Salnikov
Reporter:
Andy Salnikov
Reviewers:
Jim Bosch
Watchers:
Andy Salnikov, Christopher Waters, Jim Bosch, Krzysztof Findeisen, Vaikunth Thukral