Skip to content

Job Execution mode

Estimated Time for Completion: 15 minutes; Difficulty Level: Advanced

Download the code

Taipy has different modes to execute the code. Changing the execution mode can be useful for running multiple tasks in parallel.

  • standalone mode: asynchronous. Jobs can be run in parallel depending on the graph of execution (if max_nb_of_workers > 1).

  • development mode: synchronous. The default execution mode is development.

We define a configuration and functions to showcase the two execution modes.

# Normal function used by Taipy
def double(nb):
    return nb * 2

def add(nb):
    print("Wait 10 seconds in add function")
    time.sleep(10)
    return nb + 10

Configuration

This line of code alters the execution mode. Setting it to standalone makes Task orchestration work asynchronously. In this configuration, a maximum of two tasks can run simultaneously.

Warning

In standalone mode, when a subprocess is spawned, the code is re-executed except for the code inside the if __name__ == "__main__": condition. This can lead to unexpected behavior if the code is not protected.

Therefore, it's a good practice to protect the main code with the if __name__ == "__main": condition in your Taipy application main script. By doing this, the code will not be re-executed when the subprocesses are spawned.

Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
if __name__=="__main__":
    tp.Orchestrator().run()
    scenario_1 = tp.create_scenario(scenario_cfg)
    scenario_2 = tp.create_scenario(scenario_cfg)

    scenario_1.submit()
    scenario_2.submit()

    time.sleep(30)

Jobs from the two submissions are being executed simultaneously. If max_nb_of_workers was greater, we could run multiple scenarios at the same time and multiple tasks of a scenario at the same time.

Some options for the submit function exist:

  • wait: if wait is True, the submission waits for the end of all the jobs (if timeout is not defined).

  • timeout: if wait is True, Taipy waits for the end of the submission up to a certain amount of time.

if __name__=="__main__":
    tp.Orchestrator().run()
    scenario_1 = tp.create_scenario(scenario_cfg)
    scenario_1.submit(wait=True)
    scenario_1.submit(wait=True, timeout=5)

Entire code

import time

import taipy as tp
from taipy.core.config import Config


# Normal function used by Taipy
def double(nb):
    return nb * 2

def add(nb):
    print("Wait 10 seconds in add function")
    time.sleep(10)
    return nb + 10

if __name__=="__main__":
    Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)

    # Configuration of Data Nodes
    input_cfg = Config.configure_data_node("my_input", default_data=21)
    intermediate_cfg = Config.configure_data_node("intermediate", default_data=21)
    output_cfg = Config.configure_data_node("my_output")

    # Configuration of tasks
    first_task_cfg = Config.configure_task("double",
                                        double,
                                        input_cfg,
                                        intermediate_cfg)

    second_task_cfg = Config.configure_task("add",
                                            add,
                                            intermediate_cfg,
                                            output_cfg)

    # Configuration of the scenario
    scenario_cfg = Config.configure_scenario(id="my_scenario",
                                            task_configs=[first_task_cfg,
                                                        second_task_cfg])

    Config.export("config.toml")

    tp.Orchestrator().run()
    scenario_1 = tp.create_scenario(scenario_cfg)
    scenario_2 = tp.create_scenario(scenario_cfg)
    scenario_1.submit()
    scenario_2.submit()

    scenario_1 = tp.create_scenario(scenario_cfg)
    scenario_1.submit(wait=True)
    scenario_1.submit(wait=True, timeout=5)