
Job execution

Submit a scenario, pipeline, or task

In a Taipy application, running the Core service is required to execute jobs. To see how you can run different Taipy services, please refer to the Running Taipy services page.

Preventing configuration update when the Taipy Core service is running

Once the Core service is running, all configuration updates are blocked in preparation for job execution.

In this section, we assume that the my_config.py module contains an already implemented Taipy configuration.

To execute a scenario, you need to call the submit() method. It returns a list of created Jobs:

import taipy as tp
import my_config

tp.Core().run()

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)

jobs = tp.submit(scenario)

In line 6, we create a new scenario from a scenario configuration and submit it for execution (line 8). The submit() method triggers the submission of all the scenario's pipelines. Then each task of each pipeline will be submitted.
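The cascade described above (scenario, then pipelines, then tasks, with one job per task) can be pictured with a small standalone model. This is only an illustrative sketch, not Taipy code: the ToyTask, ToyPipeline, and ToyScenario classes, the submit() function, and the job labels are all hypothetical.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTask:
    name: str


@dataclass
class ToyPipeline:
    name: str
    tasks: list = field(default_factory=list)


@dataclass
class ToyScenario:
    name: str
    pipelines: list = field(default_factory=list)


def submit(entity):
    """Return one job label per submitted task.

    Simplification: this toy function always returns a list,
    even when a single task is submitted.
    """
    if isinstance(entity, ToyScenario):
        # Submitting a scenario submits each of its pipelines.
        return [job for pipeline in entity.pipelines for job in submit(pipeline)]
    if isinstance(entity, ToyPipeline):
        # Submitting a pipeline submits each of its tasks.
        return [job for task in entity.tasks for job in submit(task)]
    return [f"JOB_{entity.name}"]


sales = ToyPipeline("sales", [ToyTask("clean"), ToyTask("predict")])
report = ToyPipeline("report", [ToyTask("plot")])
toy_scenario = ToyScenario("monthly", [sales, report])
toy_jobs = submit(toy_scenario)
print(toy_jobs)
```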

The Core service can also be started after the submit() method is called. Note that jobs are executed only once the Taipy Core service has started.

import taipy as tp
import my_config

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)

tp.submit(scenario)

tp.Core().run()

Another syntax.

To submit a scenario, you can also use the method Scenario.submit():

import taipy as tp
import my_config

tp.Core().run()

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)

scenario.submit()

By default, Taipy executes the jobs asynchronously. If you want to wait until the submitted jobs are finished, you can use the parameters wait and timeout:

import taipy as tp
import my_config

tp.Core().run()

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)
task = scenario.predicting

tp.submit(task, wait=True, timeout=3)

timeout represents a time span in seconds. It can be an integer or a float. By default, wait is False and timeout is None. If wait is True and timeout is None or not specified, there is no limit to the wait time. If wait is True and timeout is specified, Taipy waits until all the submitted jobs are finished or the timeout expires, whichever occurs first.
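These wait and timeout semantics resemble those of concurrent.futures.wait() in the Python standard library. The sketch below is only an analogy built on the standard library, not Taipy's implementation; the names simulated_job and wait_for_jobs are assumptions made for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def simulated_job(duration):
    """Stand-in for a job: sleep for `duration` seconds, then return it."""
    time.sleep(duration)
    return duration


def wait_for_jobs(futures, timeout=None):
    """Block until every future is done or `timeout` seconds have elapsed.

    With timeout=None there is no limit to the wait time.
    Returns the sets of finished and unfinished futures.
    """
    finished, unfinished = wait(futures, timeout=timeout)
    return finished, unfinished


with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(simulated_job, d) for d in (0.05, 1.0)]
    # The timeout expires before the slow job finishes.
    finished, unfinished = wait_for_jobs(futures, timeout=0.4)
    counts_with_timeout = (len(finished), len(unfinished))
    # Without a timeout, the call returns only when all jobs are done.
    finished, unfinished = wait_for_jobs(futures)
    counts_without_timeout = (len(finished), len(unfinished))

print(counts_with_timeout, counts_without_timeout)
```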

It is also possible to submit a single pipeline using the same submit() method. It returns a list of created Jobs:

import taipy as tp
import my_config

tp.Core().run()

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)
pipeline = scenario.sales_pipeline

jobs = tp.submit(pipeline)

In line 7, we retrieve the pipeline named sales_pipeline from the created scenario. In line 9, we submit this pipeline for execution. The submit() method triggers the submission of all the pipeline's tasks. When submitting a pipeline, you can also use the two parameters wait and timeout.

Another syntax.

To submit a pipeline, you can also use the method Pipeline.submit():

import taipy as tp
import my_config

tp.Core().run()

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)
pipeline = scenario.sales_pipeline
pipeline.submit()

You can also submit a single task with the same submit() method. It returns the created Job:

import taipy as tp
import my_config

tp.Core().run()

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)
task = scenario.predicting

job = tp.submit(task)

In line 7, we retrieve the task named predicting from the created scenario. In line 9, we submit this task for execution. When submitting a task, you can also use the two parameters wait and timeout.

Another syntax.

To submit a task, you can also use the method Task.submit():

import taipy as tp
import my_config

tp.Core().run()

scenario = tp.create_scenario(my_config.monthly_scenario_cfg)
task = scenario.predicting
task.submit()

Job

Each time a task is submitted (through a Scenario or a Pipeline submission), a new Job entity is instantiated.

Job attributes

Here is the list of the job's attributes:

  • task: The Task of the job.
  • force: The force attribute is True if the execution of the job has been forced.
  • creation_date: The date of the creation of the job with the status SUBMITTED.
  • status: The status of the job.
  • stacktrace: The stacktrace of the exceptions handled during the execution of the job.

Job Status

  • SUBMITTED: The job is created but not enqueued for execution.
  • BLOCKED: The job is blocked because inputs are not ready.
  • PENDING: The job is waiting for execution.
  • RUNNING: The job is being executed.
  • CANCELED: The job was canceled by the user.
  • FAILED: The job failed due to timeout or execution error.
  • COMPLETED: The job execution is done and outputs were written.
  • SKIPPED: The job was skipped and will not be executed.
  • ABANDONED: The job was abandoned and will not be executed.

Get/Delete Job

Jobs are created when a task is submitted.

  • You can get all of them with get_jobs().
  • You can get the latest job of a Task with get_latest_job().
  • You can retrieve a job from its id by using the get() method.

A Job can be deleted using the delete_job() method. You can also delete all jobs with delete_jobs().

Deleting a Job raises a JobNotDeletedException if the Status of the Job is not SKIPPED, COMPLETED, or FAILED. You can override this behavior by forcing the deletion with the force parameter set to True: taipy.delete_job(job, force=True).
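The deletion rule can be summarized as a small standalone check. This is a sketch of the rule as stated above, not Taipy's code: the JobNotDeleted class and the check_deletable() function are hypothetical stand-ins, and statuses are modeled as plain strings.

```python
# Statuses for which a job can be deleted without forcing.
DELETABLE_STATUSES = {"SKIPPED", "COMPLETED", "FAILED"}


class JobNotDeleted(Exception):
    """Hypothetical stand-in for Taipy's JobNotDeletedException."""


def check_deletable(status, force=False):
    """Return True if a job with this status may be deleted, else raise."""
    if force or status in DELETABLE_STATUSES:
        return True
    raise JobNotDeleted(f"A job with status {status} requires force=True.")


print(check_deletable("COMPLETED"))            # allowed without forcing
print(check_deletable("RUNNING", force=True))  # allowed because forced
try:
    check_deletable("RUNNING")
except JobNotDeleted as error:
    print(f"Refused: {error}")
```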

Example

import taipy as tp

def double(nb):
    return nb * 2

print(f'(1) Number of jobs: {len(tp.get_jobs())}.')

# Create a scenario then submit it.
input_data_node_config = tp.configure_data_node("input", default_data=21)
output_data_node_config = tp.configure_data_node("output")
task_config = tp.configure_task("double_task", double, input_data_node_config, output_data_node_config)
scenario_config = tp.configure_scenario_from_tasks("my_scenario", [task_config])

tp.Core().run()

scenario = tp.create_scenario(scenario_config)
tp.submit(scenario)

# Retrieve all jobs.
print(f'(2) Number of jobs: {len(tp.get_jobs())}.')

# Get the latest created job of a Task.
job = tp.get_latest_job(scenario.double_task)

# Then delete it.
tp.delete_job(job)
print(f'(3) Number of jobs: {len(tp.get_jobs())}.')

This example will produce the following output:

(1) Number of jobs: 0.
(2) Number of jobs: 1.
(3) Number of jobs: 0.

Cancel Job

Jobs are created when a task is submitted.

  • You can cancel a job whose status is SUBMITTED, PENDING, or BLOCKED with cancel_job(job). Canceling a job sets the Status of its subsequent jobs to ABANDONED. A job whose status is RUNNING, COMPLETED, SKIPPED, FAILED, CANCELED, or ABANDONED cannot be canceled. When cancel_job() is called on a job whose status is RUNNING, COMPLETED, or SKIPPED, its subsequent jobs are abandoned while its own status remains unchanged.
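The cancellation rules can be modeled on a simple chain of jobs. The function below is an illustrative sketch, not Taipy's implementation: the name cancel() and the use of plain strings for statuses are assumptions. It also assumes that calling it on a FAILED, CANCELED, or ABANDONED job changes nothing, which the rules above do not state explicitly.

```python
CANCELABLE = {"SUBMITTED", "PENDING", "BLOCKED"}
KEEPS_OWN_STATUS = {"RUNNING", "COMPLETED", "SKIPPED"}


def cancel(statuses, index):
    """Return the statuses of a job chain after canceling the job at `index`.

    `statuses` lists job statuses in execution order, so the entries after
    `index` are the subsequent jobs, assumed not to have run yet.
    """
    result = list(statuses)
    current = result[index]
    if current in CANCELABLE:
        result[index] = "CANCELED"
    elif current not in KEEPS_OWN_STATUS:
        # Assumption: FAILED, CANCELED, or ABANDONED jobs leave the chain as-is.
        return result
    # In both remaining cases, the subsequent jobs are abandoned.
    for later in range(index + 1, len(result)):
        result[later] = "ABANDONED"
    return result


print(cancel(["RUNNING", "BLOCKED"], 1))  # cancel the blocked job
print(cancel(["RUNNING", "BLOCKED"], 0))  # try to cancel the running job
```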

Canceling a job

import taipy as tp
from time import sleep

def double(nb):
    sleep(5)
    return nb * 2

print(f'(1) Number of jobs: {len(tp.get_jobs())}.')

# Create a scenario then submit it.
input_data_node_cfg = tp.configure_data_node("input", default_data=21)
output_data_node_cfg = tp.configure_data_node("output")
double_task_config = tp.configure_task("double_task", double, input_data_node_cfg, output_data_node_cfg)
print_task_config = tp.configure_task("print_task", print, output_data_node_cfg)
scenario_config = tp.configure_scenario_from_tasks("my_scenario", [double_task_config, print_task_config])

tp.Core().run()

scenario = tp.create_scenario(scenario_config)
tp.submit(scenario)

# Count the jobs.
print(f'(2) Number of jobs: {len(tp.get_jobs())}.')

# Retrieve the latest job of each task.
double_job = tp.get_latest_job(scenario.double_task)
print_job = tp.get_latest_job(scenario.print_task)

# Get the status of each job.
print(f'(3) Status of job double_task: {double_job.status}')
print(f'(4) Status of job print_task: {print_job.status}')

# Then cancel the second job.
tp.cancel_job(print_job)

sleep(10)

print(f'(5) Status of job double_task: {double_job.status}')
print(f'(6) Status of job print_task: {print_job.status}')

This example produces the following output:

(1) Number of jobs: 0.
(2) Number of jobs: 2.
(3) Status of job double_task: Status.RUNNING
(4) Status of job print_task: Status.BLOCKED
(5) Status of job double_task: Status.COMPLETED
(6) Status of job print_task: Status.CANCELED

Canceling a running job

import taipy as tp
from time import sleep

def double(nb):
    sleep(5)
    return nb * 2

print(f'(1) Number of jobs: {len(tp.get_jobs())}.')

# Create a scenario then submit it.
input_data_node_cfg = tp.configure_data_node("input", default_data=21)
output_data_node_cfg = tp.configure_data_node("output")
double_task_config = tp.configure_task("double_task", double, input_data_node_cfg, output_data_node_cfg)
print_task_config = tp.configure_task("print_task", print, output_data_node_cfg)
scenario_config = tp.configure_scenario_from_tasks("my_scenario", [double_task_config, print_task_config])

tp.Core().run()

scenario = tp.create_scenario(scenario_config)
tp.submit(scenario)

# Count the jobs.
print(f'(2) Number of jobs: {len(tp.get_jobs())}.')

# Retrieve the latest job of each task.
double_job = tp.get_latest_job(scenario.double_task)
print_job = tp.get_latest_job(scenario.print_task)

# Get the status of each job.
print(f'(3) Status of job double_task: {double_job.status}')
print(f'(4) Status of job print_task: {print_job.status}')

# Then cancel the first job, which is already running.
tp.cancel_job(double_job)

sleep(10)

print(f'(5) Status of job double_task: {double_job.status}')
print(f'(6) Status of job print_task: {print_job.status}')

This example produces the following output:

(1) Number of jobs: 0.
(2) Number of jobs: 2.
(3) Status of job double_task: Status.RUNNING
(4) Status of job print_task: Status.BLOCKED
(5) Status of job double_task: Status.COMPLETED
(6) Status of job print_task: Status.ABANDONED

Subscribe to job execution

You can subscribe to a Pipeline or a Scenario execution to be notified when a job status changes.

If you want a function named my_function() to be called on each status change of each task execution of all scenarios, use taipy.subscribe_scenario(my_function). You can use taipy.subscribe_pipeline(my_function) to work at the pipeline level.

If you want your function my_function() to be called for each task of a scenario called my_scenario, you should call taipy.subscribe_scenario(my_function, my_scenario). It is similar in the context of pipelines: to be notified on a given pipeline stored in my_pipeline, you must call taipy.subscribe_pipeline(my_function, my_pipeline).

You can also use a function that receives additional parameters as a subscriber. As in the calls above, pass the function first, then add your parameters as a list, for example taipy.subscribe_scenario(my_function, ["my_param", 42], my_scenario).

You can also unsubscribe from scenarios by using taipy.unsubscribe_scenario(function), or from pipelines by using taipy.unsubscribe_pipeline(function). As with subscription, unsubscription can be global, or you can target a specific scenario or pipeline by passing it as a parameter.
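The difference between global and scoped subscriptions can be illustrated with a minimal observer registry. This is a standalone sketch, not Taipy's implementation: the SubscriberRegistry class and its methods are hypothetical, scenarios and jobs are represented by plain strings, and the extra parameters are passed to the callback as a single list.

```python
class SubscriberRegistry:
    """Hypothetical registry of callbacks; not part of Taipy."""

    def __init__(self):
        # Each entry is (callback, params, scenario); scenario=None means global.
        self._subscriptions = []

    def subscribe(self, callback, params=None, scenario=None):
        self._subscriptions.append((callback, params, scenario))

    def unsubscribe(self, callback, scenario=None):
        # Remove the subscription registered with the same callback and scope.
        self._subscriptions = [
            entry for entry in self._subscriptions
            if not (entry[0] is callback and entry[2] == scenario)
        ]

    def notify(self, scenario, job):
        # Call global subscribers and those scoped to this scenario.
        for callback, params, target in self._subscriptions:
            if target is None or target == scenario:
                if params is None:
                    callback(scenario, job)
                else:
                    callback(scenario, job, params)


calls = []


def log_all(scenario, job):
    calls.append(("all", scenario, job))


def log_with_params(scenario, job, params):
    calls.append(("params", scenario, job, tuple(params)))


registry = SubscriberRegistry()
registry.subscribe(log_all)  # global subscription
registry.subscribe(log_with_params, params=["my_param", 42], scenario="scenario_1")
registry.notify("scenario_1", "job_a")
registry.notify("scenario_2", "job_b")
registry.unsubscribe(log_all)  # global unsubscription
registry.notify("scenario_1", "job_c")
print(calls)
```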

Example

import taipy as tp

def do_nothing():
    ...

def my_global_subscriber(scenario, job):
    print(f"my_global_subscriber: scenario '{scenario.config_id}'; task '{job.task.config_id}'.")

def my_subscriber(scenario, job):
    print(f"my_subscriber: scenario '{scenario.config_id}'; task '{job.task.config_id}'.")

def my_subscriber_multi_param(scenario, job, params):
    print(f"my_subscriber_multi_param: params {params}; task '{job.task.config_id}'.")

task_1 = tp.configure_task("my_task_1", do_nothing)
task_2 = tp.configure_task("my_task_2", do_nothing)
scenario_config_1 = tp.configure_scenario_from_tasks("my_scenario_1", [task_1, task_2])
scenario_config_2 = tp.configure_scenario_from_tasks("my_scenario_2", [task_1, task_2])

tp.Core().run()

# Scenarios are created from their configurations once the Core service runs.
scenario_1 = tp.create_scenario(scenario_config_1)
scenario_2 = tp.create_scenario(scenario_config_2)

params = ["my_param_1", 42]

tp.subscribe_scenario(my_global_subscriber)  # Global subscription
tp.subscribe_scenario(my_subscriber, scenario=scenario_1)  # Subscribe only to one scenario
tp.subscribe_scenario(my_subscriber_multi_param, params, scenario_1)  # Subscribe with params

print('Submit: scenario_1')
tp.submit(scenario_1)
print('Submit: scenario_2')
tp.submit(scenario_2)

print('Unsubscribe to my_global_subscriber for scenario_1')
tp.unsubscribe_scenario(my_global_subscriber, scenario=scenario_1)
print('Submit: scenario_1')
tp.submit(scenario_1)

This example will produce the following output:

Submit: scenario_1
my_global_subscriber: scenario 'my_scenario_1'; task 'my_task_1'.
my_subscriber: scenario 'my_scenario_1'; task 'my_task_1'.
my_subscriber_multi_param: params ['my_param_1', 42]; task 'my_task_1'.
my_subscriber: scenario 'my_scenario_1'; task 'my_task_2'.
my_subscriber_multi_param: params ['my_param_1', 42]; task 'my_task_2'.
Submit: scenario_2
my_global_subscriber: scenario 'my_scenario_2'; task 'my_task_1'.
Unsubscribe to my_global_subscriber for scenario_1
Submit: scenario_1
my_subscriber: scenario 'my_scenario_1'; task 'my_task_1'.
my_subscriber: scenario 'my_scenario_1'; task 'my_task_2'.