
Sequence class

Bases: _Entity, Submittable, _Labeled

A subset of scenario tasks grouped to be executed together independently of the others.

A sequence is attached to a Scenario. It represents a subset of the scenario's tasks that need to be executed together, independently of the other tasks in the scenario. These tasks must form a connected subgraph of the scenario's task graph. A scenario can hold multiple sequences.
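
The connectivity requirement can be illustrated without Taipy. The following is a minimal sketch, not Taipy's own validation code. It models each task as a pair of input and output data node names, treats two tasks as linked when one produces a data node the other consumes, and checks whether a candidate subset is connected. The `TASKS` mapping and the `linked` and `is_connected` helpers are hypothetical, and treating links as undirected is an assumption.

```python
from collections import deque

# Hypothetical model: task name -> (input data nodes, output data nodes).
TASKS = {
    "train": ({"sales_history"}, {"trained_model"}),
    "predict": ({"trained_model", "current_month"}, {"sales_predictions"}),
    "planning": ({"sales_predictions", "capacity"}, {"production_orders"}),
}


def linked(a, b, tasks=TASKS):
    """Two tasks are linked if one produces a data node the other consumes."""
    a_in, a_out = tasks[a]
    b_in, b_out = tasks[b]
    return bool(a_out & b_in) or bool(b_out & a_in)


def is_connected(subset, tasks=TASKS):
    """Breadth-first search over the subset, ignoring link direction."""
    subset = list(subset)
    if not subset:
        return False
    seen = {subset[0]}
    queue = deque([subset[0]])
    while queue:
        current = queue.popleft()
        for other in subset:
            if other not in seen and linked(current, other, tasks):
                seen.add(other)
                queue.append(other)
    return len(seen) == len(subset)
```

Under this model, `["train", "predict"]` is connected through the `trained_model` data node, whereas `["train", "planning"]` is not, because no data node links the two tasks directly.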

For instance, in a typical machine learning scenario, we may have several sequences: one dedicated to preprocessing and preparing data, one for training a model, and one dedicated to scoring.

Example

Let's assume we have a scenario configuration modelling a manufacturer that is training an ML model, predicting sales forecasts, and finally, based on the forecasts, planning its production. Three tasks are configured and linked together through data nodes.

[Figure: sequences]

First, the sales sequence (boxed in green in the picture) contains the training and predict tasks. Second, the production sequence (boxed in dark gray in the picture) contains the planning task.

This problem has been modeled in two sequences: one for the forecasting part and one for the production planning part. As a consequence, the two algorithms can have different life cycles and run independently under different schedules. For example, one can run on a fixed schedule (e.g. every week) and the other on demand, interactively triggered by end users.

import taipy as tp
from taipy import Config

def training(history):
    ...

def predict(model, month):
    ...

def planning(forecast, capacity):
    ...

if __name__ == "__main__":
    # Configure data nodes
    sales_history_cfg = Config.configure_csv_data_node("sales_history")
    trained_model_cfg = Config.configure_data_node("trained_model")
    current_month_cfg = Config.configure_data_node("current_month")
    forecasts_cfg = Config.configure_data_node("sales_predictions")
    capacity_cfg = Config.configure_data_node("capacity")
    production_orders_cfg = Config.configure_sql_data_node("production_orders")

    # Configure tasks and scenarios
    train_cfg = Config.configure_task("train", function=training,
                                      input=sales_history_cfg, output=trained_model_cfg)
    predict_cfg = Config.configure_task("predict", function=predict,
                                        input=[trained_model_cfg, current_month_cfg],
                                        output=forecasts_cfg)
    plan_cfg = Config.configure_task("planning", function=planning,
                                    input=[forecasts_cfg, capacity_cfg],
                                    output=production_orders_cfg)
    scenario_cfg = Config.configure_scenario("scenario", task_configs=[train_cfg, predict_cfg, plan_cfg])

    # Create a new scenario and sequences
    scenario = tp.create_scenario(scenario_cfg)
    scenario.add_sequence("sales_sequence", [train_cfg, predict_cfg])
    scenario.add_sequence("production_sequence", [plan_cfg])

    # Get all sequences
    all_sequences = tp.get_sequences()

    # Submit one sequence only
    tp.submit(scenario.sales_sequence)

Note that the sequences are not necessarily disjoint and may share some tasks.

Attributes

data_nodes property

data_nodes: Dict[str, DataNode]

The dictionary of data nodes used by the sequence.

id instance-attribute

id: SequenceId = sequence_id

The unique identifier of the sequence.

owner_id property

owner_id: Optional[str]

The identifier of the owner (scenario_id, cycle_id) or None.

parent_ids property

parent_ids: Set[str]

The set of identifiers of the parent scenarios.

properties property

properties: _Properties

The dictionary of additional properties.

subscribers property writable

subscribers: _ListAttributes

The list of callbacks to be called on a Job's status change.

tasks property writable

tasks: Dict[str, Task]

The dictionary of tasks used by the sequence.

version property

version: str

The application version of the sequence, as a string. If not provided, the latest version is used.

Methods

data_nodes_being_edited()

data_nodes_being_edited() -> Set[DataNode]

Return the set of data nodes that are being edited.

Returns:

Set[DataNode]: The set of data nodes that are being edited.

get_inputs()

get_inputs() -> Set[DataNode]

Return the set of input data nodes of this submittable.

Returns:

Set[DataNode]: The set of input data nodes.

get_intermediate()

get_intermediate() -> Set[DataNode]

Return the set of intermediate data nodes of the submittable entity.

Returns:

Set[DataNode]: The set of intermediate data nodes.

get_label()

get_label() -> str

Return the sequence's simple label prefixed by its owner's label.

Returns:

str: The label of the sequence as a string.

get_outputs()

get_outputs() -> Set[DataNode]

Return the set of output data nodes of the submittable entity.

Returns:

Set[DataNode]: The set of output data nodes.
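
To make get_inputs(), get_intermediate() and get_outputs() concrete, here is a sketch of how data nodes could be classified for a group of tasks. It is plain Python, not Taipy's implementation. It assumes the usual reading of the three roles: an input is consumed but never produced inside the group, an output is produced but never consumed inside it, and an intermediate is both. The `classify` helper and the task tuples are hypothetical.

```python
def classify(tasks):
    """Split data node names into inputs, intermediates and outputs.

    `tasks` is an iterable of (inputs, outputs) pairs of data node names.
    """
    consumed, produced = set(), set()
    for inputs, outputs in tasks:
        consumed |= set(inputs)
        produced |= set(outputs)
    return {
        "inputs": consumed - produced,
        "intermediate": consumed & produced,
        "outputs": produced - consumed,
    }


# The sales sequence from the example above: train, then predict.
sales_sequence = [
    (["sales_history"], ["trained_model"]),
    (["trained_model", "current_month"], ["sales_predictions"]),
]
roles = classify(sales_sequence)
```

With these assumptions, `trained_model` is the only intermediate data node of the sales sequence, since it is written by the training task and read by the predict task.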

get_parents()

get_parents() -> Dict[str, Set[_Entity]]

Get parent scenarios of the sequence.

Returns:

Dict[str, Set[_Entity]]: The dictionary of all parent entities. They are grouped by their type (Scenario, Sequence, or Task), so each key corresponds to a level of the parents and the value is a set of the parent entities. An empty dictionary is returned if the entity does not have parents.

get_simple_label()

get_simple_label() -> str

Return the sequence's simple label.

Returns:

str: The simple label of the sequence as a string.

is_ready_to_run()

is_ready_to_run() -> ReasonCollection

Indicate if the entity is ready to be run.

Returns:

ReasonCollection: A ReasonCollection object that can be used as a Boolean value. It evaluates to True if the entity is ready to be run, that is, if there is no reason for it to be blocked, and to False otherwise.
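
Because the returned object can be used as a Boolean, a guard before submission can be written as a plain `if`. The helper below is a minimal sketch: `submit_if_ready` is a hypothetical name, and it uses only the is_ready_to_run() and submit() methods documented on this page. The stub class is a stand-in that lets the snippet run without Taipy and returns a plain bool instead of a ReasonCollection.

```python
def submit_if_ready(sequence, **submit_kwargs):
    """Submit the sequence only when is_ready_to_run() evaluates to True.

    Returns whatever submit() returns, or None when the sequence is blocked.
    """
    reasons = sequence.is_ready_to_run()
    if not reasons:
        return None
    return sequence.submit(**submit_kwargs)


class _StubSequence:
    """Stand-in used only to exercise the helper without Taipy."""

    def __init__(self, ready):
        self._ready = ready
        self.submitted_with = None

    def is_ready_to_run(self):
        # A real sequence returns a ReasonCollection here.
        return self._ready

    def submit(self, **kwargs):
        self.submitted_with = kwargs
        return "submission"


blocked = _StubSequence(ready=False)
ready = _StubSequence(ready=True)
blocked_result = submit_if_ready(blocked)
ready_result = submit_if_ready(ready, wait=True, timeout=30)
```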

submit()

submit(
    callbacks: Optional[List[Callable]] = None,
    force: bool = False,
    wait: bool = False,
    timeout: Optional[Union[float, int]] = None,
    **properties
) -> Submission

Submit the sequence for execution.

All the Tasks of the sequence will be submitted for execution.

Parameters:

callbacks (List[Callable], default None): The list of callable functions to be called on status change.

force (bool, default False): Force execution even if the data nodes are in cache.

wait (bool, default False): Wait for the orchestrated jobs created from the sequence submission to be finished in asynchronous mode.

timeout (Union[float, int], default None): The maximum number of seconds to wait for the jobs to be finished before returning. If not provided and wait is True, the function waits indefinitely.

**properties (dict[str, any], default {}): A keyworded variable length list of additional arguments.

Returns:

Submission: A Submission containing the information of the submission.

subscribe()

subscribe(
    callback: Callable[[Sequence, Job], None],
    params: Optional[List[Any]] = None,
) -> None

Subscribe a function to be called on Job status change. The subscription is applied to all jobs created from the sequence's execution.

Note

Notification will be available only for jobs created after this subscription.

Parameters:

callback (Callable[[Sequence, Job], None], required): The callable function to be called on status change.

params (Optional[List[Any]], default None): The parameters to be passed to the callback.
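
A callback passed to subscribe() has to accept a sequence and a job. The sketch below builds one that records each status change in a list. `make_status_logger` is a hypothetical helper, and reading `job.id` and `job.status` assumes the Job object exposes those attributes, which this page does not describe. The `SimpleNamespace` objects are stand-ins that let the callback be called by hand without Taipy.

```python
from types import SimpleNamespace


def make_status_logger(log):
    """Build a callback with the documented (sequence, job) signature."""

    def on_job_status_change(sequence, job):
        # Assumption: the job exposes `id` and `status` attributes.
        log.append((sequence.id, job.id, str(job.status)))

    return on_job_status_change


events = []
callback = make_status_logger(events)

# With Taipy available, the callback would be registered with:
#     sequence.subscribe(callback)
# Here it is invoked directly on stand-in objects instead.
fake_sequence = SimpleNamespace(id="SEQUENCE_sales")
fake_job = SimpleNamespace(id="JOB_train", status="COMPLETED")
callback(fake_sequence, fake_job)
```

Returning the callback from a factory keeps the log out of global state, so the same function object can later be passed to unsubscribe().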

unsubscribe()

unsubscribe(
    callback: Callable[[Sequence, Job], None],
    params: Optional[List[Any]] = None,
) -> None

Unsubscribe a function that is called when the status of a Job changes.

Note

The function will continue to be called for ongoing jobs.

Parameters:

callback (Callable[[Sequence, Job], None], required): The callable function to unsubscribe.

params (Optional[List[Any]], default None): The parameters to be passed to the callback.