Job

This page describes how to manage jobs in a Taipy application. It explains how to create, read, and delete jobs, and how to access their attributes.

A Job in Taipy represents a unique execution of a task. It is created every time a task is submitted for execution (either directly or as part of a scenario/sequence). A job holds various attributes such as the task it is associated with, its status, and the stack trace of any exceptions.

Job creation

Every time a task is submitted (through a Scenario, a Sequence or a Task submission), a new Job entity is instantiated from the Task.

The Job is created using the submit() function, passing a submittable entity (i.e., a Task, a Scenario, or a Sequence) as an argument. A Submission entity is returned, holding the list of jobs created.

Example

The code below demonstrates how to create a job by submitting a scenario. The scenario is made of a single task that doubles a number. When the scenario is submitted, a submission object is returned containing the list of jobs created. In our case, only one job is created.

import taipy as tp
from taipy import Config


def double(nb):
    return nb * 2

if __name__ == "__main__":
    # Create a scenario configuration made of one task configuration
    input_cfg = Config.configure_data_node("my_input", default_data=21)
    output_cfg = Config.configure_data_node("my_output")
    task_cfg = Config.configure_task("double_task", double, [input_cfg], [output_cfg])
    scenario_cfg = Config.configure_scenario("my_scenario", [task_cfg])

    # Run the Orchestrator service
    tp.Orchestrator().run()

    # Create a scenario and submit it
    scenario = tp.create_scenario(scenario_cfg)
    submission = tp.submit(scenario)

    # Retrieve the list of the jobs (only one job corresponding to the task in the scenario)
    jobs = submission.jobs

For more details and examples on how to submit scenarios, sequences or tasks, see the task orchestration page.

Graphical User Interface

Taipy offers a visual element named job selector (job_selector) dedicated to job management and monitoring. It is designed to help end users list, select, filter, and visualize jobs in an intuitive way.

The job selector displays the list of jobs submitted within a Taipy application. It lists all the jobs with other related information, in particular the job status, and provides users with the ability to select a job.

The default usage is simple and requires no specific configuration. The element is also highly configurable: for example, you can customize the columns displayed in the job selector.

For more details, see the job selector page.

Job attributes

A Job entity is identified by a unique identifier id that is generated by Taipy.

Here is the list of Job's main attributes:

  • task: The Task of the job.
  • force: True if the execution of the job has been forced.
  • creation_date: The date when the job was created with the status SUBMITTED.
  • status: The status of the job.
  • stacktrace: The stack trace of the exceptions handled during the execution of the job.

Job Status

  • SUBMITTED: The job is created but not enqueued for execution.
  • BLOCKED: The job is blocked because inputs are not ready.
  • PENDING: The job is waiting for execution.
  • RUNNING: The job is being executed.
  • CANCELED: The job was canceled by the user.
  • FAILED: The job failed due to timeout or execution error.
  • COMPLETED: The job execution is done and outputs were written.
  • SKIPPED: The job was skipped and will not be executed.
  • ABANDONED: The job was abandoned and will not be executed.

When the status of a job changes, the timestamp of the change is recorded. These timestamps can be accessed using the following properties:

  • submitted_at: The datetime when the job was submitted.
  • run_at: The datetime when the job started running.
  • finished_at: The datetime when the job finished.
  • execution_duration: The duration of the job execution in seconds, which is the difference between the finished_at and run_at timestamps. If the job is not finished, the duration is the difference between the current time and the run_at timestamp.
  • pending_duration: The duration of the job in the pending state in seconds, which is the duration between the PENDING status and the RUNNING status. If the job is pending, the duration is the difference between the current time and the datetime when the job was in the PENDING status.
  • blocked_duration: The duration of the job in the blocked state in seconds, which is the duration between the BLOCKED status and the PENDING status. If the job is blocked, the duration is the difference between the current time and the datetime when the job was in the BLOCKED status.
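
The rule given for execution_duration can be sketched in plain Python. This is only an illustration of the arithmetic described above, not Taipy's implementation: the function name, the explicit now parameter, and the choice to return None for a job that never ran are all assumptions made for this example.

```python
from datetime import datetime, timedelta
from typing import Optional


def execution_duration(
    run_at: Optional[datetime],
    finished_at: Optional[datetime],
    now: Optional[datetime] = None,
) -> Optional[float]:
    """Hypothetical helper mirroring the rule described above (not Taipy code)."""
    if run_at is None:
        # Assumption: a job that never started running has no duration.
        return None
    # Finished job: use finished_at. Unfinished job: use the current time.
    end = finished_at if finished_at is not None else (now or datetime.now())
    return (end - run_at).total_seconds()


start = datetime(2024, 1, 1, 12, 0, 0)

# Finished job: finished_at - run_at.
print(execution_duration(start, start + timedelta(seconds=5)))  # 5.0

# Still running: current time - run_at (the clock is injected to stay reproducible).
print(execution_duration(start, None, now=start + timedelta(seconds=2)))  # 2.0
```

The same pattern would apply to pending_duration and blocked_duration, using the timestamps of the corresponding status changes.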

Get/Delete Job

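Three methods are available to retrieve jobs:

  • taipy.get(): Retrieves a job from its id.
  • taipy.get_jobs(): Retrieves all the jobs.
  • taipy.get_latest_job(): Retrieves the latest job of a task.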

A Job can be deleted using the taipy.delete_job() method. You can also delete all jobs with taipy.delete_jobs().

Deleting a Job raises a JobNotDeletedException if the status of the Job is not SKIPPED, COMPLETED, or FAILED. You can override this behavior by forcing the deletion with the force parameter set to True: taipy.delete_job(job, force=True).
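
The deletion rule can be summarized as a small decision function. The sketch below is a plain-Python model of the behavior described above, not Taipy code: ensure_deletable and JobNotDeleted are hypothetical names standing in for the check done by taipy.delete_job() and for JobNotDeletedException, and statuses are represented as plain strings for brevity.

```python
# Statuses for which deletion is allowed without forcing, as stated above.
DELETABLE_STATUSES = {"SKIPPED", "COMPLETED", "FAILED"}


class JobNotDeleted(Exception):
    """Hypothetical stand-in for Taipy's JobNotDeletedException."""


def ensure_deletable(status: str, force: bool = False) -> None:
    """Raise unless the job may be deleted under the rule described above."""
    if force or status in DELETABLE_STATUSES:
        return
    raise JobNotDeleted(f"A job with status {status} requires force=True.")


ensure_deletable("COMPLETED")            # allowed: the job is finished
ensure_deletable("RUNNING", force=True)  # allowed: deletion is forced

try:
    ensure_deletable("RUNNING")          # refused: not finished, not forced
except JobNotDeleted as error:
    print(error)
```

In an application, the equivalent pattern is to wrap taipy.delete_job(job) in a try/except block and decide whether to retry with force=True.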

Example

The code below demonstrates how to get and delete a job.

import taipy as tp
from taipy import Config


def double(nb):
    return nb * 2

if __name__ == "__main__":
    # Create a scenario configuration made of one task configuration
    input_cfg = Config.configure_data_node("my_input", default_data=21)
    output_cfg = Config.configure_data_node("my_output")
    task_cfg = Config.configure_task("double_task", double, [input_cfg], [output_cfg])
    scenario_cfg = Config.configure_scenario("my_scenario", [task_cfg])

    # Run the Orchestrator service
    tp.Orchestrator().run()

    # Create a scenario and submit it
    scenario = tp.create_scenario(scenario_cfg)
    submission = tp.submit(scenario)

    # Get all jobs.
    all_jobs = tp.get_jobs()

    # Get the latest job of a Task.
    task = scenario.double_task
    job = tp.get_latest_job(task)

    # Delete a job
    tp.delete_job(job)

Cancel Job

A job whose status is SUBMITTED, PENDING, or BLOCKED can be canceled using the taipy.cancel_job(job) function.

Canceling a job sets its status to CANCELED and sets the status of its subsequent jobs to ABANDONED. A job whose status is RUNNING, COMPLETED, SKIPPED, FAILED, CANCELED, or ABANDONED cannot be canceled. However, when the cancel function is called on a job whose status is RUNNING, COMPLETED, or SKIPPED, its subsequent jobs are abandoned while its own status remains unchanged.
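
The cancellation rules can be condensed into a lookup, sketched below in plain Python. This is a model of the behavior described above rather than Taipy's code: the Status enum is redefined locally with arbitrary values, and cancel_outcome is a hypothetical helper. The source does not say what happens to subsequent jobs when the target is FAILED, CANCELED, or ABANDONED, so the sketch assumes nothing changes in that case.

```python
from enum import Enum, auto


class Status(Enum):
    # Names copied from the Job Status list; the values are arbitrary.
    SUBMITTED = auto()
    BLOCKED = auto()
    PENDING = auto()
    RUNNING = auto()
    CANCELED = auto()
    FAILED = auto()
    COMPLETED = auto()
    SKIPPED = auto()
    ABANDONED = auto()


# Jobs in these statuses are actually canceled.
CANCELABLE = {Status.SUBMITTED, Status.PENDING, Status.BLOCKED}
# Jobs in these statuses keep their status, but subsequent jobs are abandoned.
ABANDON_ONLY = {Status.RUNNING, Status.COMPLETED, Status.SKIPPED}


def cancel_outcome(status):
    """Return (status after the call, whether subsequent jobs are abandoned)."""
    if status in CANCELABLE:
        return Status.CANCELED, True
    if status in ABANDON_ONLY:
        return status, True
    # FAILED, CANCELED, ABANDONED: assumed to be left untouched.
    return status, False


for status in (Status.BLOCKED, Status.RUNNING, Status.FAILED):
    new_status, abandons = cancel_outcome(status)
    print(f"{status.name} -> {new_status.name}, subsequent jobs abandoned: {abandons}")
```

The two examples that follow correspond to the first two branches: canceling a BLOCKED job, then canceling a RUNNING job.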

Canceling a job

The code below demonstrates how to cancel a job.

from time import sleep

import taipy as tp
from taipy import Config


def double(nb):
    sleep(5)
    return nb * 2

if __name__ == "__main__":
    print(f'(1) Number of jobs: {len(tp.get_jobs())}.')

    # Create a scenario configuration with 2 sequential tasks.
    input_data_node_cfg = Config.configure_data_node("my_input", default_data=21)
    output_data_node_cfg = Config.configure_data_node("my_output")
    double_task_cfg = Config.configure_task("double_task", double, input_data_node_cfg, output_data_node_cfg)
    print_task_cfg = Config.configure_task("print_task", print, output_data_node_cfg)
    scenario_config = Config.configure_scenario("my_scenario", [double_task_cfg, print_task_cfg])

    # Run the Orchestrator service.
    tp.Orchestrator().run()

    # Create a scenario and submit it.
    scenario = tp.create_scenario(scenario_config)
    submission = tp.submit(scenario)

    # Count and get the jobs.
    print(f'(2) Number of jobs: {len(tp.get_jobs())}.')
    job_double = tp.get_latest_job(scenario.double_task)
    job_print = tp.get_latest_job(scenario.print_task)

    # Get status of the job.
    print(f'(3) Status of job double_task: {job_double.status}')
    print(f'(4) Status of job print_task: {job_print.status}')

    # Then cancel the second job from print_task.
    tp.cancel_job(job_print)

    sleep(10)

    print(f'(5) Status of job double_task: {job_double.status}')
    print(f'(6) Status of job print_task: {job_print.status}')

This example produces the following output:

(1) Number of jobs: 0.
(2) Number of jobs: 2.
(3) Status of job double_task: Status.RUNNING
(4) Status of job print_task: Status.BLOCKED
(5) Status of job double_task: Status.COMPLETED
(6) Status of job print_task: Status.CANCELED

Canceling a running job

The code below demonstrates how to cancel a running job.

from time import sleep

import taipy as tp
from taipy import Config


def double(nb):
    sleep(5)
    return nb * 2

if __name__ == "__main__":
    print(f'(1) Number of jobs: {len(tp.get_jobs())}.')

    # Create a scenario configuration with 2 sequential tasks.
    input_data_node_cfg = Config.configure_data_node("my_input", default_data=21)
    output_data_node_cfg = Config.configure_data_node("my_output")
    double_task_cfg = Config.configure_task("double_task", double, input_data_node_cfg, output_data_node_cfg)
    print_task_cfg = Config.configure_task("print_task", print, output_data_node_cfg)
    scenario_cfg = Config.configure_scenario("my_scenario", [double_task_cfg, print_task_cfg])

    # Run the Orchestrator service.
    tp.Orchestrator().run()

    # Create and submit a scenario.
    scenario = tp.create_scenario(scenario_cfg)
    submission = tp.submit(scenario)

    # Count and get the jobs.
    print(f'(2) Number of jobs: {len(tp.get_jobs())}.')
    job_double = tp.get_latest_job(scenario.double_task)
    job_print = tp.get_latest_job(scenario.print_task)

    # Get status of the job.
    print(f'(3) Status of job double_task: {job_double.status}')
    print(f'(4) Status of job print_task: {job_print.status}')

    # Then cancel the first job from double_task.
    tp.cancel_job(job_double)

    sleep(10)

    print(f'(5) Status of job double_task: {job_double.status}')
    print(f'(6) Status of job print_task: {job_print.status}')

This example produces the following output:

(1) Number of jobs: 0.
(2) Number of jobs: 2.
(3) Status of job double_task: Status.RUNNING
(4) Status of job print_task: Status.BLOCKED
(5) Status of job double_task: Status.COMPLETED
(6) Status of job print_task: Status.ABANDONED

Subscribe to job execution

You can subscribe to a job's status changes if you want a user-defined function to be called on each status change.

For more details and examples, see the job subscription section.