Skip to content

EventProcessor class

Bases: _CoreEventConsumerBase

The Taipy event processor service.

This service listens for events in a Taipy application and triggers callback executions when events matching specific topics are produced. The service handle both cases where callbacks are broadcast to all states or executed once on the server side.

The main method to use is on_event(), that registers a callback to a topic.

Before starting the event processor service, register each callback to a topic. The topics are defined by the entity type, the entity id, the operation, and the attribute name of the events. If an event matching the provided topic is produced, the callback execution is triggered.

For more information about the event attributes please refer to the Event class.

Filters

For each registered callback, you can specify a custom filter function in addition to the topic. This is mostly useful when your filter is more complex than the topic. The filter must accept an event as the only argument and return a boolean. If the filter returns False on an event, the callback is not triggered. See an example below.

Callback extra arguments

For each registered callback, you can also specify extra arguments to be passed to the callback function in addition to the event. The extra arguments must be provided as a list of values.

Broadcast a callback to all states

When registering a callback, you can specify if the callback is automatically broadcast to all states. In this case, the first argument of the callback must be the state otherwise it is the Gui instance. The second argument is the event. Optionally, the callback can accept more extra arguments (see the callback_args argument).

Example

from taipy import Event, EventProcessor, Gui

def event_received(gui: Gui, event: Event):
    print(f"Received event created at : {event.creation_date}")

if __name__ == "__main__":
    event_processor = EventProcessor()
    event_processor.on_event(callback=event_received)
    event_processor.start()
from taipy import Event, EventProcessor, Gui

def on_entity_creation(event: Event, gui: Gui):
    print(f" {event.entity_type} entity created at {event.creation_date}")

def on_scenario(event: Event, gui: Gui):
    print(f"Scenario '{event.entity_id}' processed for a '{event.operation}' operation.")

if __name__ == "__main__":
    event_processor = EventProcessor()
    event_processor.on_event(callback=on_entity_creation, operation=EventOperation.CREATION)
    event_processor.on_event(callback=on_scenario, entity_type=EventEntityType.SCENARIO)
    event_processor.start()
import taipy as tp
from taipy import Event, EventProcessor, Gui

