CoreEventConsumerBase class
Bases: Thread
Abstract base class for implementing a Core event consumer.
This class provides a framework for consuming events from a queue in a separate thread.
It should be subclassed, and the process_event
method should be implemented to define
the custom logic for handling incoming events.
Basic usage
class MyEventConsumer(CoreEventConsumerBase):
def process_event(self, event: Event):
# Custom event processing logic here
print(f"Received event created at : {event.creation_date}")
pass
if __name__ == "__main__":
registration_id, registered_queue = Notifier.register(
entity_type=EventEntityType.SCENARIO,
operation=EventOperation.CREATION
)
consumer = MyEventConsumer(registration_id, registered_queue)
consumer.start()
# ...
consumer.stop()
Notifier.unregister(registration_id)
Firstly, we would create a consumer class extending from CoreEventConsumerBase and decide how to process the incoming events by defining the process_event. Then, we would specify the type of event we want to receive by registering with the Notifier. After that, we create an object of the consumer class by providing the registration_id and registered_queue and start consuming the event.
Methods¶
__init__() ¶
__init__(registration_id: str, queue: SimpleQueue) -> None
Initialize a CoreEventConsumerBase instance.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
str
|
A unique identifier of the registration. You can get a
registration id invoking |
required |
|
SimpleQueue
|
The queue from which events will be consumed. You can get a
queue invoking |
required |
process_event()
abstractmethod
¶
process_event(event: Event) -> None
This method should be overridden in subclasses to define how events are processed.