EventProcessor class
Bases: _CoreEventConsumerBase
The Taipy event processor service.
This service listens for events in a Taipy application and triggers callback executions when events matching specific topics are produced. The service handles both cases: callbacks broadcast to all states, and callbacks executed once on the server side.
The main method is on_event(), which registers a callback to a topic.
Before starting the event processor service, register each callback to a topic. The topics are defined by the entity type, the entity id, the operation, and the attribute name of the events. If an event matching the provided topic is produced, the callback execution is triggered.
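The matching rule described above can be pictured with a small, self-contained model. This is only a sketch: `SketchEvent` and `SketchTopic` are hypothetical stand-ins invented for illustration, not Taipy classes, and the actual matching logic may differ. It assumes that a topic field left as `None` matches any value, as the parameter descriptions of on_event() state.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchEvent:
    """Stand-in for an event (hypothetical, not the Taipy Event class)."""
    entity_type: str
    entity_id: str
    operation: str
    attribute_name: Optional[str] = None


@dataclass(frozen=True)
class SketchTopic:
    """Stand-in for a topic: a field set to None matches any value."""
    entity_type: Optional[str] = None
    entity_id: Optional[str] = None
    operation: Optional[str] = None
    attribute_name: Optional[str] = None

    def matches(self, event: SketchEvent) -> bool:
        pairs = [
            (self.entity_type, event.entity_type),
            (self.entity_id, event.entity_id),
            (self.operation, event.operation),
            (self.attribute_name, event.attribute_name),
        ]
        # Every field must either be a wildcard or equal the event's value.
        return all(wanted is None or wanted == actual for wanted, actual in pairs)


created = SketchEvent("SCENARIO", "SCENARIO_my_cfg_1", "CREATION")
print(SketchTopic(operation="CREATION").matches(created))     # True
print(SketchTopic(entity_type="DATA_NODE").matches(created))  # False
print(SketchTopic().matches(created))                         # True
```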
For more information about the event attributes please refer to the Event class.
Filters
For each registered callback, you can specify a custom filter function in addition to the topic. This is mostly useful when your filter is more complex than the topic. The filter must accept an event as the only argument and return a boolean. If the filter returns False on an event, the callback is not triggered. See an example below.
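As a rough illustration of the filter contract (one event in, one boolean out), the sketch below gates a callback behind a predicate. The `dispatch` helper and the `SimpleNamespace` events are hypothetical stand-ins, not part of Taipy; only the contract itself comes from the description above.

```python
from types import SimpleNamespace
from typing import Any, Callable, List, Optional


def dispatch(event: Any, callback: Callable, event_filter: Optional[Callable] = None) -> bool:
    """Run the callback unless the filter rejects the event.

    Returns True when the callback was executed.
    """
    if event_filter is not None and not event_filter(event):
        return False
    callback(event)
    return True


def is_sales_entity(event: Any) -> bool:
    # A filter takes the event as its only argument and returns a boolean.
    return event.entity_id.startswith("SCENARIO_sales")


received: List[str] = []


def remember(event: Any) -> None:
    received.append(event.entity_id)


events = [
    SimpleNamespace(entity_id="SCENARIO_sales_1"),
    SimpleNamespace(entity_id="SCENARIO_hr_7"),
]
for event in events:
    dispatch(event, remember, event_filter=is_sales_entity)

print(received)  # ['SCENARIO_sales_1']
```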
Callback extra arguments
For each registered callback, you can also specify extra arguments to be passed to the callback function in addition to the event. The extra arguments must be provided as a list of values.
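A minimal sketch of how extra arguments might reach a callback. The `invoke` helper is hypothetical, and the assumption that the extras are appended positionally after the standard arguments is ours; check it against the behaviour of your Taipy version.

```python
from types import SimpleNamespace
from typing import Any, Callable, List, Optional


def invoke(callback: Callable, event: Any, gui: Any, callback_args: Optional[List] = None) -> None:
    """Call the callback with the standard arguments, then the extras (assumed order)."""
    callback(event, gui, *(callback_args or []))


log: List[str] = []


def audit(event: Any, gui: Any, prefix: str, level: int) -> None:
    # `prefix` and `level` are supplied through callback_args.
    log.append(f"{prefix}[{level}] {event.operation}")


invoke(audit, SimpleNamespace(operation="CREATION"), gui=None, callback_args=["core", 2])
print(log)  # ['core[2] CREATION']
```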
Broadcast a callback to all states
When registering a callback, you can specify whether it is automatically broadcast to all states. In that case, the first argument of the callback must be the state and the second argument is the event. Otherwise, the callback is executed once on the server side and, as described for on_event(), takes the event as the first argument and the Gui instance as the second. Optionally, the callback can accept extra arguments (see the callback_args argument).
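The difference between the two execution modes can be sketched as follows. Both helpers are hypothetical stand-ins and the states are plain `SimpleNamespace` objects. Following the on_event() parameter description, the server-side callback is assumed to receive the event first and the Gui second.

```python
from types import SimpleNamespace
from typing import Any, Callable, List


def run_once(callback: Callable, event: Any, gui: Any) -> None:
    """Server-side execution: a single call, with the event then the Gui."""
    callback(event, gui)


def run_broadcast(callback: Callable, event: Any, states: List[Any]) -> None:
    """Broadcast execution: one call per state, with the state then the event."""
    for state in states:
        callback(state, event)


def count_event(state: Any, event: Any) -> None:
    state.seen += 1


event = SimpleNamespace(operation="CREATION")
states = [SimpleNamespace(seen=0) for _ in range(3)]
run_broadcast(count_event, event, states)
print([s.seen for s in states])  # [1, 1, 1]

server_calls: List[str] = []
run_once(lambda evt, gui: server_calls.append(evt.operation), event, gui=None)
print(server_calls)  # ['CREATION']
```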
Example
from taipy import Event, EventProcessor, Gui

# Server-side callback: the event comes first, then the Gui instance.
def event_received(event: Event, gui: Gui):
    print(f"Received event created at: {event.creation_date}")

if __name__ == "__main__":
    event_processor = EventProcessor()
    event_processor.on_event(callback=event_received)
    event_processor.start()

from taipy import Event, EventProcessor, Gui
# Assumed import path for the two enums used below.
from taipy.core.notification import EventEntityType, EventOperation

def on_entity_creation(event: Event, gui: Gui):
    print(f"{event.entity_type} entity created at {event.creation_date}")

def on_scenario(event: Event, gui: Gui):
    print(f"Scenario '{event.entity_id}' processed for a '{event.operation}' operation.")

if __name__ == "__main__":
    event_processor = EventProcessor()
    # Triggered by the creation of any entity.
    event_processor.on_event(callback=on_entity_creation, operation=EventOperation.CREATION)
    # Triggered by any operation on a scenario.
    event_processor.on_event(callback=on_scenario, entity_type=EventEntityType.SCENARIO)
    event_processor.start()

import taipy as tp
from taipy import Event, EventProcessor, Gui

# Broadcast callback: the state comes first, then the event.
def event_received(state, event: Event):
    scenario = tp.get(event.entity_id)
    print(f"Received event created at: {event.creation_date} for scenario '{scenario.name}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_event(callback=event_received)
    event_processor.start()
    tp.run(gui)

import taipy as tp
from taipy import Event, EventProcessor, Gui, Scenario, State

# Executed once on the server side.
def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

# Executed for each state.
def store_latest_scenario(state: State, event: Event, scenario: Scenario):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
    state.latest_scenario = scenario

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_scenario_created(callback=print_scenario_created)
    event_processor.broadcast_on_scenario_created(callback=store_latest_scenario)
    event_processor.start()
    ...
    tp.run(gui)

import taipy as tp
from taipy import Event, EventProcessor, Gui
# Assumed import path for the enum used below.
from taipy.core.notification import EventEntityType

# The filter takes the event as its only argument and returns a boolean.
def cycle_filter(event: Event) -> bool:
    scenario = tp.get(event.entity_id)
    return scenario.cycle.name == "2023"

def event_received(state, event: Event):
    scenario = tp.get(event.entity_id)
    cycle = scenario.cycle
    print(f"Received event for scenario '{scenario.name}' in cycle '{cycle.name}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_event(
        callback=event_received,
        entity_type=EventEntityType.SCENARIO,
        filter=cycle_filter,
    )
    event_processor.start()
    tp.run(gui)
Other methods, such as on_datanode_written() or on_submission_finished(), are utility shortcuts that register callbacks for predefined topics and filters.
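One way to picture such a shortcut is as a thin wrapper that fills in the topic before delegating to the generic registration. The `SketchProcessor` class below is a hypothetical stand-in written for this illustration; it is not Taipy's implementation. It also ignores that the real shortcuts pass the entity itself (for example the scenario) to the callback.

```python
from typing import Callable, List, Optional, Tuple


class SketchProcessor:
    """Hypothetical stand-in, not Taipy's EventProcessor."""

    def __init__(self) -> None:
        self.registrations: List[Tuple[Optional[str], Optional[str], Callable]] = []

    def on_event(
        self,
        callback: Callable,
        entity_type: Optional[str] = None,
        operation: Optional[str] = None,
    ) -> "SketchProcessor":
        # Generic registration: the topic is given by the caller.
        self.registrations.append((entity_type, operation, callback))
        return self

    def on_scenario_created(self, callback: Callable) -> "SketchProcessor":
        # Shortcut: the same registration with the topic filled in.
        return self.on_event(callback, entity_type="SCENARIO", operation="CREATION")


def handler(event, gui) -> None:
    pass


shortcut = SketchProcessor().on_scenario_created(handler)
explicit = SketchProcessor().on_event(handler, entity_type="SCENARIO", operation="CREATION")
print(shortcut.registrations == explicit.registrations)  # True
```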
Methods
__init__()
__init__(gui: Optional[Gui] = None) -> None
Initialize the Event Processor service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gui | Gui | The Gui instance used to broadcast the callbacks to all states. | None |
broadcast_on_datanode_created()
broadcast_on_datanode_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback to be executed for each state on data node creation events.
Example
import taipy as tp
from taipy import DataNode, Event, EventProcessor, Gui, State
from taipy.gui import notify

def on_datanode_creations(state: State, event: Event, datanode: DataNode):
    print(f"Datanode created at '{event.creation_date}'.")
    notify(state, message=f"Datanode '{datanode.id}' created.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    # Register the callback defined above for every state.
    event_processor.broadcast_on_datanode_created(callback=on_datanode_creations)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed on data node creation events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the state, the event, and the datanode. | None |
| datanode_config | Union[str, DataNodeConfig, List, None] | The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
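Because the registration methods are declared as returning an EventProcessor, registrations can presumably be chained. The sketch below shows the pattern with a duck-typed helper; `register_all` and `RecordingProcessor` are hypothetical names, and the recording class is only a test double that mimics the return-self behaviour.

```python
from typing import Any, Callable, List


def register_all(processor: Any, on_created: Callable, on_written: Callable) -> Any:
    """Chain two registrations, relying on each call returning the processor."""
    return (
        processor
        .broadcast_on_datanode_created(callback=on_created)
        .broadcast_on_datanode_written(callback=on_written)
    )


class RecordingProcessor:
    """Hypothetical test double that mimics only the chaining behaviour."""

    def __init__(self) -> None:
        self.registered: List[str] = []

    def broadcast_on_datanode_created(self, callback: Callable) -> "RecordingProcessor":
        self.registered.append(f"created:{callback.__name__}")
        return self

    def broadcast_on_datanode_written(self, callback: Callable) -> "RecordingProcessor":
        self.registered.append(f"written:{callback.__name__}")
        return self


def notify_creation(state, event, datanode) -> None:
    pass


def notify_write(state, event, datanode, data) -> None:
    pass


double = RecordingProcessor()
result = register_all(double, notify_creation, notify_write)
print(result is double)   # True
print(double.registered)  # ['created:notify_creation', 'written:notify_write']
```

With a real application, the same helper would be called with an `EventProcessor(gui)` instance before `start()`.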
broadcast_on_datanode_deleted()
broadcast_on_datanode_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback for each state on data node deletion events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def record_deletions(state: State, event: Event, datanode_id: str):
    print(f"Datanode deleted at '{event.creation_date}'.")
    # Keep track of the deleted data node ids in each state.
    state.deleted_datanodes.append(datanode_id)

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_datanode_deleted(callback=record_deletions)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed for each state on data node deletion events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the state, the event, and the datanode id. | None |
| datanode_config | Union[str, DataNodeConfig, List, None] | The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
broadcast_on_datanode_written()
broadcast_on_datanode_written(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback for data node written events.
The callback is triggered when a datanode is written (see methods
DataNode.write() or DataNode.append()).
Example
from typing import Any

import taipy as tp
from taipy import DataNode, Event, EventProcessor, Gui, State

def last_data_edition(state: State, event: Event, datanode: DataNode, data: Any):
    print(f"Datanode written at '{event.creation_date}'.")
    # Keep track of the written data node ids in each state.
    state.last_data_edition.append(datanode.id)

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_datanode_written(callback=last_data_edition)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed for all states on data node written events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the state, the event, the datanode, and the data. | None |
| datanode_config | Union[str, DataNodeConfig, List, None] | The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
broadcast_on_event()
broadcast_on_event(
    callback: Callable,
    callback_args: Optional[List] = None,
    entity_type: Optional[EventEntityType] = None,
    entity_id: Optional[str] = None,
    operation: Optional[EventOperation] = None,
    attribute_name: Optional[str] = None,
    filter: Optional[Callable[[Event], bool]] = None,
) -> EventProcessor
Register a callback to be broadcast to all states on a specific event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed for each state when the event is produced. The callback takes the state as the first argument and the event as the second argument. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the state and the event. | None |
| entity_type | Optional[EventEntityType] | The entity type of the event. If None, the callback is registered for all entity types. | None |
| entity_id | Optional[str] | The entity id of the event. If None, the callback is registered for all entities. | None |
| operation | Optional[EventOperation] | The operation of the event. If None, the callback is registered for all operations. | None |
| attribute_name | Optional[str] | The attribute name of an update event. If None, the callback is registered for all attribute names. | None |
| filter | Optional[Callable[[Event], bool]] | A custom filter to apply to the event before triggering the callback. The filter must accept an event as the only argument and return a boolean. If the filter returns False, the callback is not triggered. | None |
broadcast_on_scenario_created()
broadcast_on_scenario_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback executed for all states on scenario creation events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui, Scenario, State

def store_latest_scenario(state: State, event: Event, scenario: Scenario):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
    state.latest_scenario = scenario

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_scenario_created(callback=store_latest_scenario)
    event_processor.start()
    ...
    tp.run(gui)

import taipy as tp
from taipy import Event, EventProcessor, Gui, Scenario

def scenario_created(state, event: Event, scenario: Scenario):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
    state.latest_scenario = scenario

if __name__ == "__main__":
    event_processor = EventProcessor()
    # Only scenarios created from the "my_cfg" configuration trigger the callback.
    event_processor.broadcast_on_scenario_created(callback=scenario_created, scenario_config="my_cfg")
    event_processor.start()
    ...
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed for each state when a scenario creation event occurs. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the state, the event, and the scenario. | None |
| scenario_config | Union[str, ScenarioConfig, List, None] | The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
broadcast_on_scenario_deleted()
broadcast_on_scenario_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback executed for all states on scenario deletion events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui, State
from taipy.gui import notify

def on_scenario_deleted(state: State, event: Event, scenario_id: str):
    notify(state, message=f"A scenario has been deleted at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_scenario_deleted(callback=on_scenario_deleted)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | Callable | The callback to be executed for each state on scenario deletion events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the state, the event, and the scenario id. | None |
| scenario_config | Union[str, ScenarioConfig, List, None] | The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
broadcast_on_submission_finished()
broadcast_on_submission_finished(
    callback: Callable,
    callback_args: Optional[List] = None,
    config_ids: Union[
        str, ScenarioConfig, TaskConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback to be executed for each state on submission finished events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui, State, Submission, SubmissionStatus, Submittable

def record_submissions(state: State, event: Event, submittable: Submittable, submission: Submission):
    print(f"Submission finished at '{event.creation_date}'. Status: '{submission.submission_status}'")
    # Sort the submittable ids by final status in each state.
    if submission.submission_status == SubmissionStatus.COMPLETED:
        state.completed.append(submittable.id)
    elif submission.submission_status == SubmissionStatus.FAILED:
        state.failed.append(submittable.id)

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_submission_finished(callback=record_submissions)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed for each state on submission finished events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the state, the event, the submittable, and the submission. | None |
| config_ids | Union[str, ScenarioConfig, TaskConfig, List, None] | The optional scenario configuration ids, task configuration ids, scenario configurations, or task configurations for which the callback is registered. If None, the callback is registered for any submittable. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
on_datanode_created()
on_datanode_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback to be executed on data node creation events.
Example
import taipy as tp
from taipy import DataNode, Event, EventProcessor, Gui

def on_datanode_creations(event: Event, datanode: DataNode, gui: Gui):
    print(f"Datanode created at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    # Register the callback defined above, executed once on the server side.
    event_processor.on_datanode_created(callback=on_datanode_creations)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed on data node creation events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the event, the datanode, and the GUI. | None |
| datanode_config | Union[str, DataNodeConfig, List, None] | The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
on_datanode_deleted()
on_datanode_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback for data node deletion events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui

def on_deletions(event: Event, datanode_id: str, gui: Gui):
    print(f"Datanode deleted at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    # Register the callback defined above, executed once on the server side.
    event_processor.on_datanode_deleted(callback=on_deletions)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed when consuming the event. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the event, the datanode id, and the GUI. | None |
| datanode_config | Union[str, DataNodeConfig, List, None] | The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
on_datanode_written()
on_datanode_written(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback for data node written events.
The callback is triggered when a datanode is written (see methods
DataNode.write() or DataNode.append()).
Example
from typing import Any

import taipy as tp
from taipy import DataNode, Event, EventProcessor, Gui

def last_data_edition(event: Event, datanode: DataNode, data: Any, gui: Gui):
    # Executed once on the server side: no state is available here.
    print(f"Datanode '{datanode.id}' written at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_datanode_written(callback=last_data_edition)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed when consuming the event. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the event, the datanode, the data, and the GUI. | None |
| datanode_config | Union[str, DataNodeConfig, List, None] | The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
on_event()
on_event(
    callback: Callable,
    callback_args: Optional[List] = None,
    entity_type: Optional[EventEntityType] = None,
    entity_id: Optional[str] = None,
    operation: Optional[EventOperation] = None,
    attribute_name: Optional[str] = None,
    filter: Optional[Callable[[Event], bool]] = None,
) -> EventProcessor
Register a callback to be executed on a specific event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed when the event is produced. The callback takes the event as the first argument and the GUI instance as the second argument. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the event and the GUI. | None |
| entity_type | Optional[EventEntityType] | The entity type of the event. If None, the callback is registered for all entity types. | None |
| entity_id | Optional[str] | The entity id of the event. If None, the callback is registered for all entities. | None |
| operation | Optional[EventOperation] | The operation of the event. If None, the callback is registered for all operations. | None |
| attribute_name | Optional[str] | The attribute name of an update event. If None, the callback is registered for all attribute names. | None |
| filter | Optional[Callable[[Event], bool]] | A custom filter to apply to the event before triggering the callback. The filter must accept an event as the only argument and return a boolean. If the filter returns False, the callback is not triggered. | None |
on_scenario_created()
on_scenario_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback for scenario creation events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui, Scenario

def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_scenario_created(callback=print_scenario_created)
    event_processor.start()
    ...
    tp.run(gui)

import taipy as tp
from taipy import Event, EventProcessor, Gui, Scenario

def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

if __name__ == "__main__":
    event_processor = EventProcessor()
    # Only scenarios created from the "my_cfg" configuration trigger the callback.
    event_processor.on_scenario_created(callback=print_scenario_created, scenario_config="my_cfg")
    event_processor.start()
    ...
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed when consuming the event. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the event, the scenario, and the GUI. | None |
| scenario_config | Union[str, ScenarioConfig, List, None] | The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
on_scenario_deleted()
on_scenario_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback for scenario deletion events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui

def print_scenario_deleted(event: Event, scenario_id: str, gui: Gui):
    print(f"A scenario has been deleted at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_scenario_deleted(callback=print_scenario_deleted)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed on scenario deletion events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the event, the scenario id, and the GUI. | None |
| scenario_config | Union[str, ScenarioConfig, List, None] | The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
on_submission_finished()
on_submission_finished(
    callback: Callable,
    callback_args: Optional[List] = None,
    config_ids: Union[
        str, ScenarioConfig, TaskConfig, List, None
    ] = None,
) -> EventProcessor
Register a callback for submission finished events.
Example
import taipy as tp
from taipy import Event, EventProcessor, Gui, Submission, SubmissionStatus, Submittable

def record_submissions(event: Event, submittable: Submittable, submission: Submission, gui: Gui):
    if submission.submission_status == SubmissionStatus.COMPLETED:
        print(f"Submission completed at '{event.creation_date}'. Status: '{submission.submission_status}'")
    elif submission.submission_status == SubmissionStatus.FAILED:
        print(f"Submission failed at '{event.creation_date}'. Status: '{submission.submission_status}'")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_submission_finished(callback=record_submissions)
    event_processor.start()
    ...
    tp.run(gui)
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | callable | The callback to be executed on submission finished events. Optionally, the callback can accept extra arguments (see the callback_args argument). | required |
| callback_args | List[AnyOf] | The extra arguments to be passed to the callback function in addition to the event, the submittable, the submission, and the GUI. | None |
| config_ids | Union[str, ScenarioConfig, TaskConfig, List, None] | The optional scenario configuration ids, task configuration ids, scenario configurations, or task configurations for which the callback is registered. If None, the callback is registered for any submittable. | None |

Returns:

| Name | Type | Description |
|---|---|---|
| EventProcessor | EventProcessor | The current instance of the EventProcessor service. |
process_event()
process_event(event: Event) -> None
Process an event.
This method is responsible for processing the incoming event.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| event | Event | The event to be processed. | required |
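To give an idea of what processing an incoming event may involve, here is a simplified model of a dispatch loop. `Registration` and `LoopProcessor` are hypothetical names invented for this sketch, and matching is reduced to the operation field; the actual method presumably also handles the other topic fields, broadcasting, and extra arguments.

```python
from dataclasses import dataclass, field
from types import SimpleNamespace
from typing import Any, Callable, List, Optional


@dataclass
class Registration:
    """One registered callback with its (simplified) topic and filter."""
    callback: Callable
    operation: Optional[str] = None
    event_filter: Optional[Callable[[Any], bool]] = None


@dataclass
class LoopProcessor:
    """Hypothetical stand-in, not Taipy's EventProcessor."""
    registrations: List[Registration] = field(default_factory=list)

    def process_event(self, event: Any) -> None:
        for reg in self.registrations:
            # Skip registrations whose topic does not match the event.
            if reg.operation is not None and reg.operation != event.operation:
                continue
            # Skip registrations whose custom filter rejects the event.
            if reg.event_filter is not None and not reg.event_filter(event):
                continue
            reg.callback(event)


seen: List[Any] = []
processor = LoopProcessor([Registration(seen.append, operation="CREATION")])
processor.process_event(SimpleNamespace(operation="CREATION", entity_id="a"))
processor.process_event(SimpleNamespace(operation="DELETION", entity_id="b"))
print([event.entity_id for event in seen])  # ['a']
```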