Job Execution mode
You can download the code here. Here is the Python version with the TOML file .
Executing jobs¶
Estimated Time for Completion: 15 minutes; Difficulty Level: Advanced
Taipy has different ways to execute the code. Changing the execution mode can be useful for running multiple tasks in parallel.
-
standalone mode: asynchronous. Jobs can be run in parallel depending on the graph of execution (if max_nb_of_workers > 1).
-
development mode: synchronous. The default execution mode is development.
We define a configuration and functions to showcase the two execution modes.
# Normal function used by Taipy
def double(nb):
return nb * 2
def add(nb):
print("Wait 10 seconds in add function")
time.sleep(10)
return nb + 10
This line of code alters the execution mode. Setting it to standalone makes Taipy Core work asynchronously. In this configuration, a maximum of two tasks can run simultaneously.
Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
if __name__=="__main__":
tp.Core().run()
scenario_1 = tp.create_scenario(scenario_cfg)
scenario_2 = tp.create_scenario(scenario_cfg)
scenario_1.submit()
scenario_2.submit()
time.sleep(30)
Jobs from the two submissions are being executed simultaneously. If max_nb_of_workers
was greater, we could run multiple scenarios at the same time and multiple tasks of a scenario at the same time.
Some options for the submit function exist:
-
wait: if wait is True, the submission waits for the end of all the jobs (if timeout is not defined).
-
timeout: if wait is True, Taipy waits for the end of the submission up to a certain amount of time.
if __name__=="__main__":
tp.Core().run()
scenario_1 = tp.create_scenario(scenario_cfg)
scenario_1.submit(wait=True)
scenario_1.submit(wait=True, timeout=5)
Entire code¶
from taipy.core.config import Config
import taipy as tp
import datetime as dt
import pandas as pd
import time
# Normal function used by Taipy
def double(nb):
return nb * 2
def add(nb):
print("Wait 10 seconds in add function")
time.sleep(10)
return nb + 10
Config.configure_job_executions(mode="standalone", max_nb_of_workers=2)
# Configuration of Data Nodes
input_cfg = Config.configure_data_node("input", default_data=21)
intermediate_cfg = Config.configure_data_node("intermediate", default_data=21)
output_cfg = Config.configure_data_node("output")
# Configuration of tasks
first_task_cfg = Config.configure_task("double",
double,
input_cfg,
intermediate_cfg)
second_task_cfg = Config.configure_task("add",
add,
intermediate_cfg,
output_cfg)
# Configuration of scenario
scenario_cfg = Config.configure_scenario(id="my_scenario",
task_configs=[first_task_cfg,
second_task_cfg])
Config.export("config_07.toml")
if __name__=="__main__":
tp.Core().run()
scenario_1 = tp.create_scenario(scenario_cfg)
scenario_2 = tp.create_scenario(scenario_cfg)
scenario_1.submit()
scenario_2.submit()
scenario_1 = tp.create_scenario(scenario_cfg)
scenario_1.submit(wait=True)
scenario_1.submit(wait=True, timeout=5)