Examples
These examples show how to apply event-driven programming to two common needs: real-time GUI updates and external API calls.
Real-Time GUI Updates
By consuming and processing events, developers can build reactive systems that
notify end users of important changes (such as new scenario creations or data node updates)
as they happen. Users are informed of these changes directly in the interface, in real time.
This example registers three callbacks, each filtered by entity type and operation,
that update the page and trigger a GUI notification when a matching event is received.
Example
import taipy as tp
import taipy.gui.builder as tgb
from taipy import Config, Gui, Orchestrator, Scenario, SubmissionStatus
from taipy.event import EventEntityType, EventOperation, EventProcessor
from taipy.gui import notify

##### Configuration and Functions #####
def build_message(name: str):
    return f"Hello {name}!"

name_data_node_cfg = Config.configure_data_node(id="input_name", default_data="Florian")
message_data_node_cfg = Config.configure_data_node(id="message")
build_msg_task_cfg = Config.configure_task("build_msg", build_message, name_data_node_cfg, message_data_node_cfg)
scenario_cfg = Config.configure_scenario("scenario", task_configs=[build_msg_task_cfg])

value = "Default text"

#### Event callbacks ####
def notify_users_of_creation(state, event):
    state.value = "Scenario created"
    notify(state, "s", "Scenario Created")

def notify_users_of_submission(state, event):
    state.value = "Scenario submitted"
    notify(state, "s", "Scenario Submitted")

def notify_users_of_update(state, event):
    submission = tp.get(event.entity_id)
    if not submission:
        return
    scenario = tp.get(submission.entity_id)
    if not scenario or not isinstance(scenario, Scenario):
        return
    if event.attribute_value != SubmissionStatus.COMPLETED:
        return
    new_value_of_dn = scenario.message.read()
    state.value = f"Data Node updated with value: {new_value_of_dn}"
    notify(state, "i", "Data Node Updated")

#### Normal callbacks ####
def create_and_submit_scenario(state):
    scenario = tp.create_scenario(config=scenario_cfg)
    tp.submit(scenario)

#### Page ####
with tgb.Page() as page:
    tgb.text("{value}")
    tgb.button("Create and submit a scenario!", on_action=create_and_submit_scenario)

if __name__ == "__main__":
    orchestrator = Orchestrator()
    gui = Gui(page)
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_event(
        callback=notify_users_of_creation,
        entity_type=EventEntityType.SCENARIO,
        operation=EventOperation.CREATION,
    )
    event_processor.broadcast_on_event(
        callback=notify_users_of_submission,
        entity_type=EventEntityType.SCENARIO,
        operation=EventOperation.SUBMISSION,
    )
    event_processor.broadcast_on_event(
        callback=notify_users_of_update,
        entity_type=EventEntityType.SUBMISSION,
        operation=EventOperation.UPDATE,
        attribute_name="status",
    )
    event_processor.start()
    orchestrator.run()
    gui.run()
This snippet shows how you can process events to notify users whenever
a scenario is created, submitted or completed.
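To make the filtering behavior concrete, here is a minimal, library-free sketch of how callbacks registered with an entity type, an operation, and an attribute name could be matched against incoming events. It is not Taipy's implementation: `SketchEvent`, `Registration`, and `SketchDispatcher` are hypothetical names, and plain strings stand in for the `EventEntityType` and `EventOperation` values used in the example above.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class SketchEvent:
    """Hypothetical stand-in for an event; not Taipy's Event class."""
    entity_type: str
    operation: str
    entity_id: str
    attribute_name: Optional[str] = None
    attribute_value: Any = None


@dataclass
class Registration:
    """A callback plus optional filters (hypothetical helper)."""
    callback: Callable[[SketchEvent], None]
    entity_type: Optional[str] = None
    operation: Optional[str] = None
    attribute_name: Optional[str] = None

    def matches(self, event: SketchEvent) -> bool:
        # A filter left as None matches every event.
        pairs = (
            (self.entity_type, event.entity_type),
            (self.operation, event.operation),
            (self.attribute_name, event.attribute_name),
        )
        return all(wanted is None or wanted == actual for wanted, actual in pairs)


class SketchDispatcher:
    """Calls every registered callback whose filters match the event."""

    def __init__(self) -> None:
        self._registrations: List[Registration] = []

    def on_event(self, callback, entity_type=None, operation=None, attribute_name=None) -> None:
        self._registrations.append(
            Registration(callback, entity_type, operation, attribute_name)
        )

    def dispatch(self, event: SketchEvent) -> int:
        # Returns the number of callbacks that were invoked.
        called = 0
        for registration in self._registrations:
            if registration.matches(event):
                registration.callback(event)
                called += 1
        return called


received = []
dispatcher = SketchDispatcher()
dispatcher.on_event(
    lambda e: received.append(("created", e.entity_id)),
    entity_type="SCENARIO",
    operation="CREATION",
)
dispatcher.on_event(
    lambda e: received.append(("status", e.attribute_value)),
    entity_type="SUBMISSION",
    operation="UPDATE",
    attribute_name="status",
)

dispatcher.dispatch(SketchEvent("SCENARIO", "CREATION", "scenario_1"))
dispatcher.dispatch(SketchEvent("SUBMISSION", "UPDATE", "submission_1", "status", "COMPLETED"))
# No registration filters on JOB events, so this one is ignored.
dispatcher.dispatch(SketchEvent("JOB", "UPDATE", "job_1", "status", "FAILED"))
```

In this sketch, narrowing the filters at registration time keeps each callback small: the callback only has to inspect the attribute value, as `notify_users_of_update` does with the submission status.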
External API Calls
The following example illustrates how to integrate external systems with a Taipy
application by consuming events. It demonstrates how to trigger external API calls
when specific job status updates occur, such as when a job fails. This integration
allows for seamless communication between a Taipy application and an external service.
This script registers a callback, trigger_external_api, with the EventProcessor. The callback
is invoked for every UPDATE event on a JOB entity's status attribute and, when the new status
is FAILED, triggers an external API call that includes the job's id.
Example
import requests
import taipy as tp
import taipy.gui.builder as tgb
from taipy import Config, Gui, Orchestrator
from taipy.core.job.status import Status
from taipy.core.notification import EventEntityType, EventOperation
from taipy.event import EventProcessor

##### Configuration and Functions #####
def fail_task(name: str):
    raise Exception(f"This function is triggered by {name} and is designed to fail!")

name_data_node_cfg = Config.configure_data_node(id="input_name", default_data="Florian")
message_data_node_cfg = Config.configure_data_node(id="message")
build_msg_task_cfg = Config.configure_task("build_msg", fail_task, name_data_node_cfg, message_data_node_cfg)
scenario_cfg = Config.configure_scenario("scenario", task_configs=[build_msg_task_cfg])

value = "Default text"

#### Event callbacks ####
def trigger_external_api(event, gui):
    # Only react when the job's new status is FAILED.
    if event.attribute_value == Status.FAILED:
        job_id = event.entity_id
        requests.get("http://127.0.0.1:5000/replace-this-with-your-api", params={"message": f"Job {job_id} failed."})

#### Normal callbacks ####
def create_and_submit_scenario(state):
    scenario = tp.create_scenario(config=scenario_cfg)
    tp.submit(scenario)

#### Page ####
with tgb.Page() as page:
    tgb.text("{value}")
    tgb.button("Create and submit a scenario!", on_action=create_and_submit_scenario)

if __name__ == "__main__":
    orchestrator = Orchestrator()
    gui = Gui(page)
    event_processor = EventProcessor(gui)
    event_processor.on_event(
        callback=trigger_external_api,
        entity_type=EventEntityType.JOB,
        operation=EventOperation.UPDATE,
        attribute_name="status",
    )
    event_processor.start()
    orchestrator.run()
    gui.run()
This snippet shows how you can process Taipy events to trigger calls to an external
API whenever a job fails.
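The callback above calls the external service without a timeout or error handling, so an unreachable service could raise inside the callback. The following sketch shows one possible way to guard the call. It is an illustration under stated assumptions, not part of Taipy: `build_failure_url`, `http_get`, and `report_job_failure` are hypothetical helpers, the string `"FAILED"` stands in for `Status.FAILED`, and the standard library's `urllib` replaces `requests` so that the sketch is self-contained.

```python
import urllib.parse
import urllib.request
from typing import Callable

# Placeholder URL taken from the example above; replace it with a real endpoint.
API_URL = "http://127.0.0.1:5000/replace-this-with-your-api"
FAILED = "FAILED"  # assumption: stand-in for taipy's Status.FAILED


def build_failure_url(job_id: str, base_url: str = API_URL) -> str:
    """Build the GET URL carrying the failure message as a query parameter."""
    query = urllib.parse.urlencode({"message": f"Job {job_id} failed."})
    return f"{base_url}?{query}"


def http_get(url: str, timeout: float = 5.0) -> int:
    """Send a GET request and return the HTTP status code."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return response.status


def report_job_failure(job_id: str, status: str, send: Callable[[str], int] = http_get) -> bool:
    """Report a failed job; return True only if the service answered with 2xx."""
    if status != FAILED:
        return False  # nothing to report for other statuses
    try:
        return 200 <= send(build_failure_url(job_id)) < 300
    except OSError as exc:
        # URLError, HTTPError, and socket timeouts are all OSError subclasses.
        print(f"Could not report failure of job {job_id}: {exc}")
        return False
```

Passing the sender as a parameter is a design choice made for this sketch: it lets the reporting logic be exercised with a fake sender instead of a live service. Inside an event callback, one would call `report_job_failure(event.entity_id, ...)` with the job's status mapped to the stand-in value.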