def event_received(state, event: Event):
    scenario = tp.get(event.entity_id)
    print(f"Received event created at : {event.creation_date} for scenario '{scenario.name}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_event(callback=event_received)
    event_processor.start()
    taipy.run(gui)
import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

def store_latest_scenario(state: State, event: Event, scenario: Scenario):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
    state.latest_scenario = scenario

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_scenario_created(callback=print_scenario_created)
    event_processor.broadcast_on_scenario_created(callback=store_latest_scenario)
    event_processor.start()
    ...
    taipy.run(gui)
import taipy as tp
from taipy import Event, EventProcessor, Gui

def cycle_filter(event: Event, gui: Gui):
    scenario = tp.get(event.entity_id)
    return scenario.cycle.name == "2023"

def event_received(state, event: Event):
    scenario = tp.get(event.entity_id)
    cycle = scenario.cycle
    print(f"Received event for scenario '{scenario.name}' in cycle 'cycle.name'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_event(
        callback=event_received,
        entity_type=EventEntityType.SCENARIO,
        filter=cycle_filter)
    event_processor.start()
    taipy.run(gui)

Others methods such as on_data_node_written() or on_submission_finished() are utility methods as shortcuts to easily register callbacks for predefined topics and filters.

Methods

__init__()

__init__(gui: Optional[Gui] = None) -> None

Initialize the Event Processor service.

Parameters:

Name Type Description Default
gui Gui

The Gui instance used to broadcast the callbacks to all states.

None

broadcast_on_datanode_created()

broadcast_on_datanode_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback to be executed for each state on data node creation event.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State
from taipy.gui import notify

def on_datanode_creations(state: State, event: Event, datanode: DataNode):
    print(f"Datanode created at '{event.creation_date}'.")
    notify(state, f"Datanode '{datanode.id}' created.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_datanode_created(callback=record_creations)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed on data node creation events.

def on_event_received(state: State, event: Event, datanode: DataNode):
    ...
The callback takes the state, the event, the datanode as arguments. Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the state, the event, and the datanode.

None
datanode_config Union[str, ScenarioConfig, List, None]

The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

broadcast_on_datanode_deleted()

broadcast_on_datanode_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback for each state on data node deletion events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def record_deletions(state: State, event: Event, datanode_id: str):
    print(f"Datanode deleted at '{event.creation_date}'.")
    state.deleted_datanodes.append[datanode_id]

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_datanode_deleted(callback=record_deletions)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed for each state on data node deletion events.

def on_event_received(state: State, event: Event, datanode_id: str):
    ...
The callback takes the state, the event, the datanode id, and the GUI instance as arguments. Optionally, it can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the state, the event, and the datanode id.

None
datanode_config Union[str, DataNodeConfig, List, None]

The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

broadcast_on_datanode_written()

broadcast_on_datanode_written(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback for data node written events.

The callback is triggered when a datanode is written (see methods DataNode.write() or DataNode.append()).

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def last_data_edition(state: State, event: Event, datanode: DataNode, data: Any):
    print(f"Datanode written at '{event.creation_date}'.")
    state.last_data_edition.append[datanode.id]

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_datanode_written(callback=last_data_edition)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed for all states on data node written events.

def on_event_received(state: State, event: Event, datanode: DataNode, data: Any):
    ...
The callback takes the state, the event, the datanode, the data, and the GUI instance as arguments. Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the state, the event, the datanode, and the data.

None
datanode_config Union[str, DataNodeConfig, List, None]

The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

broadcast_on_event()

broadcast_on_event(
    callback: Callable,
    callback_args: Optional[List] = None,
    entity_type: Optional[EventEntityType] = None,
    entity_id: Optional[str] = None,
    operation: Optional[EventOperation] = None,
    attribute_name: Optional[str] = None,
    filter: Optional[Callable[[Event], bool]] = None,
) -> EventProcessor

Register a callback to be broadcast to all states on a specific event.

Parameters:

Name Type Description Default
callback callable

The callback to be executed for each state when the event is produced. The callback takes the state as the first argument and the event as the second argument.

def on_event_received(state, event: Event):
    ...
Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the state and the event.

None
entity_type Optional[EventEntityType]

The entity type of the event. If None, the callback is registered for all entity types.

None
entity_id Optional[str]

The entity id of the event. If None, the callback is registered for all entities.

None
operation Optional[EventOperation]

The operation of the event. If None, the callback is registered for all operations.

None
attribute_name Optional[str]

The attribute name of an update event. If None, the callback is registered for all attribute names.

None
filter Optional[Callable[[Event], bool]]

A custom filter to apply to the event before triggering the callback. The filter must accept an event as the only argument and return a boolean. If the filter returns False, the callback is not triggered.

None

broadcast_on_scenario_created()

broadcast_on_scenario_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback executed for all states on scenario creation events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def store_latest_scenario(state: State, event: Event, scenario: Scenario):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
    state.latest_scenario = scenario

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_scenario_created(callback=store_latest_scenario)
    event_processor.start()
    ...
    taipy.run(gui)
import taipy as tp
from taipy import Event, EventProcessor, Gui

def scenario_created(state, event: Event, scenario: Scenario):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")
    state.latest_scenario = scenario

if __name__ == "__main__":
    event_processor = EventProcessor()
    event_processor.broadcast_on_scenario_created(callback=scenario_created, scenario_config="my_cfg")
    event_processor.start()
    ...

Parameters:

Name Type Description Default
callback callable

The callback to be executed for each state when a scenario creation event occurs.

def on_event_received(state: State, event: Event, scenario: Scenario):
    ...
The callback takes the state, the event, and the scenario as arguments. Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the state, the event, and the scenario.

None
scenario_config Union[str, ScenarioConfig, List, None]

The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

broadcast_on_scenario_deleted()

broadcast_on_scenario_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback executed for all states on scenario deletion events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State
from taipy.gui import notify

def on_scenario_deleted(state: State, event: Event, scenario_id: str):
    notify(state, f"A scenario has been deleted at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.broadcast_on_scenario_deleted(callback=on_scenario_deleted)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback Callable

The callback to be executed for each state on scenario deletion event.

def on_event_received(state: State, event: Event, scenario_id: str):
    ...
The callback takes the state, the event, and the scenario id as arguments. Optionally, it can also accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the state, the event, and the scenario id.

None
scenario_config Union[str, ScenarioConfig, List, None]

The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

broadcast_on_submission_finished()

broadcast_on_submission_finished(
    callback: Callable,
    callback_args: Optional[List] = None,
    config_ids: Union[
        str, ScenarioConfig, TaskConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback to be executed for each state on submission finished events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def record_submissions(state: State, event: Event, submittable: Submittable, submission: Submission):
    print(f"Submission finished at '{event.creation_date}'. Status: '{submission.submission_status}'")
    if submission.submission_status == SubmissionStatus.COMPLETED:
        state.completed.append[submittable.id]
    elif submission.submission_status == SubmissionStatus.FAILED:
        state.failed.append[submittable.id]

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_submission_finished(callback=record_submissions, broadcast=True)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed for each state on submission finished events.

def on_event_received(state: State, event: Event, submittable: Submittable, submission: Submission):
    ...
The callback takes the state, the event, the submittable (scenario, sequence or task), and the submission. Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the state, the event, the submittable, and the submission.

None
config_ids Union[str, ScenarioConfig, TaskConfig, List, None]

The optional scenario configuration ids or task configuration ids or scenario configurations or task configurations for which the callback is registered. If None, the callback is registered for any submittable.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

on_datanode_created()

on_datanode_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback to be executed on data node creation event.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def on_datanode_creations(event: Event, datanode: DataNode, gui: Gui):
    print(f"Datanode created at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_datanode_created(callback=record_creations)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed on data node creation events.

def on_event_received(event: Event, datanode: DataNode, gui: Gui):
    ...
The callback takes the event, the datanode, and the GUI instance as arguments. Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the event, the datanode, and the GUI.

None
datanode_config Union[str, ScenarioConfig, List, None]

The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

on_datanode_deleted()

on_datanode_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback for data node deletion events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def on_deletions(event: Event, datanode_id: str, gui: Gui):
    print(f"Datanode deleted at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_datanode_deleted(callback=record_deletions)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed when consuming the event.

def on_event_received(event: Event, datanode_id: str, gui: Gui):
    ...
The callback takes the event, the datanode id, and the GUI instance as arguments. Optionally, it can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the event, the datanode id, and the GUI.

None
datanode_config Union[str, DataNodeConfig, List, None]

The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

on_datanode_written()

on_datanode_written(
    callback: Callable,
    callback_args: Optional[List] = None,
    datanode_config: Union[
        str, DataNodeConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback for data node written events.

The callback is triggered when a datanode is written (see methods DataNode.write() or DataNode.append()).

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def last_data_edition(event: Event, datanode: DataNode, data: Any, gui: Gui):
    print(f"Datanode written at '{event.creation_date}'.")
    state.last_data_edition.append[datanode.id]

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_datanode_written(callback=last_data_edition, broadcast=True)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed when consuming the event.

def on_event_received(event: Event,
                      datanode: DataNode,
                      data: Any,
                      gui: Gui):
    ...
The callback takes the event, the datanode, the data, and the GUI instance as arguments. Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the event, the datanode, the data, and the GUI.

None
datanode_config Union[str, DataNodeConfig, List, None]

The optional datanode configuration ids or datanode configurations for which the callback is registered. If None, the callback is registered for all datanode configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

on_event()

on_event(
    callback: Callable,
    callback_args: Optional[List] = None,
    entity_type: Optional[EventEntityType] = None,
    entity_id: Optional[str] = None,
    operation: Optional[EventOperation] = None,
    attribute_name: Optional[str] = None,
    filter: Optional[Callable[[Event], bool]] = None,
) -> EventProcessor

Register a callback to be executed on a specific event.

Parameters:

Name Type Description Default
callback callable

The callback to be executed when the event is produced. The callback takes the event as the first argument and the GUI instance as the second argument.

def on_event_received(event: Event, gui: Gui):
    ...
Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the event and the GUI.

None
entity_type Optional[EventEntityType]

The entity type of the event. If None, the callback is registered for all entity types.

None
entity_id Optional[str]

The entity id of the event. If None, the callback is registered for all entities.

None
operation Optional[EventOperation]

The operation of the event. If None, the callback is registered for all operations.

None
attribute_name Optional[str]

The attribute name of an update event. If None, the callback is registered for all attribute names.

None
filter Optional[Callable[[Event], bool]]

A custom filter to apply to the event before triggering the callback. The filter must accept an event as the only argument and return a boolean. If the filter returns False, the callback is not triggered.

None

on_scenario_created()

on_scenario_created(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback for scenario creation events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_scenario_created(callback=print_scenario_created)
    event_processor.start()
    ...
    taipy.run(gui)
import taipy as tp
from taipy import Event, EventProcessor, Gui

def print_scenario_created(event: Event, scenario: Scenario, gui: Gui):
    print(f"Scenario '{scenario.name}' created at '{event.creation_date}'.")

if __name__ == "__main__":
    event_processor = EventProcessor()
    event_processor.on_scenario_created(callback=print_scenario_created, scenario_config="my_cfg")
    event_processor.start()
    ...

Parameters:

Name Type Description Default
callback callable

The callback to be executed when consuming the event.

def on_event_received(event: Event, scenario: Scenario, gui: Gui):
    ...
The callback is triggered when a scenario is created. It takes the event the scenario, and the GUI instance as arguments. It can also accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the event, the scenario and the GUI.

None
scenario_config Union[str, ScenarioConfig, List, None]

The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

on_scenario_deleted()

on_scenario_deleted(
    callback: Callable,
    callback_args: Optional[List] = None,
    scenario_config: Union[
        str, ScenarioConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback for scenario deletion events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def print_scenario_deleted(event: Event, scenario_id: str, gui: Gui):
    print(f"A scenario has been deleted at '{event.creation_date}'.")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_scenario_deleted(callback=print_scenario_)
    event_processor.on_scenario_deleted(callback=print_scenario_deleted)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed on scenario deletion event.

def on_event_received(event: Event, scenario_id: str, gui: Gui):
    ...
The callback takes the event, the scenario id, and the GUI instance as arguments. Optionally, it can also accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the event, the scenario id, and the GUI.

None
scenario_config Union[str, ScenarioConfig, List, None]

The optional scenario configuration ids or scenario configurations for which the callback is registered. If None, the callback is registered for all scenario configurations.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

on_submission_finished()

on_submission_finished(
    callback: Callable,
    callback_args: Optional[List] = None,
    config_ids: Union[
        str, ScenarioConfig, TaskConfig, List, None
    ] = None,
) -> EventProcessor

Register a callback for submission finished events.

Example

import taipy as tp
from taipy import Event, EventProcessor, Gui, State

def record_submissions(event: Event, submittable: Submittable, submission: Submission, gui: Gui):
    if submission.submission_status == SubmissionStatus.COMPLETED:
        print(f"Submission completed at '{event.creation_date}'. Status: '{submission.submission_status}'")
    elif submission.submission_status == SubmissionStatus.FAILED:
        print(f"Submission failed at '{event.creation_date}'. Status: '{submission.submission_status}'")

if __name__ == "__main__":
    gui = Gui()
    event_processor = EventProcessor(gui)
    event_processor.on_submission_finished(callback=record_submissions)
    event_processor.start()
    ...
    taipy.run(gui)

Parameters:

Name Type Description Default
callback callable

The callback to be executed on submission finished events.

def on_event_received(event: Event, submittable: Submittable, submission: Submission, gui: Gui):
    ...
The callback takes the event, the submittable (scenario, sequence or task), the submission, and the GUI instance as arguments. Optionally, the callback can accept extra arguments (see the callback_args argument).

required
callback_args List[AnyOf]

The extra arguments to be passed to the callback function in addition to the event, the submittable, the submission, and the GUI.

None
config_ids Union[str, ScenarioConfig, TaskConfig, List, None]

The optional scenario configuration ids or task configuration ids or scenario configurations or task configurations for which the callback is registered. If None, the callback is registered for any submittable.

None

Returns:

Name Type Description
EventProcessor EventProcessor

The current instance of the EventProcessor service.

process_event()

process_event(event: Event) -> None

Process an event.

This method is responsible for processing the incoming event.

Parameters:

Name Type Description Default
event Event

The event to be processed.

required

start()

start()

Start the event processor thread.

stop()

stop()

Stop the event processor thread.