Examples
Examples of using Taipy event notifications to capture and consume events.
Real-Time GUI Updates with Taipy Event Consumers
This example shows event-driven programming in a GUI application. By capturing and
processing events, an application can notify users of important changes (such as a new
scenario being created or a data node being updated) as they happen, so the interface
always reflects the latest state.
This script defines a custom event consumer class, SpecificCoreConsumer, which listens
for all events published by Taipy and triggers GUI notifications based on those events.
It checks whether the event is the CREATION of a Scenario entity or an UPDATE of a
Submission entity whose new value is SubmissionStatus.COMPLETED. In the second case it
reads the scenario's message data node and broadcasts the new value to the GUI.
Example
import taipy as tp
import taipy.gui.builder as tgb
from taipy import Config, Gui, Orchestrator
from taipy.core import SubmissionStatus
from taipy.core.notification import CoreEventConsumerBase, EventEntityType, EventOperation, Notifier
from taipy.gui import notify


##### Configuration and Functions #####
def build_message(name: str):
    return f"Hello {name}!"


name_data_node_cfg = Config.configure_data_node(id="input_name", default_data="Florian")
message_data_node_cfg = Config.configure_data_node(id="message")
build_msg_task_cfg = Config.configure_task("build_msg", build_message, name_data_node_cfg, message_data_node_cfg)
scenario_cfg = Config.configure_scenario("scenario", task_configs=[build_msg_task_cfg])

value = "Default text"


#### Notification function to be called ####
def notify_users_of_creation(state):
    state.value = "Scenario created and submitted"
    notify(state, "s", "Scenario Created")


def notify_users_of_update(state, new_value_of_dn):
    print("Value of Data Node:", new_value_of_dn)
    state.value = f"Data Node updated with value: {new_value_of_dn}"
    notify(state, "i", "Data Node Updated")


class SpecificCoreConsumer(CoreEventConsumerBase):
    def __init__(self, gui):
        self.gui = gui
        reg_id, queue = Notifier.register()  # Adapt the registration to the events you want to listen to
        super().__init__(reg_id, queue)

    def process_event(self, event):
        if event.operation == EventOperation.CREATION:
            if event.entity_type == EventEntityType.SCENARIO:
                self.gui.broadcast_callback(notify_users_of_creation)
        elif event.operation == EventOperation.UPDATE:
            if event.entity_type == EventEntityType.SUBMISSION:
                print(event)
                if event.attribute_value == SubmissionStatus.COMPLETED:
                    scenario_id = event.metadata["origin_entity_id"]
                    scenario = tp.get(scenario_id)
                    new_value_of_dn = scenario.message.read()
                    self.gui.broadcast_callback(notify_users_of_update, [new_value_of_dn])


#### Normal callbacks ####
def create_and_submit_scenario(state):
    scenario = tp.create_scenario(config=scenario_cfg)
    tp.submit(scenario)


#### Page ####
with tgb.Page() as page:
    tgb.text("{value}")
    tgb.button("Create and submit a scenario!", on_action=create_and_submit_scenario)


if __name__ == "__main__":
    orchestrator = Orchestrator()
    gui = Gui(page)
    orchestrator.run()
    SpecificCoreConsumer(gui).start()
    gui.run()
This snippet shows how you can capture and process events to notify users whenever
a new scenario is created, or when its submission completes and the message data node
holds a new value.
For more details, see the registration page.
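The branching inside process_event in the example above can be hard to check by hand because it is tied to the GUI and to a running orchestrator. One possible approach, sketched here under stated assumptions, is to move the decision into a pure function that only inspects the event. The EntityType, Operation and FakeEvent types and the choose_notification function are hypothetical stand-ins written for this sketch, not part of Taipy, and the string "COMPLETED" stands in for SubmissionStatus.COMPLETED.

```python
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any, Optional


# Stand-in types for illustration only. In a real application, use
# EventEntityType, EventOperation and SubmissionStatus from Taipy.
class EntityType(Enum):
    SCENARIO = auto()
    SUBMISSION = auto()
    DATA_NODE = auto()


class Operation(Enum):
    CREATION = auto()
    UPDATE = auto()


@dataclass
class FakeEvent:
    """Hypothetical event carrying only the fields the example reads."""
    entity_type: EntityType
    operation: Operation
    attribute_value: Any = None
    metadata: dict = field(default_factory=dict)


def choose_notification(event: FakeEvent) -> Optional[str]:
    """Return the name of the notification to send, or None to ignore the event."""
    if event.operation is Operation.CREATION and event.entity_type is EntityType.SCENARIO:
        return "scenario_created"
    if (
        event.operation is Operation.UPDATE
        and event.entity_type is EntityType.SUBMISSION
        and event.attribute_value == "COMPLETED"  # stands in for SubmissionStatus.COMPLETED
    ):
        return "data_node_updated"
    return None


created = FakeEvent(EntityType.SCENARIO, Operation.CREATION)
completed = FakeEvent(EntityType.SUBMISSION, Operation.UPDATE, attribute_value="COMPLETED")
ignored = FakeEvent(EntityType.DATA_NODE, Operation.UPDATE)

print(choose_notification(created))
print(choose_notification(completed))
print(choose_notification(ignored))
```

With this split, process_event would only map the returned name to the matching broadcast_callback call, and the decision itself can be unit tested without starting the GUI.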
External API triggered with Taipy Event Consumers
This example shows how to use event-driven programming to monitor and respond to
specific event types. With a custom event consumer, JobFailureCoreConsumer, developers
can trigger external API calls based on specific job status updates. Critical job status
changes are then communicated promptly to external systems, which improves the
application's monitoring and its integration with third-party systems.
This script defines a custom event consumer class, JobFailureCoreConsumer, which
registers only for UPDATE events on the status attribute of JOB entities. When the new
status is FAILED, it triggers an external API call that includes the JOB's id.
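The registration described above narrows the events a consumer receives by entity type, operation and attribute name, while a registration with no arguments (as in the first example) receives everything. The sketch below is a conceptual model of that kind of filtering, not Taipy's implementation: the Topic class and its matches method are hypothetical, and plain strings replace the Taipy enums.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Topic:
    """Hypothetical filter: a field left as None matches any value."""
    entity_type: Optional[str] = None
    operation: Optional[str] = None
    attribute_name: Optional[str] = None

    def matches(self, entity_type: str, operation: str, attribute_name: Optional[str]) -> bool:
        wanted = (self.entity_type, self.operation, self.attribute_name)
        actual = (entity_type, operation, attribute_name)
        # Every field that was specified must equal the event's value.
        return all(w is None or w == a for w, a in zip(wanted, actual))


# Mirrors the filter used by JobFailureCoreConsumer.
job_status_updates = Topic(entity_type="JOB", operation="UPDATE", attribute_name="status")
# Mirrors a registration with no arguments.
everything = Topic()

print(job_status_updates.matches("JOB", "UPDATE", "status"))
print(job_status_updates.matches("JOB", "CREATION", None))
print(job_status_updates.matches("SCENARIO", "UPDATE", "status"))
print(everything.matches("SCENARIO", "CREATION", None))
```

Filtering at registration keeps process_event short: the consumer in this example only has to compare the new status with FAILED.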
Example
import requests

import taipy as tp
import taipy.gui.builder as tgb
from taipy import Config, Gui, Orchestrator
from taipy.core import SubmissionStatus
from taipy.core.job.status import Status
from taipy.core.notification import CoreEventConsumerBase, EventEntityType, EventOperation, Notifier
from taipy.gui import notify


##### Configuration and Functions #####
def fail_task(name: str):
    raise Exception(f"This function is triggered by {name} and is supposed to fail, and it did!")


name_data_node_cfg = Config.configure_data_node(id="input_name", default_data="Florian")
message_data_node_cfg = Config.configure_data_node(id="message")
build_msg_task_cfg = Config.configure_task("build_msg", fail_task, name_data_node_cfg, message_data_node_cfg)
scenario_cfg = Config.configure_scenario("scenario", task_configs=[build_msg_task_cfg])

value = "Default text"


#### Notification function to be called ####
def trigger_api_of_job_failure(job_id):
    requests.get("http://127.0.0.1:5000/replace-this-with-your-api", params={"message": f"Job {job_id} failed."})


class JobFailureCoreConsumer(CoreEventConsumerBase):
    def __init__(self):
        reg_id, queue = Notifier.register(
            entity_type=EventEntityType.JOB, operation=EventOperation.UPDATE, attribute_name="status"
        )  # Adapt the registration to the events you want to listen to
        super().__init__(reg_id, queue)

    def process_event(self, event):
        # Only status updates of jobs reach this method, so one check is enough.
        if event.attribute_value == Status.FAILED:
            trigger_api_of_job_failure(event.entity_id)


#### Normal callbacks ####
def create_and_submit_scenario(state):
    scenario = tp.create_scenario(config=scenario_cfg)
    tp.submit(scenario)


#### Page ####
with tgb.Page() as page:
    tgb.text("{value}")
    tgb.button("Create and submit a scenario!", on_action=create_and_submit_scenario)


if __name__ == "__main__":
    orchestrator = Orchestrator()
    gui = Gui(page)
    orchestrator.run()
    JobFailureCoreConsumer().start()
    gui.run()
This snippet shows how you can capture and process JOB events when an UPDATE is made
to the status of the JOB, and call an external API in response.
For more details, see the registration page.
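In the example above, the HTTP call runs inside process_event, so an unreachable API would raise there. How the consumer reacts to such an exception is not covered on this page, so one cautious option is to wrap the call and log the failure. The sketch below illustrates that idea only: safe_trigger, build_failure_params and the send parameter are hypothetical names, and the job id used in the demonstration is made up.

```python
import logging
from typing import Callable

logger = logging.getLogger("job_failure_consumer")


def build_failure_params(job_id: str) -> dict:
    """Build the query parameters sent to the external API."""
    return {"message": f"Job {job_id} failed."}


def safe_trigger(job_id: str, send: Callable[[dict], None]) -> bool:
    """Call `send` with the failure parameters.

    Returns True on success and False if `send` raised, so the caller
    never sees an exception from the external system.
    """
    try:
        send(build_failure_params(job_id))
        return True
    except Exception:
        logger.exception("Could not report failure of job %s", job_id)
        return False


# Demonstration with fake senders instead of a real HTTP call.
sent = []
ok = safe_trigger("JOB_build_msg_123", sent.append)


def broken_send(params: dict) -> None:
    raise ConnectionError("API unreachable")


failed = safe_trigger("JOB_build_msg_123", broken_send)
print(ok, failed, sent)
```

In the consumer, send could be a small function that calls requests.get with the example's URL, params=params and a timeout, so that a slow API does not block event processing indefinitely.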