Registration

Taipy exposes the Notifier.register() method for registering for events. The registration result is passed to a consumer that processes the events. When you register, you specify parameters that define which events you want to process, so you can tailor your event consumer to your application's precise needs. For example, you can register for:

  • All events emitted
  • All operations related to scenarios
  • All operations related to a specific data node
  • All job creations
  • A specific data node update
  • A sequence submission
  • A scenario deletion
  • Job failures

Notifier.register() accepts four optional parameters. Together they define the topic of interest, that is, the set of events the consumer will receive:

  1. entity_type

    • Type: EventEntityType (Enum)
    • Description: Specifies the entity type for which you want to receive notifications. If omitted, the consumer will be called for events across all entity types. The possible entity types are:

      • CYCLE
      • SCENARIO
      • SEQUENCE
      • TASK
      • DATA_NODE
      • JOB
      • SUBMISSION
  2. entity_id

    • Type: str
    • Description: Identifies the specific entity instance to register for. If omitted, the consumer will be called for events on all entities of the specified entity_type.
  3. operation

    • Type: EventOperation (Enum)
    • Description: Specifies the type of operation to monitor (e.g., CREATION, UPDATE, DELETION, SUBMISSION). If omitted, the consumer will be called for all operations performed on the specified entity_type.
  4. attribute_name

    • Type: str
    • Description: Targets a specific attribute within an entity. This is particularly useful when you are interested in changes to a particular attribute, such as when an attribute's value is updated. If omitted, the consumer will be called for changes to all attributes.
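
The way omitted parameters widen a registration can be modeled in a few lines of plain Python. This is a conceptual sketch only, not Taipy's implementation: the Topic class, the matches() function, and the "JOB_42" identifier are hypothetical names introduced for illustration, and entity types and operations are written as plain strings instead of the EventEntityType and EventOperation enums.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Topic:
    """Hypothetical stand-in for the four registration parameters."""

    entity_type: Optional[str] = None
    entity_id: Optional[str] = None
    operation: Optional[str] = None
    attribute_name: Optional[str] = None


def matches(topic: Topic, event: Topic) -> bool:
    """Return True if every parameter set on the topic equals the event's value.

    A parameter left as None acts as a wildcard, mirroring the
    "if omitted" behavior described for each parameter.
    """
    for name in ("entity_type", "entity_id", "operation", "attribute_name"):
        wanted = getattr(topic, name)
        if wanted is not None and wanted != getattr(event, name):
            return False
    return True


# An event describing an update to the "status" attribute of one job.
# The identifier "JOB_42" is made up for this sketch.
event = Topic("JOB", "JOB_42", "UPDATE", "status")

all_events = Topic()
job_creations = Topic(entity_type="JOB", operation="CREATION")
one_job = Topic(entity_type="JOB", entity_id="JOB_42")
job_status_changes = Topic(entity_type="JOB", operation="UPDATE", attribute_name="status")

print(matches(all_events, event))          # True: nothing is restricted
print(matches(job_creations, event))       # False: the operation differs
print(matches(one_job, event))             # True: any operation on this job
print(matches(job_status_changes, event))  # True: all set parameters agree
```

Each additional parameter narrows the topic, so a registration with no parameters receives every event, while one that sets all four receives only changes to a single attribute of a single entity.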

Example

from time import sleep

from taipy.core.notification import CoreEventConsumerBase, Event, EventEntityType, EventOperation, Notifier


def double(nb):
    return nb * 2


# Define a custom event consumer.
class MyEventConsumer(CoreEventConsumerBase):
    def process_event(self, event: Event):
        # Custom event processing logic goes here.
        print(f"Received event of type: {event.entity_type}; and of operation: {event.operation}.")


if __name__ == "__main__":
    import taipy as tp
    from taipy import Config

    print(f"(1) Number of jobs: {len(tp.get_jobs())}.")

    # Create a scenario configuration with a single task that prints its input.
    input_data_node_cfg = Config.configure_data_node("my_input", default_data=21)
    print_task_cfg = Config.configure_task("print_task", print, input_data_node_cfg)
    scenario_config = Config.configure_scenario("my_scenario", [print_task_cfg])

    # Run the Orchestrator service.
    tp.Orchestrator().run()

    # Register with the Notifier to receive all scenario creation events.
    registration_id, registered_queue = Notifier.register(EventEntityType.SCENARIO, operation=EventOperation.CREATION)

    # Create a consumer and start it.
    consumer = MyEventConsumer(registration_id, registered_queue)
    consumer.start()

    # Create a scenario. This emits a scenario creation event.
    scenario = tp.create_scenario(scenario_config)

    # Give the consumer time to process the event before stopping it.
    sleep(1)

    # Stop the consumer and unregister from the Notifier.
    consumer.stop()
    Notifier.unregister(registration_id)
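
The example above hands a queue to a consumer that is then started and stopped. The following standard-library sketch illustrates that general pattern of a thread draining a queue. It is an assumption-laden stand-in, not the CoreEventConsumerBase implementation: SketchConsumer, its attributes, and the string used as an event are all hypothetical.

```python
import queue
import threading


class SketchConsumer(threading.Thread):
    """Hypothetical stand-in for a consumer: drains a queue on its own thread."""

    def __init__(self, events: queue.Queue):
        super().__init__(daemon=True)
        self.events = events
        self.received = []
        self._stop_requested = threading.Event()

    def run(self):
        # Poll with a timeout so a stop request is noticed even when the queue is empty.
        while not self._stop_requested.is_set():
            try:
                event = self.events.get(timeout=0.1)
            except queue.Empty:
                continue
            self.process_event(event)
            self.events.task_done()

    def process_event(self, event):
        # Custom processing would go here; this sketch just records the event.
        self.received.append(event)

    def stop(self):
        self._stop_requested.set()


events = queue.Queue()
consumer = SketchConsumer(events)
consumer.start()

events.put("SCENARIO CREATION")  # plays the role of a published event
events.join()                    # block until the consumer has processed it

consumer.stop()
consumer.join()
print(consumer.received)  # ['SCENARIO CREATION']
```

Because events are processed on a separate thread, the main thread has to wait before stopping the consumer. This sketch waits with events.join(); the example above uses sleep(1), presumably for the same reason.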

For complete, realistic examples, see the examples page.