Sequence class
Bases: _Entity, Submittable, _Labeled
A subset of scenario tasks grouped to be executed together independently of the others.
A sequence is attached to a Scenario. It represents a subset of the scenario's tasks that need to be executed together, independently of the other tasks in the scenario. These tasks must form a connected subgraph of the scenario's task graph. A scenario can hold multiple sequences.
For instance, a typical machine learning scenario may have several sequences: one dedicated to preprocessing and preparing data, one for training a model, and one dedicated to scoring.
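The requirement that a sequence's tasks form a connected subgraph can be illustrated with a small, self-contained check. This is a hypothetical sketch, not Taipy's own validation: each task is modelled as a pair of input and output data node names, two tasks are treated as adjacent when they share a data node, and `is_connected_subgraph` (a hypothetical helper) runs a breadth-first search restricted to the selected tasks.

```python
from collections import deque
from typing import Dict, FrozenSet, Iterable, Tuple

# Hypothetical model: task name -> (input data node names, output data node names)
TaskGraph = Dict[str, Tuple[FrozenSet[str], FrozenSet[str]]]


def is_connected_subgraph(graph: TaskGraph, tasks: Iterable[str]) -> bool:
    """Return True if the selected tasks form a single connected component.

    Two tasks are adjacent when they share at least one data node, i.e. they
    are linked through a data node in the scenario's task graph.
    """
    selected = set(tasks)
    if not selected:
        return False  # an empty selection is not a usable sequence
    unknown = selected - graph.keys()
    if unknown:
        raise KeyError(f"Unknown tasks: {sorted(unknown)}")

    def data_nodes(task: str) -> FrozenSet[str]:
        inputs, outputs = graph[task]
        return inputs | outputs

    # Breadth-first search that only walks through selected tasks.
    start = next(iter(selected))
    seen = {start}
    queue = deque([start])
    while queue:
        current = queue.popleft()
        for other in selected - seen:
            if data_nodes(current) & data_nodes(other):
                seen.add(other)
                queue.append(other)
    return seen == selected


# The three tasks of the manufacturing example below, as a hypothetical graph.
graph: TaskGraph = {
    "train": (frozenset({"sales_history"}), frozenset({"trained_model"})),
    "predict": (frozenset({"trained_model", "current_month"}), frozenset({"sales_predictions"})),
    "planning": (frozenset({"sales_predictions", "capacity"}), frozenset({"production_orders"})),
}

print(is_connected_subgraph(graph, ["train", "predict"]))   # True
print(is_connected_subgraph(graph, ["train", "planning"]))  # False
```

`train` and `planning` share no data node, so without `predict` in between they do not form a valid sequence under this model.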
Example
Let's assume we have a scenario configuration modelling a manufacturer that trains an ML model, predicts sales forecasts, and finally, based on the forecasts, plans its production. Three tasks are configured and linked together through data nodes.
First, the sales sequence (boxed in green in the picture) contains the training and predict tasks. Second, the production sequence (boxed in dark gray in the picture) contains the planning task.
This problem has been modeled in two sequences: one for the forecasting part and one for the production planning part. As a consequence, the two algorithms can have different life cycles and run independently, under different schedules. For example, one can run on a fixed schedule (e.g. every week) and the other on demand, triggered interactively by end users.
```python
import taipy as tp
from taipy import Config


def training(history):
    ...


def predict(model, month):
    ...


def planning(forecast, capacity):
    ...


if __name__ == "__main__":
    # Configure data nodes
    sales_history_cfg = Config.configure_csv_data_node("sales_history")
    trained_model_cfg = Config.configure_data_node("trained_model")
    current_month_cfg = Config.configure_data_node("current_month")
    forecasts_cfg = Config.configure_data_node("sales_predictions")
    capacity_cfg = Config.configure_data_node("capacity")
    # A SQL data node (Config.configure_sql_data_node) also needs connection
    # settings and queries; a default data node keeps the example self-contained.
    production_orders_cfg = Config.configure_data_node("production_orders")

    # Configure tasks and scenarios
    train_cfg = Config.configure_task("train", function=training,
                                      input=sales_history_cfg, output=trained_model_cfg)
    predict_cfg = Config.configure_task("predict", function=predict,
                                        input=[trained_model_cfg, current_month_cfg],
                                        output=forecasts_cfg)
    plan_cfg = Config.configure_task("planning", function=planning,
                                     input=[forecasts_cfg, capacity_cfg],
                                     output=production_orders_cfg)
    scenario_cfg = Config.configure_scenario("scenario", task_configs=[train_cfg, predict_cfg, plan_cfg])

    # Create a new scenario and sequences.
    # add_sequence() takes the scenario's Task entities (keyed by config id), not task configs.
    scenario = tp.create_scenario(scenario_cfg)
    scenario.add_sequence("sales_sequence", [scenario.tasks["train"], scenario.tasks["predict"]])
    scenario.add_sequence("production_sequence", [scenario.tasks["planning"]])

    # Get all sequences
    all_sequences = tp.get_sequences()

    # Submit one sequence only
    tp.submit(scenario.sales_sequence)
```
Note that the sequences are not necessarily disjoint and may share some tasks.
Attributes
data_nodes (property)
data_nodes: Dict[str, DataNode]
The dictionary of data nodes used by the sequence.
owner_id (property)
owner_id: Optional[str]
The identifier of the owner (scenario_id, cycle_id) or None.
subscribers (property, writable)
subscribers: _ListAttributes
The list of callbacks to be called when a Job's status changes.
version (property)
version: str
The application version of the sequence. If not provided, the latest version is used.
Methods
data_nodes_being_edited()
data_nodes_being_edited() -> Set[DataNode]
Return the set of data nodes that are being edited.
Returns:
Type | Description |
---|---|
Set[DataNode] | The set of data nodes that are being edited. |
get_inputs()
get_inputs() -> Set[DataNode]
Return the set of input data nodes of this submittable.
Returns:
Type | Description |
---|---|
Set[DataNode] | The set of input data nodes. |
get_intermediate()
get_intermediate() -> Set[DataNode]
Return the set of intermediate data nodes of the submittable entity.
Returns:
Type | Description |
---|---|
Set[DataNode] | The set of intermediate data nodes. |
get_label()
get_label() -> str
Returns the sequence simple label prefixed by its owner label.
Returns:
Type | Description |
---|---|
str | The label of the sequence as a string. |
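A minimal sketch of how a full label can be composed from an owner label and a simple label. The `compose_label` helper is hypothetical, and the `" > "` separator is an assumption about Taipy's formatting rather than something this page states.

```python
from typing import Optional

# Assumed separator; check the labels your Taipy version actually produces.
LABEL_SEPARATOR = " > "


def compose_label(simple_label: str, owner_label: Optional[str] = None) -> str:
    """Prefix the simple label with the owner's label when there is an owner."""
    if owner_label:
        return f"{owner_label}{LABEL_SEPARATOR}{simple_label}"
    return simple_label


print(compose_label("sales_sequence", owner_label="My scenario"))  # My scenario > sales_sequence
print(compose_label("sales_sequence"))                             # sales_sequence
```

`get_simple_label()` corresponds to the second case: the sequence's own label without any owner prefix.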
get_outputs()
get_outputs() -> Set[DataNode]
Return the set of output data nodes of the submittable entity.
Returns:
Type | Description |
---|---|
Set[DataNode] | The set of output data nodes. |
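`get_inputs()`, `get_intermediate()`, and `get_outputs()` together partition the data nodes of the sequence's tasks. A minimal sketch, assuming that inputs are data nodes no task of the sequence writes, outputs are data nodes no task of the sequence reads, and intermediate nodes are the rest. Data node names stand in for `DataNode` objects, and `classify_data_nodes` is a hypothetical helper.

```python
from typing import Dict, Set, Tuple

# Hypothetical model of the sales sequence from the example:
# task name -> (input data node names, output data node names)
sales_sequence: Dict[str, Tuple[Set[str], Set[str]]] = {
    "train": ({"sales_history"}, {"trained_model"}),
    "predict": ({"trained_model", "current_month"}, {"sales_predictions"}),
}


def classify_data_nodes(
    tasks: Dict[str, Tuple[Set[str], Set[str]]]
) -> Tuple[Set[str], Set[str], Set[str]]:
    """Split the data nodes of a group of tasks into (inputs, intermediate, outputs)."""
    read: Set[str] = set()
    written: Set[str] = set()
    for task_inputs, task_outputs in tasks.values():
        read |= task_inputs
        written |= task_outputs
    all_nodes = read | written
    inputs = all_nodes - written   # never produced inside the sequence
    outputs = all_nodes - read     # never consumed inside the sequence
    intermediate = all_nodes - inputs - outputs
    return inputs, intermediate, outputs


inputs, intermediate, outputs = classify_data_nodes(sales_sequence)
print(sorted(inputs))        # ['current_month', 'sales_history']
print(sorted(intermediate))  # ['trained_model']
print(sorted(outputs))       # ['sales_predictions']
```

Applied to the production sequence alone, the same rule would give `sales_predictions` and `capacity` as inputs, `production_orders` as output, and no intermediate node.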
get_parents()
get_parents() -> Dict[str, Set[_Entity]]
Get parent scenarios of the sequence.
Returns:
Type | Description |
---|---|
Dict[str, Set[_Entity]] | The dictionary of all parent entities. They are grouped by their type (Scenario, Sequence, or Task) so each key corresponds to a level of the parents and the value is a set of the parent entities. An empty dictionary is returned if the entity does not have parents. |
get_simple_label()
get_simple_label() -> str
Returns the sequence simple label.
Returns:
Type | Description |
---|---|
str | The simple label of the sequence as a string. |
is_ready_to_run()
is_ready_to_run() -> ReasonCollection
Indicate whether the entity is ready to be run.
Returns:
Type | Description |
---|---|
ReasonCollection | A ReasonCollection object that can be used as a Boolean value: it is True if the entity is ready to be run (no reason blocks it) and False otherwise. |
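Returning a Boolean-like collection rather than a plain `bool` lets callers both branch on the result and explain why an entity is blocked. The sketch below models that pattern with a hypothetical `Reasons` class (not Taipy's `ReasonCollection`) and a hypothetical `InputNode` stand-in, assuming for illustration that a sequence is blocked when an input data node is being edited (compare `data_nodes_being_edited()`) or has never been written.

```python
from dataclasses import dataclass, field
from typing import Iterable, List


@dataclass
class InputNode:
    """Hypothetical stand-in for an input data node."""
    name: str
    edit_in_progress: bool = False
    written: bool = True


@dataclass
class Reasons:
    """Hypothetical Boolean-like collection of blocking reasons."""
    items: List[str] = field(default_factory=list)

    def __bool__(self) -> bool:
        # Truthy only when nothing blocks the entity.
        return not self.items

    @property
    def reasons(self) -> str:
        return "; ".join(self.items)


def is_ready_to_run(inputs: Iterable[InputNode]) -> Reasons:
    """Collect a reason for every input that is being edited or never written."""
    result = Reasons()
    for node in inputs:
        if node.edit_in_progress:
            result.items.append(f"{node.name} is being edited")
        if not node.written:
            result.items.append(f"{node.name} has not been written yet")
    return result


ready = is_ready_to_run([InputNode("sales_history"), InputNode("current_month", written=False)])
if not ready:
    print(ready.reasons)  # current_month has not been written yet
```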
submit()
submit(
    callbacks: Optional[List[Callable]] = None,
    force: bool = False,
    wait: bool = False,
    timeout: Optional[Union[float, int]] = None,
    **properties
) -> Submission
Submit the sequence for execution.
All the Tasks of the sequence are submitted for execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callbacks | List[Callable] | The list of callable functions to be called on status change. | None |
force | bool | Force execution even if the data nodes are in cache. | False |
wait | bool | Wait for the orchestrated jobs created from the sequence submission to be finished in asynchronous mode. | False |
timeout | Union[float, int] | The maximum number of seconds to wait for the jobs to be finished before returning. | None |
**properties | dict[str, any] | A keyworded variable-length list of additional arguments. | {} |
Returns:
Type | Description |
---|---|
Submission | A Submission containing information about this submission. |
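The interplay of `wait` and `timeout` can be pictured as a polling loop that stops once every job is finished or once the timeout elapses, whichever comes first. This is a hypothetical sketch of those semantics, not Taipy's orchestrator code: `wait_for_jobs`, the callable job model, and the polling interval are assumptions, and the sketch requires a finite timeout because this page does not specify how `timeout=None` is handled.

```python
import time
from typing import Callable, List


def wait_for_jobs(
    jobs: List[Callable[[], bool]],
    timeout: float,
    poll_interval: float = 0.01,
) -> bool:
    """Poll until every job reports finished or `timeout` seconds elapse.

    Each job is modelled as a zero-argument callable returning True once finished.
    Returns True if all jobs finished in time, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if all(is_finished() for is_finished in jobs):
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)


def finishes_after(polls: int) -> Callable[[], bool]:
    """Build a fake job that reports finished after being polled `polls` times."""
    calls = {"count": 0}

    def is_finished() -> bool:
        calls["count"] += 1
        return calls["count"] > polls

    return is_finished


print(wait_for_jobs([finishes_after(3)], timeout=1.0))  # True
print(wait_for_jobs([lambda: False], timeout=0.05))     # False
```

A monotonic clock is used for the deadline so that wall-clock adjustments cannot shorten or extend the wait.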
subscribe()
subscribe(
    callback: Callable[[Sequence, Job], None],
    params: Optional[List[Any]] = None,
) -> None
Subscribe a function to be called on Job status change.
The subscription is applied to all jobs created from the sequence's execution.
Note: Notification will be available only for jobs created after this subscription.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable[[Sequence, Job], None] | The callable function to be called on status change. | required |
params | Optional[List[Any]] | The parameters to be passed to the callback. | None |
unsubscribe()
unsubscribe(
    callback: Callable[[Sequence, Job], None],
    params: Optional[List[Any]] = None,
) -> None
Unsubscribe a function that is called when the status of a Job changes.
Note: The function will continue to be called for ongoing jobs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback | Callable[[Sequence, Job], None] | The callable function to unsubscribe. | required |
params | Optional[List[Any]] | The parameters to be passed to the callback. | None |
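The notes on `subscribe()` and `unsubscribe()` both follow if the subscriber list is copied onto each job when the sequence is submitted. The sketch below models that assumption with hypothetical `SketchSequence` and `SketchJob` classes; it is not Taipy's implementation and ignores `params` for brevity.

```python
from typing import Callable, List

Callback = Callable[["SketchSequence", "SketchJob"], None]


class SketchJob:
    """Hypothetical job that keeps the callbacks captured at creation time."""

    def __init__(self, sequence: "SketchSequence", callbacks: List[Callback]) -> None:
        self.sequence = sequence
        self._callbacks = list(callbacks)  # snapshot, not a live view
        self.status = "PENDING"

    def set_status(self, status: str) -> None:
        self.status = status
        for callback in self._callbacks:
            callback(self.sequence, self)


class SketchSequence:
    """Hypothetical sequence holding a live list of subscribers."""

    def __init__(self) -> None:
        self.subscribers: List[Callback] = []

    def subscribe(self, callback: Callback) -> None:
        self.subscribers.append(callback)

    def unsubscribe(self, callback: Callback) -> None:
        self.subscribers.remove(callback)

    def submit(self) -> SketchJob:
        # Each job receives a copy of the subscribers present at submission time.
        return SketchJob(self, self.subscribers)


events: List[str] = []


def log_status(sequence: SketchSequence, job: SketchJob) -> None:
    events.append(job.status)


seq = SketchSequence()
early_job = seq.submit()          # created before anyone subscribed
seq.subscribe(log_status)
early_job.set_status("RUNNING")   # not logged: this job's snapshot is empty
job = seq.submit()                # snapshot contains log_status
seq.unsubscribe(log_status)       # the ongoing job keeps its copy
job.set_status("COMPLETED")       # still logged
print(events)  # ['COMPLETED']
```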