Skip to content

Pipeline management

The Entities' creation section provides documentation on Pipeline creation. Now that we know how to create a new Pipeline, this section focuses on describing the pipeline's attributes and utility methods.

In this section, it is assumed that my_config.py module contains a Taipy configuration already implemented.

Pipeline attributes

The pipeline creation method returns a Pipeline entity. It is identified by a unique identifier id that Taipy generates. A pipeline also holds various properties accessible as an attribute of the pipeline:

  • config_id: The id of the pipeline configuration.
  • subscribers: The list of Tuples (callback, params) representing the subscribers.
  • properties: The complete dictionary of the pipeline properties. It includes a copy of the properties of the pipeline configuration, in addition to the properties provided at the creation and at runtime.
  • tasks: The dictionary holding the various tasks of the pipeline. The key corresponds to the config_id of the task while the value is the task itself.
  • owner_id: The identifier of the owner, which can be a pipeline, scenario, cycle, or None.
  • version: The string indicates the application version of the pipeline to instantiate. If not provided, the current version is used. Refer to the version management page for more details.
  • Each property of the properties dictionary is also directly exposed as an attribute.
  • Each nested entity is also exposed as an attribute of the pipeline. the attribute name corresponds to the config_id of the nested entity.

Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
import taipy as tp
from datetime import datetime
import my_config

pipeline = tp.create_pipeline(my_config.sales_pipeline_cfg, name="Pipeline for sales prediction")

# The config_id is an attribute of the pipeline. Here it is "pipeline_configuration"
pipeline.config_id
# There was no subscription, so subscribers is an empty list
pipeline.subscribers # []
# The properties dictionary equals {"name": "Pipeline for sales prediction"}. It
# contains all the properties, including the `name` provided at the creation
pipeline.properties # {"name": "Pipeline for sales prediction"}
# The `name` property is also exposed directly as an attribute. It
# equals "Pipeline for sales prediction"
pipeline.name
# The training task entity is exposed as an attribute of the pipeline
training_task = pipeline.training
# The predicting task entity as well
predicting_task = pipeline.predicting
# The data nodes are also exposed as attributes of the pipeline.
current_month_data_node = pipeline.current_month

Get pipeline by id

The method to access a pipeline is from its id by using the get() method :

1
2
3
4
5
6
import taipy as tp
import my_config

pipeline = tp.create_pipeline(my_config.sales_pipeline_cfg)
pipeline_retrieved = tp.get(pipeline.id)
pipeline == pipeline_retrieved

Here the two variables pipeline and pipeline_retrieved are equal.

Get pipeline by config id

A pipeline can also be retrieved from a scenario by accessing the pipeline's config_id of the scenario.

1
2
3
4
5
6
7
8
import taipy as tp
import my_config

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)

# Get the pipelines by config id
sales_pipeline = scenario.sales
production_pipeline = scenario.production

Get all pipelines

All the pipelines can be retrieved using the method get_pipelines(). This method returns the list of all existing pipelines.

Delete a pipeline

A pipeline can be deleted by using delete() which takes the pipeline id as a parameter. The deletion is also propagated to the nested tasks, data nodes, and jobs if they are not shared with any other pipeline.

Get parent scenarios

To get the parent entities of a pipeline (i.e., scenarios) you can use either the method DataNode.get_parents() or the function get_parents(). Both return the parents of the pipeline.

Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
import taipy as tp
import my_config

# Create a scenario from a config
scenario = tp.create_scenario(my_config.monthly_scenario_cfg)

# Retrieve a pipeline
pipeline = scenario.sales_pipeline_cfg

# Retrieve the parent entities of the pipeline
parent_entities = pipeline.get_parents()  # {'scenarios': [Scenario 1]}

# Retrieve the parent entities of the pipeline
tp.get_parents(pipeline)  # {'scenarios': [Scenario 1]}

The next sections show the task management.