Skip to content

Track activities and Trigger actions

In this section, we explore how to track activity and trigger actions by registering and processing events.

What is an Event?

Taipy, particularly its scenario and management capabilities, is designed to be natively multi-user and asynchronous. This means you cannot control when an action is completed (e.g., an end-user creating a scenario, a submission failing, a data node being edited, a job completing). To handle these situations, Taipy emits events that you can consume and react to. An event is a message emitted by Taipy when something occurs in the system. It is represented by an object of class Event. An event contains attributes necessary to identify the change.

Scenario creation event

For example, when a scenario is created, an event is emitted with the following attributes:

Events are particularly useful for:

  • Updating a user interface (e.g., refreshing a list of scenarios when a new one is created)
  • Triggering actions (e.g., automatically submitting a scenario when its input is updated)
  • Notifying end-users (e.g., sending a GUI notification when a job fails)
  • And more...

For more details, see the event description page.

What is a Registration?

Taipy provides the Notifier.register() method to register for events. When you register, you specify parameters that define the events you want to receive. These parameters allow you to filter the events you are interested in. For more details, see the registration page.

How to process events?

To process events, follow these steps:

  1. Create a new consumer class and extend it from CoreEventConsumerBase.
  2. Implement the process_event method to define your specific event-handling behavior.
  3. Register the consumer using the Notifier.register() method to obtain a registration_id and a registered_queue. These values are used to instantiate a consumer that can start consuming events.
  4. You can stop the consumer and unregister from the Notifier using the registration_id from earlier.

Example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
from time import sleep

from taipy.core.notification import CoreEventConsumerBase, Event, Notifier


# Define a custom event consumer.
class MyEventConsumer(CoreEventConsumerBase):
    def process_event(self, event: Event):
        # Custom event processing logic here
        print(f"Received a {event.entity_type} {event.operation} event at : {event.creation_date}")


if __name__ == "__main__":
    import taipy as tp
    from taipy import Config

    # Create a scenario configuration.
    some_datanode_cfg = Config.configure_data_node("data", default_data="Some content.")
    print_task_cfg = Config.configure_task("print", print, some_datanode_cfg)
    scenario_config = Config.configure_scenario("scenario", [print_task_cfg])

    # Run the core service.
    orchestrator = tp.Orchestrator().run()

    # Register to the Notifier to retrieve a registration id and a registered queue.
    registration_id, registered_queue = Notifier.register()

    # Create a consumer and start it.
    consumer = MyEventConsumer(registration_id, registered_queue)
    consumer.start()

    # The scenario creation and submission will trigger event emissions.
    scenario = tp.create_scenario(scenario_config)
    submission = tp.submit(scenario)

    # The events are processed in parallel by the consumer.
    # So we need to wait for the consumer to process the events.
    sleep(1)

    # Stop the consumer and unregister from the Notifier.
    consumer.stop()
    Notifier.unregister(registration_id)

This snippet shows a generic way to process all events. However, you might want to specify the types of events you are interested in to avoid processing all the events. For more details, see the registration page.

For more realistic examples, see the common use cases page.