ror.controlers.common package

Submodules

ror.controlers.common.base_controller module

class ror.controlers.common.base_controller.BaseController(init_data: BaseSchema, init_stage: IInitStage)

Bases: object

Basic controller that accepts an InitStage together with some initial data (the input dataclass for that InitStage) and performs the iterative process over all the linked stages.

Examples

>>> from dataclasses import dataclass
>>> from pypipeline.schemas import BaseSchema
>>> from pypipeline.schemas.fields import field_perishable, field_persistance
>>> from pypipeline.stages import IForwardStage, IInitStage, ITerminalStage
>>> from pypipeline.controlers.common import BaseController

Let’s define a very basic pipeline of three stages.

>>> @dataclass
... class InputTest(BaseSchema):
...     A: str = field_persistance()
...     B: str = field_perishable()
>>> @dataclass
... class OutputTest(BaseSchema):
...     A: str = field_perishable()
...     C: str = field_persistance()
>>> @dataclass
... class TerminalOutputTest(BaseSchema):
...     C: str = field_persistance()

Then using these dataclasses we can define the processing stages.

>>> class TerminalStageTest(ITerminalStage[OutputTest, TerminalOutputTest]):
...     ...
>>> class ForwardStageTest(IForwardStage[OutputTest, OutputTest, TerminalStageTest]):
...     ...
>>> class InitStageTest(IInitStage[InputTest, OutputTest, ForwardStageTest]):
...     ...

Then we can instantiate the controller and discover the connections or perform a computation over the entire pipeline.

>>> data = {"A": "A", "B": "B"}
>>> init_data = InputTest(**data)
>>> stage = InitStageTest
>>> controller = BaseController(init_data, stage)
>>> controller.discover()  # Prints a table of the connected stages for debugging.
>>> output, run_id = controller.start()  # Runs the pipeline and returns the terminal data and a run_id.

discover() → None

Iteratively goes through the entire pipeline and adds a row to the discover table for each relationship link between stages. Useful for debugging a pipeline.
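The iterative walk described above can be sketched as follows. This is only an illustration under stated assumptions: the `Stage` classes, the `next_stage` class attribute, and `discover_links` are hypothetical stand-ins, and the library may record the links between stages in a different way.

```python
from typing import List, Optional, Tuple, Type


class Stage:
    """Hypothetical stand-in for a pipeline stage; not the library's class."""

    # Assumption: each stage class names its successor in a class attribute.
    next_stage: Optional[Type["Stage"]] = None


class TerminalStage(Stage):
    pass


class ForwardStage(Stage):
    next_stage = TerminalStage


class InitStage(Stage):
    next_stage = ForwardStage


def discover_links(init_stage: Type[Stage]) -> List[Tuple[str, str]]:
    """Walk the chain iteratively and collect (stage, next stage) rows."""
    rows = []
    current = init_stage
    while current is not None:
        successor = current.next_stage
        rows.append((current.__name__, successor.__name__ if successor else "-"))
        current = successor
    return rows


rows = discover_links(InitStage)
for name, successor in rows:
    # One table row per stage, showing which stage it hands over to.
    print(f"{name:<15}{successor}")
```

A loop is used instead of recursion so that long pipelines do not run into the recursion limit.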

get_artifacts(run_id: str) → dict

Tries to access the artifacts that were produced during the run identified by run_id.

Parameters:

run_id (str) – ID of the run whose artifacts should be retrieved.

Returns:

A dictionary where the keys are the stages, and the values are the artifacts produced for those stages.

Return type:

dict
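A minimal sketch of the kind of lookup described above, assuming an in-memory cache keyed first by run_id and then by stage name. The names `_ARTIFACT_CACHE` and `record_artifact`, the example artifact values, and the empty-dict result for an unknown run_id are assumptions made for illustration, not documented behaviour of the library.

```python
import uuid
from typing import Any, Dict

# Hypothetical in-memory cache: run_id -> {stage name -> artifacts of that stage}.
_ARTIFACT_CACHE: Dict[str, Dict[str, Any]] = {}


def record_artifact(run_id: str, stage_name: str, artifact: Any) -> None:
    """Store an artifact under the run and stage that produced it."""
    _ARTIFACT_CACHE.setdefault(run_id, {})[stage_name] = artifact


def get_artifacts(run_id: str) -> dict:
    """Return a copy of the stage -> artifact mapping for one run.

    Assumption: an unknown run_id yields an empty dict in this sketch.
    """
    return dict(_ARTIFACT_CACHE.get(run_id, {}))


run_id = uuid.uuid4().hex  # stand-in for the run_id returned by start()
record_artifact(run_id, "InitStageTest", {"rows_read": 2})
record_artifact(run_id, "ForwardStageTest", {"rows_kept": 1})

artifacts = get_artifacts(run_id)
print(artifacts["ForwardStageTest"])
```

Returning a copy keeps callers from mutating the cache by accident.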

start() → Tuple[BaseSchema, str]

Performs the iterative computation through the pipeline and returns a tuple of the output data and a run_id, which can be used to access the artifact cache produced during that run.

Returns:

Tuple of the terminal stage output data and a run_id

Return type:

Tuple[BaseSchema, str]

Raises:

ReferenceError – Raised when the get_output method of a stage returns a class reference instead of an instance of the next stage.
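The loop and the instance check described above could look roughly like this. It is a sketch under assumptions, not the controller's actual implementation: the `Stage` base class, its `compute` method, the use of `None` to mark the terminal stage, and `run_pipeline` are all hypothetical, and the sketch passes stage instances throughout.

```python
import uuid
from typing import Any, Optional, Tuple


class Stage:
    """Hypothetical stand-in for a pipeline stage; not the library's interface."""

    def compute(self, data: Any) -> Any:
        return data

    def get_output(self) -> Optional["Stage"]:
        # Assumption: None marks the terminal stage in this sketch.
        return None


class Exclaim(Stage):
    def compute(self, data: str) -> str:
        return data + "!"


class Upper(Stage):
    def compute(self, data: str) -> str:
        return data.upper()

    def get_output(self) -> Stage:
        return Exclaim()  # an instance, as required


class Broken(Stage):
    def get_output(self):
        return Exclaim  # a class reference: the mistake being guarded against


def run_pipeline(init_stage: Stage, data: Any) -> Tuple[Any, str]:
    """Run each stage in turn and return (terminal data, run_id)."""
    run_id = uuid.uuid4().hex
    current = init_stage
    while True:
        data = current.compute(data)
        next_stage = current.get_output()
        if next_stage is None:
            return data, run_id
        if isinstance(next_stage, type):
            # A class object was returned where a stage instance was expected.
            raise ReferenceError(
                f"{type(current).__name__}.get_output returned the class "
                f"{next_stage.__name__}, not an instance of it"
            )
        current = next_stage


output, run_id = run_pipeline(Upper(), "hello")
print(output)  # HELLO!
```

Calling `run_pipeline(Broken(), "x")` raises `ReferenceError`, because `Broken.get_output` hands back the class `Exclaim` rather than `Exclaim()`.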

Module contents