An executor for a single Quantum was added in 8388452. One level above that would be an executor for the whole QuantumGraph. To make it reusable, it has to support different execution models, e.g.:
- immediate in-process execution of all tasks (no multiprocessing)
- immediate parallel execution of the tasks on the same host, but in subprocesses (what the current multiprocess mode does)
- (preparation for) delayed execution on a batch system, e.g. HTCondor
This GraphExecutor needs to be parameterized with a client-provided class that implements the corresponding execution model. That class would be responsible for scheduling the execution of individual Quanta. For correct scheduling, GraphExecutor has to give that class enough information: in addition to the Quantum itself with its inputs/outputs, it should also provide the dependencies between Quanta. To specify the prerequisites of a given Quantum we could pass a list of other Quanta, but that is not very useful because Quantum is not hashable. It may be better to enumerate all Quanta in the graph and pass the indices of the prerequisite Quanta instead.
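To illustrate the index-based approach, here is a minimal sketch. It assumes that prerequisites are found by matching each Quantum's input dataset names to other Quanta's outputs. `FakeQuantum` is a hypothetical stand-in for `lsst.daf.butler.Quantum`, made unhashable via `__hash__ = None` to mirror the constraint above; it is not the real class. The dataset names are purely illustrative. Only strings and integer positions are used as dict keys, never Quanta.

```python
class FakeQuantum:
    """Hypothetical stand-in for lsst.daf.butler.Quantum (NOT the real class)."""

    # Explicitly unhashable, mirroring the statement that Quantum is not hashable.
    __hash__ = None

    def __init__(self, inputs, outputs):
        self.inputs = list(inputs)    # names of datasets this Quantum reads
        self.outputs = list(outputs)  # names of datasets this Quantum writes


def prerequisite_indices(quanta):
    """Return, for each Quantum, the sorted indices of the Quanta producing its inputs.

    Quanta themselves are never hashed; dataset names map to integer positions.
    Inputs with no producer in the graph (e.g. raw data) add no prerequisite.
    """
    producer = {}
    for index, quantum in enumerate(quanta):
        for name in quantum.outputs:
            producer[name] = index
    return [
        sorted({producer[name] for name in quantum.inputs if name in producer})
        for quantum in quanta
    ]


quanta = [
    FakeQuantum(inputs=["raw"], outputs=["calexp"]),
    FakeQuantum(inputs=["calexp"], outputs=["src"]),
    FakeQuantum(inputs=["calexp", "src"], outputs=["coadd"]),
]
print(prerequisite_indices(quanta))  # [[], [0], [0, 1]]
```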
I think my mental model for the whole thing is:
- QuantumGraphExecutor takes a QGraph and a QuantumExecutor as input (plus a bunch of other needed stuff: butler, collection names, etc.). QuantumExecutor is the thing that implements a specific execution model.
- QuantumGraphExecutor optionally updates the registry, doing items 6.1 and 6.2 from the list above. This may be optional in the delayed model; that is not clear to me yet.
- QuantumGraphExecutor then enumerates all Quanta in the graph and iteratively calls QuantumExecutor for each Quantum, passing the Quantum, the index of this Quantum, the indices of its prerequisites, and all other needed info (butler, etc.).
- QuantumExecutor may need to know when it receives the last Quantum in the graph, so QuantumGraphExecutor should call a special method for that.
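The steps above can be sketched as a simple driver loop. This is a hypothetical sketch, not an existing API: `run_graph`, `RecordingExecutor` and the flat, parallel-list representation of the graph are assumptions. The registry updates (items 6.1 and 6.2) and butler handling are left out.

```python
class RecordingExecutor:
    """Hypothetical QuantumExecutor that only records the calls it receives."""

    def __init__(self):
        self.calls = []
        self.finished = False

    def execute(self, quantum, taskDef, quantumId, preRequisites):
        self.calls.append((quantumId, list(preRequisites)))

    def finish(self):
        self.finished = True


def run_graph(quanta, taskDefs, preRequisites, quantumExecutor):
    """Call quantumExecutor.execute() once per Quantum, then finish().

    quanta, taskDefs and preRequisites are parallel lists indexed by quantumId;
    this flat representation stands in for iterating over a real QuantumGraph.
    """
    for quantumId, (quantum, taskDef) in enumerate(zip(quanta, taskDefs)):
        quantumExecutor.execute(quantum, taskDef, quantumId, preRequisites[quantumId])
    # Signal that the last Quantum has been handed over.
    quantumExecutor.finish()


executor = RecordingExecutor()
run_graph(["q0", "q1", "q2"], ["taskA", "taskB", "taskB"], [[], [0], [0, 1]], executor)
print(executor.calls)     # [(0, []), (1, [0]), (2, [0, 1])]
print(executor.finished)  # True
```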
Given that, the QuantumExecutor interface may look like:
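class QuantumExecutor:

    def execute(self, quantum, taskDef, quantumId, preRequisites):
        """Schedule execution of a single Quantum.

        Parameters
        ----------
        quantum : `lsst.daf.butler.Quantum`
            Quantum to execute.
        taskDef : `lsst.pipe.base.TaskDef`
            Definition of the task that processes this Quantum.
        quantumId : `int`
            Index of this Quantum in the graph.
        preRequisites : `list` of `int`
            Indices of the Quanta that must be executed before this one.
        """
        raise NotImplementedError()

    def finish(self):
        """Method called after the last execute() call.
        """
        raise NotImplementedError()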
I'm not very happy with this interface, though. There is probably a lot of state that needs to be carried between calls to execute(), so the logic will be very messy. It may be better to invert the logic a bit and pass an iterator into execute() that yields everything needed. Maybe this will work better:
from collections import namedtuple

# Helper class with the same attributes as passed to execute() above
QuantumData = namedtuple("QuantumData", "quantum, taskDef, quantumId, preRequisites")


class QuantumExecutor:

    def execute(self, iterable, butler, **kwargs):
        """Execute all Quanta produced by the iterable.

        Parameters
        ----------
        iterable : iterable of `QuantumData`
            It is guaranteed that a Quantum will be seen before it can appear
            on any prerequisite list (the first item will have empty
            prerequisites).
        butler : `lsst.daf.butler.Butler`
            Data butler.
        **kwargs
            Other needed info (collection names, etc.).
        """
        raise NotImplementedError()
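This interface keeps bookkeeping local to a single call. To show that, here is a hypothetical in-process executor (no multiprocessing). `SerialQuantumExecutor` and the `run_quantum` callable are assumptions; in practice the single-Quantum executor from 8388452 would fill that role. Because of the ordering guarantee, a serial executor can run each Quantum as soon as it arrives. The only state it needs is a local set of completed indices.

```python
from collections import namedtuple

QuantumData = namedtuple("QuantumData", "quantum, taskDef, quantumId, preRequisites")


class SerialQuantumExecutor:
    """Hypothetical in-process executor built on the iterator-based interface."""

    def __init__(self, run_quantum):
        # Callable (quantum, taskDef, butler) -> None, standing in for the
        # single-Quantum executor.
        self.run_quantum = run_quantum

    def execute(self, iterable, butler, **kwargs):
        done = set()  # indices of Quanta already executed, local to this call
        for data in iterable:
            # Check the ordering guarantee before running anything.
            missing = [i for i in data.preRequisites if i not in done]
            if missing:
                raise RuntimeError(
                    f"Quantum {data.quantumId} arrived before prerequisites {missing}"
                )
            self.run_quantum(data.quantum, data.taskDef, butler)
            done.add(data.quantumId)


executed = []
executor = SerialQuantumExecutor(lambda quantum, taskDef, butler: executed.append(quantum))
items = [QuantumData("q0", "taskA", 0, []), QuantumData("q1", "taskB", 1, [0])]
executor.execute(iter(items), butler=None)
print(executed)  # ['q0', 'q1']
```

In this variant no separate finish() call is needed, because exhausting the iterable marks the last Quantum.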
I think I like this approach better. QuantumGraphExecutor will just need to instantiate an iterator over Quanta (this still has to be implemented) and pass it down to QuantumExecutor.
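One possible way to build that iterator is a topological sort (Kahn's algorithm). The result satisfies the guarantee that a Quantum is seen before it appears on any prerequisite list. This is a sketch under assumptions: `iter_quanta` and the parallel-list graph representation are hypothetical, and Quanta keep their original indices rather than being renumbered. A cycle is reported only once the generator is exhausted.

```python
from collections import deque, namedtuple

QuantumData = namedtuple("QuantumData", "quantum, taskDef, quantumId, preRequisites")


def iter_quanta(quanta, taskDefs, preRequisites):
    """Yield QuantumData in an order where prerequisites always come first.

    quanta, taskDefs and preRequisites are parallel lists indexed by quantumId
    (an assumed flat representation of the graph).
    """
    n = len(quanta)
    remaining = [len(set(p)) for p in preRequisites]  # unmet prerequisites per Quantum
    dependents = [[] for _ in range(n)]  # reverse edges: who waits on each Quantum
    for quantumId, prereqs in enumerate(preRequisites):
        for p in set(prereqs):
            dependents[p].append(quantumId)
    ready = deque(i for i in range(n) if remaining[i] == 0)
    emitted = 0
    while ready:
        quantumId = ready.popleft()
        yield QuantumData(quanta[quantumId], taskDefs[quantumId], quantumId,
                          list(preRequisites[quantumId]))
        emitted += 1
        for d in dependents[quantumId]:
            remaining[d] -= 1
            if remaining[d] == 0:
                ready.append(d)
    if emitted != n:
        raise ValueError("QuantumGraph contains a cycle")


# Quanta listed out of dependency order: q0 needs q2, q2 needs q1.
items = list(iter_quanta(["q0", "q1", "q2"], ["tA", "tB", "tC"], [[2], [], [1]]))
print([d.quantumId for d in items])  # [1, 2, 0]
```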
Another option for this design is to make that single execute() method a method of QuantumGraphExecutor itself. Classes implementing different execution models would just inherit from it and customize it as much as needed. pipetask would probably use a single model (mpexec), though we could potentially add others if that makes sense.
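The inheritance-based alternative might look roughly like the sketch below. It is hypothetical: `InProcessGraphExecutor`, the `run_quantum` callable and the assumption that `graph` is an iterable of `(quantum, taskDef)` pairs already in dependency order are all illustrative. The base class declares execute() abstract, and each execution model overrides it.

```python
from abc import ABC, abstractmethod


class QuantumGraphExecutor(ABC):
    """Hypothetical base class for the inheritance-based design."""

    @abstractmethod
    def execute(self, graph, butler):
        """Execute every Quantum in ``graph``; subclasses choose the model."""


class InProcessGraphExecutor(QuantumGraphExecutor):
    """Runs Quanta one after another in the current process (no multiprocessing)."""

    def __init__(self, run_quantum):
        self.run_quantum = run_quantum  # stands in for the single-Quantum executor

    def execute(self, graph, butler):
        # ``graph`` is assumed to yield (quantum, taskDef) pairs in dependency order.
        for quantum, taskDef in graph:
            self.run_quantum(quantum, taskDef, butler)


ran = []
executor = InProcessGraphExecutor(lambda q, t, b: ran.append((q, t)))
executor.execute([("q0", "tA"), ("q1", "tB")], butler=None)
print(ran)  # [('q0', 'tA'), ('q1', 'tB')]
```

A multiprocessing or batch-submission model would be another subclass overriding execute(). The trade-off against passing a QuantumExecutor in is essentially inheritance versus composition.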
There is basically one thing left in this package that can be refactored: the CmdLineFwk class. Its current responsibilities are: