Job execution
Submit a scenario, pipeline or task.¶
To execute a scenario, call the submit() method with the scenario as its argument, for example taipy.submit(scenario).
A typical program creates a new scenario from a scenario configuration and then submits it for execution. The submit() method triggers the submission of all the scenario's pipelines. Then each task of each pipeline is submitted.
Another syntax.
To submit a scenario, you can also use the method Scenario.submit().
By default, Taipy executes the jobs asynchronously. If you want to wait until the submitted jobs are finished, you can use the parameters wait and timeout, for example taipy.submit(scenario, wait=True, timeout=3).
timeout can be an integer or a float. By default, wait is False and timeout is None. If wait is True and timeout is not specified or None, there is no limit to the wait time. If wait is True and timeout is specified, Taipy waits until all the submitted jobs are finished or up to timeout seconds.
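The wait and timeout semantics described above can be illustrated without Taipy. The following is a minimal sketch that uses only the Python standard library. submit_all is a hypothetical helper, not a Taipy function, and it only mimics the blocking behavior, not Taipy's scheduler.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, wait as wait_for_futures


def submit_all(functions, wait=False, timeout=None):
    """Hypothetical stand-in for a submission: run each function in the background.

    Mirrors the documented semantics only: wait=False returns immediately,
    wait=True blocks until all jobs finish or until `timeout` seconds elapse.
    """
    executor = ThreadPoolExecutor(max_workers=max(1, len(functions)))
    futures = [executor.submit(function) for function in functions]
    if wait:
        # timeout=None means there is no limit to the wait time.
        wait_for_futures(futures, timeout=timeout)
    # Do not block here: unfinished jobs keep running in the background.
    executor.shutdown(wait=False)
    return futures


# A job that stays "running" until we release it.
release = threading.Event()


def slow_job():
    release.wait()
    return 42


# wait=True with a timeout: control returns after about 0.2 seconds
# even though the job has not finished.
(slow,) = submit_all([slow_job], wait=True, timeout=0.2)
print(slow.done())  # False

# Let the job finish, then wait without a time limit.
release.set()
(fast,) = submit_all([lambda: 21 * 2], wait=True)
print(fast.done(), fast.result(), slow.result())  # True 42 42
```

With wait left to False, the helper returns as soon as the jobs are started, which corresponds to the default asynchronous behavior.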
You can also submit just a single pipeline with the same submit() method.
For example, you can retrieve the pipeline named sales_pipeline from the created scenario and submit only this pipeline for execution. The submit() method triggers the submission of all the pipeline's tasks. When submitting a pipeline, you can also use the two parameters wait and timeout to wait until all the jobs are finished or up to timeout seconds.
Another syntax.
To submit a pipeline, you can also use the method Pipeline.submit().
You can also submit just a single task with the same submit() method.
For example, you can retrieve the task named predicting from the created scenario and submit only this task for execution. When submitting a task, you can also use the two parameters wait and timeout to wait until the job is finished or up to timeout seconds.
Another syntax.
To submit a task, you can also use the method Task.submit().
Job¶
Each time a task is submitted (through a Scenario or a Pipeline submission), a new Job entity is instantiated.
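The way a submission cascades from a scenario to its pipelines and then to their tasks, creating one job per submitted task, can be sketched with a few plain Python classes. This is a toy model, not Taipy's implementation: the classes below are local stand-ins, and the names training, planning and production_pipeline are hypothetical.

```python
from dataclasses import dataclass, field
from itertools import count

_job_ids = count(1)  # simple id generator for the toy jobs


@dataclass
class Task:
    name: str


@dataclass
class Pipeline:
    name: str
    tasks: list


@dataclass
class Scenario:
    name: str
    pipelines: list


@dataclass
class Job:
    task: Task
    id: int = field(default_factory=lambda: next(_job_ids))


def submit(entity):
    """Return the toy jobs created by submitting a scenario, a pipeline or a task."""
    if isinstance(entity, Scenario):
        # Submitting a scenario submits each of its pipelines.
        return [job for pipeline in entity.pipelines for job in submit(pipeline)]
    if isinstance(entity, Pipeline):
        # Submitting a pipeline submits each of its tasks.
        return [job for task in entity.tasks for job in submit(task)]
    # Submitting a task creates exactly one new job.
    return [Job(task=entity)]


training = Task("training")  # hypothetical task name
predicting = Task("predicting")
planning = Task("planning")  # hypothetical task name
sales_pipeline = Pipeline("sales_pipeline", [training, predicting])
production_pipeline = Pipeline("production_pipeline", [planning])  # hypothetical
scenario = Scenario("my_scenario", [sales_pipeline, production_pipeline])

print(len(submit(scenario)))        # 3: one job per task of each pipeline
print(len(submit(sales_pipeline)))  # 2
print(len(submit(predicting)))      # 1
```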
Job attributes¶
Here is the list of the job's attributes:
- task: The Task of the job.
- force: The force attribute is True if the execution of the job has been forced.
- creation_date: The date of the creation of the job with the status SUBMITTED.
- status: The status of the job.
- stacktrace: The stacktrace of the exceptions handled during the execution of the jobs.
Job Status¶
- SUBMITTED: The job is created but not enqueued for execution.
- BLOCKED: The job is blocked because inputs are not ready.
- PENDING: The job is waiting for execution.
- RUNNING: The job is being executed.
- CANCELED: The job was canceled by the user.
- FAILED: The job failed due to timeout or execution error.
- COMPLETED: The job execution is done and outputs were written.
- SKIPPED: The job was not and will not be executed.
- ABANDONED: The job was abandoned and will not be executed.
Get/Delete Job¶
Jobs are created when a task is submitted.
- You can get all of them with get_jobs().
- You can get the latest job of a Task with get_latest_job().
- You can retrieve a job from its id by using the get() method.
A Job can be deleted using the delete_job() method. You can also delete all jobs with delete_jobs().
Deleting a Job can raise a JobNotDeletedException if the Status of the Job is not SKIPPED, COMPLETED or FAILED. You can overcome this behaviour by forcing the deletion with the force parameter set to True: taipy.delete_job(job, force=True).
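The deletion rule above can be modeled in a few lines of plain Python. This is only a sketch of the rule, not Taipy code: Status, JobNotDeletedException and delete_job are local stand-ins that reuse the names from the text, the numeric values of the statuses are arbitrary, and jobs are represented by a simple dictionary.

```python
from enum import Enum


class Status(Enum):
    # Local stand-in; the numeric values are arbitrary.
    SUBMITTED = 1
    BLOCKED = 2
    PENDING = 3
    RUNNING = 4
    CANCELED = 5
    FAILED = 6
    COMPLETED = 7
    SKIPPED = 8
    ABANDONED = 9


class JobNotDeletedException(Exception):
    """Local stand-in for the exception named in the text."""


# Statuses for which deletion is allowed without forcing it.
DELETABLE = {Status.SKIPPED, Status.COMPLETED, Status.FAILED}


def delete_job(jobs, job_id, force=False):
    """Remove job_id from the `jobs` dict (id -> Status), following the rule above."""
    if jobs[job_id] not in DELETABLE and not force:
        raise JobNotDeletedException(f"{job_id} is {jobs[job_id].name}")
    del jobs[job_id]


jobs = {"job_1": Status.COMPLETED, "job_2": Status.RUNNING}
delete_job(jobs, "job_1")  # allowed: the job is COMPLETED
try:
    delete_job(jobs, "job_2")  # refused: the job is still RUNNING
except JobNotDeletedException as error:
    print(f"Not deleted: {error}")  # Not deleted: job_2 is RUNNING
delete_job(jobs, "job_2", force=True)  # a forced deletion skips the status check
print(len(jobs))  # 0
```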
Example
This example will produce the following output:
(1) Number of job: 0.
(2) Number of job: 1.
(3) Number of job: 0.
Cancel Job¶
Jobs are created when a task is submitted.
You can cancel a job whose status is SUBMITTED, PENDING, or BLOCKED with cancel_job(job). Canceling a job sets the Status of its subsequent jobs to ABANDONED.
However, a job whose status is RUNNING, COMPLETED, SKIPPED, FAILED, CANCELED, or ABANDONED cannot be canceled. When the cancel method is called on a job whose status is RUNNING, COMPLETED, or SKIPPED, its subsequent jobs are abandoned while its own status remains unchanged.
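The cancellation rules above can be written down as a small state-transition function. The following is a sketch in plain Python, not Taipy's implementation: Status and cancel_job are local stand-ins, job statuses are kept in a dictionary, and the caller passes the names of the subsequent jobs explicitly. The text does not say what happens to the subsequent jobs of a FAILED, CANCELED or ABANDONED job, so the sketch assumes that nothing changes in that case.

```python
from enum import Enum, auto


class Status(Enum):
    SUBMITTED = auto()
    BLOCKED = auto()
    PENDING = auto()
    RUNNING = auto()
    CANCELED = auto()
    FAILED = auto()
    COMPLETED = auto()
    SKIPPED = auto()
    ABANDONED = auto()


# Jobs in these statuses can be canceled.
CANCELABLE = {Status.SUBMITTED, Status.PENDING, Status.BLOCKED}
# Jobs in these statuses keep their status, but their subsequent jobs are abandoned.
ABANDON_ONLY = {Status.RUNNING, Status.COMPLETED, Status.SKIPPED}


def cancel_job(statuses, job, subsequent):
    """Apply the cancellation rules to `statuses` (job name -> Status).

    `subsequent` lists the names of the jobs that come after `job`.
    """
    current = statuses[job]
    if current in CANCELABLE:
        statuses[job] = Status.CANCELED
    elif current not in ABANDON_ONLY:
        # FAILED, CANCELED or ABANDONED: assumed to change nothing.
        return
    for name in subsequent:
        statuses[name] = Status.ABANDONED


# Canceling a blocked job that has no subsequent jobs.
statuses = {"double_task": Status.RUNNING, "print_task": Status.BLOCKED}
cancel_job(statuses, "print_task", subsequent=[])
print(statuses["print_task"].name)  # CANCELED

# Calling cancel on a running job: only its subsequent job changes.
statuses = {"double_task": Status.RUNNING, "print_task": Status.BLOCKED}
cancel_job(statuses, "double_task", subsequent=["print_task"])
print(statuses["double_task"].name, statuses["print_task"].name)  # RUNNING ABANDONED
```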
Canceling a job
from time import sleep

import taipy as tp


def double(nb):
    # Sleep so that the job is still running when its status is read.
    sleep(5)
    return nb * 2


print(f'(1) Number of jobs: {len(tp.get_jobs())}.')

# Create a scenario then submit it.
input_data_node_cfg = tp.configure_data_node("input", default_data=21)
output_data_node_cfg = tp.configure_data_node("output")
double_task_config = tp.configure_task("double_task", double, input_data_node_cfg, output_data_node_cfg)
print_task_config = tp.configure_task("print_task", print, output_data_node_cfg)
scenario_config = tp.configure_scenario_from_tasks("my_scenario", [double_task_config, print_task_config])
scenario = tp.create_scenario(scenario_config)
tp.submit(scenario)

# Count the jobs.
print(f'(2) Number of jobs: {len(tp.get_jobs())}.')

# Get the latest job of each task.
double_job = tp.get_latest_job(scenario.double_task)
print_job = tp.get_latest_job(scenario.print_task)

# Get the status of each job.
print(f'(3) Status of job double_task: {double_job.status}')
print(f'(4) Status of job print_task: {print_job.status}')

# Then cancel the second job.
tp.cancel_job(print_job)

# Wait until the first job has finished.
sleep(10)
print(f'(5) Status of job double_task: {double_job.status}')
print(f'(6) Status of job print_task: {print_job.status}')
This example produces the following output:
(1) Number of jobs: 0.
(2) Number of jobs: 2.
(3) Status of job double_task: Status.RUNNING
(4) Status of job print_task: Status.BLOCKED
(5) Status of job double_task: Status.COMPLETED
(6) Status of job print_task: Status.CANCELED
Canceling a running job
from time import sleep

import taipy as tp


def double(nb):
    # Sleep so that the job is still running when it is canceled.
    sleep(5)
    return nb * 2


print(f'(1) Number of jobs: {len(tp.get_jobs())}.')

# Create a scenario then submit it.
input_data_node_cfg = tp.configure_data_node("input", default_data=21)
output_data_node_cfg = tp.configure_data_node("output")
double_task_config = tp.configure_task("double_task", double, input_data_node_cfg, output_data_node_cfg)
print_task_config = tp.configure_task("print_task", print, output_data_node_cfg)
scenario_config = tp.configure_scenario_from_tasks("my_scenario", [double_task_config, print_task_config])
scenario = tp.create_scenario(scenario_config)
tp.submit(scenario)

# Count the jobs.
print(f'(2) Number of jobs: {len(tp.get_jobs())}.')

# Get the latest job of each task.
double_job = tp.get_latest_job(scenario.double_task)
print_job = tp.get_latest_job(scenario.print_task)

# Get the status of each job.
print(f'(3) Status of job double_task: {double_job.status}')
print(f'(4) Status of job print_task: {print_job.status}')

# Then cancel the first job, which is running.
tp.cancel_job(double_job)

# Wait until the first job has finished.
sleep(10)
print(f'(5) Status of job double_task: {double_job.status}')
print(f'(6) Status of job print_task: {print_job.status}')
This example produces the following output:
(1) Number of jobs: 0.
(2) Number of jobs: 2.
(3) Status of job double_task: Status.RUNNING
(4) Status of job print_task: Status.BLOCKED
(5) Status of job double_task: Status.COMPLETED
(6) Status of job print_task: Status.ABANDONED
Subscribe to job execution¶
After each Task execution, you can be notified by subscribing to a Pipeline or a Scenario.
You will be notified for each scenario or pipeline by default, except if you specify one as a target.
If you want a function named my_function to be called on each status change of each task execution of all scenarios, use taipy.subscribe_scenario(my_function). You can use taipy.subscribe_pipeline(my_function) to work at the pipeline level.
If you want your function my_function to be called for each task of a scenario called my_scenario, you should call taipy.subscribe_scenario(my_function, my_scenario). It is similar in the context of pipelines: to be notified on a given pipeline stored in my_pipeline, you must call taipy.subscribe_pipeline(my_function, my_pipeline).
You can also define a function that receives multiple parameters to be used as a subscriber. As in the calls above, you just add your parameters as a list, for example taipy.subscribe_scenario(my_function, ["my_param", 42], my_scenario).
You can also unsubscribe from scenarios by using taipy.unsubscribe_scenario(function), or from pipelines by using taipy.unsubscribe_pipeline(function). As for subscriptions, the unsubscription can be global, or you can specify the scenario or pipeline by passing it as a parameter.
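To make the difference between global and targeted subscriptions concrete, here is a toy model written in plain Python. It is not Taipy's implementation: ToyScenarios is a hypothetical class, scenarios and tasks are represented by their names, and the sketch assumes that a subscriber is called with the scenario, the task and then its extra parameters.

```python
class ToyScenarios:
    """Hypothetical registry keeping one subscriber list per scenario."""

    def __init__(self, names):
        # scenario name -> [(callback, params), ...]
        self.subscribers = {name: [] for name in names}

    def _targets(self, scenario):
        # No target means "all known scenarios".
        return list(self.subscribers) if scenario is None else [scenario]

    def subscribe_scenario(self, callback, params=None, scenario=None):
        for name in self._targets(scenario):
            self.subscribers[name].append((callback, list(params or [])))

    def unsubscribe_scenario(self, callback, scenario=None):
        for name in self._targets(scenario):
            self.subscribers[name] = [
                (cb, params) for cb, params in self.subscribers[name] if cb is not callback
            ]

    def notify(self, scenario, task):
        # Call every subscriber of this scenario with its extra parameters.
        for callback, params in self.subscribers[scenario]:
            callback(scenario, task, *params)


calls = []


def my_global_subscriber(scenario, task):
    calls.append(("global", scenario, task))


def my_subscriber_multi_param(scenario, task, text, number):
    calls.append(("multi", scenario, task, text, number))


scenarios = ToyScenarios(["my_scenario_1", "my_scenario_2"])
scenarios.subscribe_scenario(my_global_subscriber)
scenarios.subscribe_scenario(my_subscriber_multi_param, ["my_param_1", 42], "my_scenario_1")

scenarios.notify("my_scenario_1", "my_task_1")  # both subscribers are called
scenarios.notify("my_scenario_2", "my_task_1")  # only the global subscriber is called

# Unsubscribe the global subscriber for one scenario only.
scenarios.unsubscribe_scenario(my_global_subscriber, "my_scenario_1")
scenarios.notify("my_scenario_1", "my_task_2")  # only the multi-parameter subscriber

for call in calls:
    print(call)
```

In this model a global subscription is simply stored on every known scenario, which is why it can later be removed for a single scenario.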
Example
This example will produce the following output:
Submit: scenario_1
my_global_subscriber: scenario 'my_scenario_1'; task 'my_task_1'.
my_subscriber: scenario 'my_scenario_1'; task 'my_task_1'.
my_subscriber_multi_param: params ["my_param_1", 42]; task 'my_task_1'.
my_subscriber: scenario 'my_scenario_1'; task 'my_task_2'.
my_subscriber_multi_param: params ["my_param_1", 42]; task 'my_task_2'.
Submit: scenario_2
my_global_subscriber: scenario 'my_scenario_2'; task 'my_task_1'.
Unsubscribe to my_global_subscriber for scenario_1
Submit: scenario_1
my_subscriber: scenario 'my_scenario_1'; task 'my_task_1'.
my_subscriber: scenario 'my_scenario_1'; task 'my_task_2'.