Built-in Processor nodes
Each kind of processor node works on the data in its queue in a specific ways.
Processor
- class pyneurode.processor_node.Processor.Processor
The base class that handle real-time processing
- process(message: Message) Message
The function to be override by subclass to perform realtime processing. The overriden function should work on the passed-in message and return the results as another Message
- Parameters
message (Message) – Data to be processed.
- Raises
NotImplementedError – This function must be implemented in subclasses
- Returns
processed messages
- Return type
Message
- shutdown()
Close the queue and flush all the messages
- startup()
Functions to be first called before processing run. Can be overrided for customized startup behaviour
BatchProcessor
- class pyneurode.processor_node.BatchProcessor.BatchProcessor(interval=0.001, internal_buffer_size=10000, verbose=True)
A BatchProcessor starts an background thread to keep pulling the data, the process() function is called every specified interval. During each specified interval, all the data acquired since last call to process() will be send to the current process
- __init__(interval=0.001, internal_buffer_size=10000, verbose=True)
- Parameters
interval (float, optional) – Time interval at which batches of messages will be processed at once. Defaults to 0.001.
internal_buffer_size (int, optional) – internal butter size at which the messages will be held untill they are processed. Defaults to 10000.
verbose (bool, optional) – _description_. Defaults to True.
- process(messages: Optional[List[Message]] = None)
Process the input messages. A batch of messages will be passed at a batch every time interval
- Parameters
messages (List[Message], optional) – messages to be processed. Defaults to None.
- Raises
NotImplementedError – This function must be implemented in subclasses
- startup()
Functions to be first called before processing run. Can be overrided for customized startup behaviour
SpikeSortProcessor
- class pyneurode.processor_node.SpikeSortProcessor.SpikeSortProcessor(interval=None, internal_buffer_size=1000, min_num_spikes=2000, do_pca=True, time_bin=0.1)
- __init__(interval=None, internal_buffer_size=1000, min_num_spikes=2000, do_pca=True, time_bin=0.1)
- Parameters
interval (float, optional) – Time interval at which batches of messages will be processed at once. Defaults to 0.001.
internal_buffer_size (int, optional) – internal butter size at which the messages will be held untill they are processed. Defaults to 10000.
verbose (bool, optional) – _description_. Defaults to True.
- process(msgs)
Process the input messages. A batch of messages will be passed at a batch every time interval
- Parameters
messages (List[Message], optional) – messages to be processed. Defaults to None.
- Raises
NotImplementedError – This function must be implemented in subclasses
- startup()
Functions to be first called before processing run. Can be overrided for customized startup behaviour
SyncDataProcessor
- class pyneurode.processor_node.SyncDataProcessor.SyncDataProcessor(interval=0.001, internal_buffer_size=10000, verbose=True)
Synchronize two sources of time series data
- process(data)
Process the input messages. A batch of messages will be passed at a batch every time interval
- Parameters
messages (List[Message], optional) – messages to be processed. Defaults to None.
- Raises
NotImplementedError – This function must be implemented in subclasses
- startup()
Functions to be first called before processing run. Can be overrided for customized startup behaviour
GUIProcessor
- class pyneurode.processor_node.GUIProcessor.GUIProcessor(internal_buffer_size=1000, frame_rate=60)
A GUIProcessor is a processor that receive messages for plotting and displaying the GUI.
Note
its process() function will be called frequently. Avoid doing time consuming processing there.
- __init__(internal_buffer_size=1000, frame_rate=60)
- Parameters
internal_buffer_size (int, optional) – size of the internal buffer to hold messages before they are displayed. Defaults to 1000.
- process(messages: Optional[List[Message]] = None)
Process the input messages. A batch of messages will be passed at a batch every time interval
- Parameters
messages (List[Message], optional) – messages to be processed. Defaults to None.
- Raises
NotImplementedError – This function must be implemented in subclasses
- register_visualizer(visualizer: Visualizer, filters: List[str])
Register a Visualizer object with the GUIProcessor. Only message of types specified in the filter list will be passed to the visualizer.
- Parameters
visualizer (Visualizer) – visualizer to be creat the GUI and call during frame update
filters (List[str]) – names of message type to be passed to the visualizer.
- shutdown()
Close the queue and flush all the messages
- startup()
Functions to be first called before processing run. Can be overrided for customized startup behaviour
Visualizers
Visualizers are responsible for visualizing messages on the user interface using dearpygui
Visualizer
- class pyneurode.processor_node.Visualizer.Visualizer(name: str)
The base Visualizer class. Intended to be extended by subclasses. The UI should be constructed using the init_gui() function and updated based on the messages passed to it in the update() function.
Example:
this is an exmaple showing how it works
- __init__(name: str)
- Parameters
name (str) – Name of the visualizer, for identification purpose
- init_gui()
Build the GUI using dearpygui here
- Raises
NotImplementedError – Must be overrided by subclasses.
- update(messages: List[Message])
Update the interface.
- Parameters
messages (List[Message]) – list of messages to be displayed
- Raises
NotImplementedError – Must be overrided by subclasses.
LatencyVisualizer
- class pyneurode.processor_node.LatencyVisualizer.LatencyVisualizer(name: str, subplot_size=(3, 3))
- __init__(name: str, subplot_size=(3, 3)) None
- Parameters
name (str) – Name of the visualizer, for identification purpose
- init_gui()
Build the GUI using dearpygui here
- Raises
NotImplementedError – Must be overrided by subclasses.
- update(messages: List[Message])
check if the metrics is already in the current dictionary
if not, add it to the existing dictionary, update the x axis tick
set the value accordingly
AnalogVisualizer
- class pyneurode.processor_node.AnalogVisualizer.AnalogVisualizer(name: str, buffer_length: int = 500, scale=1, time_scale=None)
Display a analog value
- __init__(name: str, buffer_length: int = 500, scale=1, time_scale=None) None
- Parameters
name (str) – Name of the visualizer, for identification purpose
- init_gui()
Build the GUI using dearpygui here
- Raises
NotImplementedError – Must be overrided by subclasses.
- update(messages: List[Message])
Update the interface.
- Parameters
messages (List[Message]) – list of messages to be displayed
- Raises
NotImplementedError – Must be overrided by subclasses.
SpikeClusterVisualizer
- class pyneurode.processor_node.SpikeClusterVisualizer.SpikeClusterVisualizer(name, num_channel=4, max_spikes=500, max_plot_per_cluster=80)
- __init__(name, num_channel=4, max_spikes=500, max_plot_per_cluster=80) None
- Parameters
name (str) – Name of the visualizer, for identification purpose
- init_gui()
Build the GUI using dearpygui here
- Raises
NotImplementedError – Must be overrided by subclasses.
- update(messages: List[Message])
Update the interface.
- Parameters
messages (List[Message]) – list of messages to be displayed
- Raises
NotImplementedError – Must be overrided by subclasses.