ror.controlers.common package
Submodules
ror.controlers.common.base_controller module
- class ror.controlers.common.base_controller.BaseController(init_data: BaseSchema, init_stage: IInitStage)
Bases: object
Basic controller that accepts an InitStage together with some initial data (the input dataclass for the InitStage) and iterates over all the linked stages.
Examples
>>> from dataclasses import dataclass
>>> from pypipeline.schemas import BaseSchema
>>> from pypipeline.schemas.fields import field_perishable, field_persistance
>>> from pypipeline.stages import IForwardStage, IInitStage, ITerminalStage
>>> from pypipeline.controlers.common import BaseController
Let’s define a very basic pipeline of three stages.
>>> @dataclass
... class InputTest(BaseSchema):
...     A: str = field_persistance()
...     B: str = field_perishable()

>>> @dataclass
... class OutputTest(BaseSchema):
...     A: str = field_perishable()
...     C: str = field_persistance()

>>> @dataclass
... class TerminalOutputTest(BaseSchema):
...     C: str = field_persistance()
Then using these dataclasses we can define the processing stages.
>>> class TerminalStageTest(ITerminalStage[OutputTest, TerminalOutputTest]):
...     ...

>>> class ForwardStageTest(IForwardStage[OutputTest, OutputTest, TerminalStageTest]):
...     ...

>>> class InitStageTest(IInitStage[InputTest, OutputTest, ForwardStageTest]):
...     ...
Then we can instantiate the controller and either discover the connections or run a computation over the entire pipeline.
>>> data = {"A": "A", "B": "B"}
>>> init_data = InputTest(**data)
>>> stage = InitStageTest
>>> controller = BaseController(init_data, stage)

>>> controller.discover()  # Prints a table of the connected stages for debugging.
>>> output, run_id = controller.start()  # Runs the whole pipeline and returns the terminal data and a run_id.
- discover() → None
Iteratively goes through the entire pipeline and adds a row to the discover table for each relationship link between stages. Useful for debugging a pipeline.
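To illustrate how a discovery pass of this kind could walk the links between stages, here is a minimal, self-contained sketch. It does not use the library's own classes: `SketchStage`, `SketchTerminal`, and `discover_links` are hypothetical stand-ins, and reading the next stage from the generic parameters is an assumption based on the `IInitStage[InputTest, OutputTest, ForwardStageTest]` pattern in the example, not a description of the actual implementation.

```python
from typing import Generic, List, Optional, Tuple, TypeVar, get_args

TIn = TypeVar("TIn")
TOut = TypeVar("TOut")
TNext = TypeVar("TNext")


class SketchStage(Generic[TIn, TOut, TNext]):
    """Hypothetical non-terminal stage: input type, output type, next stage."""


class SketchTerminal(Generic[TIn, TOut]):
    """Hypothetical terminal stage: input type and output type only."""


class TerminalSketch(SketchTerminal[int, str]):
    pass


class ForwardSketch(SketchStage[int, int, TerminalSketch]):
    pass


class InitSketch(SketchStage[str, int, ForwardSketch]):
    pass


def discover_links(first_stage: type) -> List[Tuple[str, str, str, str]]:
    """Follow the 'next stage' type parameter until a terminal stage is reached."""
    rows = []
    current: Optional[type] = first_stage
    while current is not None:
        # __orig_bases__ keeps the parametrized base, e.g. SketchStage[str, int, ForwardSketch].
        args = get_args(current.__orig_bases__[0])
        # Assumption: three parameters mean a next stage exists, two mean terminal.
        next_stage = args[2] if len(args) == 3 else None
        rows.append((
            current.__name__,
            args[0].__name__,
            args[1].__name__,
            next_stage.__name__ if next_stage is not None else "-",
        ))
        current = next_stage
    return rows


# Print one row per stage: stage, input type, output type, next stage.
for row in discover_links(InitSketch):
    print("{:<16}{:<8}{:<8}{}".format(*row))
```

Each row pairs a stage with its input type, output type, and successor, which is the kind of relationship table that helps when a pipeline is wired incorrectly.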
- get_artifacts(run_id: str) → dict
For a given run_id, try to access the artifacts that were produced during that specific run.
- Parameters:
run_id (str) – ID of the run whose artifacts should be retrieved.
- Returns:
A dictionary where the keys are the stages, and the values are the artifacts produced for those stages.
- Return type:
dict
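The shape of the returned dictionary can be pictured with a small, self-contained sketch of a per-run artifact cache. `SketchArtifactCache` and its `record` method are hypothetical and exist only for illustration; in particular, returning an empty dictionary for an unknown run_id is an assumption, since the documentation does not say how that case is handled.

```python
from typing import Any, Dict


class SketchArtifactCache:
    """Hypothetical cache mapping run_id -> {stage name -> artifacts}."""

    def __init__(self) -> None:
        self._runs: Dict[str, Dict[str, Any]] = {}

    def record(self, run_id: str, stage_name: str, artifacts: Any) -> None:
        # Store the artifacts a stage produced during one specific run.
        self._runs.setdefault(run_id, {})[stage_name] = artifacts

    def get_artifacts(self, run_id: str) -> Dict[str, Any]:
        # Assumption: an unknown run_id yields an empty dict instead of an error.
        return dict(self._runs.get(run_id, {}))


cache = SketchArtifactCache()
cache.record("run-1", "InitStageTest", {"rows_in": 2})
cache.record("run-1", "ForwardStageTest", {"rows_out": 2})
cache.record("run-2", "InitStageTest", {"rows_in": 5})

artifacts = cache.get_artifacts("run-1")
print(sorted(artifacts))
```

Keying the cache by run_id first keeps the artifacts of different runs separate even when the same stages are executed repeatedly.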
- start() → Tuple[BaseSchema, str]
Performs the iterative computation through the pipeline and returns a tuple of the output data and a run_id. The run_id can be used to access the artifact cache produced for that run.
- Returns:
Tuple of the terminal stage output data and a run_id
- Return type:
Tuple[BaseSchema, str]
- Raises:
ReferenceError – Raised if the get_output method of a stage returns a class reference rather than an instance of the next stage.
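The iterative run and the instance-versus-class check can be sketched as follows. This is a minimal illustration under stated assumptions, not the controller's actual code: `SketchRunStage`, its `compute` method, and `run_pipeline` are hypothetical, and generating the run_id with `uuid` is a guess at one reasonable approach.

```python
import inspect
import uuid
from typing import Any, List, Optional, Tuple


class SketchRunStage:
    """Hypothetical stage that transforms data and hands over to the next stage."""

    def __init__(self, name: str, next_stage: Optional[Any] = None) -> None:
        self.name = name
        self.next_stage = next_stage

    def compute(self, data: List[str]) -> List[str]:
        # Stand-in for real work: append this stage's name to the data.
        return data + [self.name]

    def get_output(self) -> Optional[Any]:
        # Should return an instance of the next stage, or None when terminal.
        return self.next_stage


def run_pipeline(first_stage: Any, data: List[str]) -> Tuple[List[str], str]:
    """Run every linked stage in turn and return (terminal data, run_id)."""
    run_id = uuid.uuid4().hex  # assumption: any unique string would do
    stage = first_stage
    while stage is not None:
        if inspect.isclass(stage):
            # A class reference cannot be run; only an instance can.
            raise ReferenceError(
                f"expected a stage instance, got the class {stage.__name__}"
            )
        data = stage.compute(data)
        stage = stage.get_output()
    return data, run_id


chain = SketchRunStage("init", SketchRunStage("forward", SketchRunStage("terminal")))
output, run_id = run_pipeline(chain, [])
print(output)
```

Handing back a class instead of an instance, for example `SketchRunStage("init", SketchRunStage)`, makes `run_pipeline` raise ReferenceError as soon as the loop reaches that link.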