Built-in Processor nodes

Each kind of processor node works on the data in its queue in a specific way.

Processor

class pyneurode.processor_node.Processor.Processor

The base class that handles real-time processing.

process(message: Message) Message

The function to be overridden by subclasses to perform real-time processing. The overridden function should work on the passed-in message and return the result as another Message.

Parameters

message (Message) – Data to be processed.

Raises

NotImplementedError – This function must be implemented in subclasses

Returns

processed messages

Return type

Message
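The process() contract described above can be illustrated with a minimal sketch. The Message and Processor classes below are simplified stand-ins written for this example, not the pyneurode implementations (the real Message may carry different fields), and ScaleProcessor is a hypothetical subclass.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class Message:
    """Stand-in for pyneurode's Message; the real class may differ."""
    type: str
    data: Any


class Processor:
    """Stand-in base class that mirrors only the documented process() contract."""

    def process(self, message: Message) -> Message:
        raise NotImplementedError("process() must be implemented in subclasses")


class ScaleProcessor(Processor):
    """Hypothetical subclass: multiplies every sample by a constant gain."""

    def __init__(self, gain: float):
        self.gain = gain

    def process(self, message: Message) -> Message:
        # Work on the passed-in message and return the result as a new Message.
        scaled = [x * self.gain for x in message.data]
        return Message(type="scaled", data=scaled)


result = ScaleProcessor(gain=2.0).process(Message(type="raw", data=[1, 2, 3]))
print(result)
```

Calling process() on the stand-in base class directly raises NotImplementedError, which matches the behaviour documented in the Raises field.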

shutdown()

Close the queue and flush all the messages

startup()

Function called first, before processing runs. Can be overridden for customized startup behaviour.

BatchProcessor

class pyneurode.processor_node.BatchProcessor.BatchProcessor(interval=0.001, internal_buffer_size=10000, verbose=True)

A BatchProcessor starts a background thread that keeps pulling data, and its process() function is called once every specified interval. On each call, all the data acquired since the last call to process() will be sent to the current process.
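The batching behaviour can be sketched with the standard library alone. BatchLoop below is a hypothetical stand-in, not pyneurode's BatchProcessor: it assumes a simple sleep-then-drain loop and omits the internal buffer, so the real scheduling may differ.

```python
import queue
import threading
import time
from typing import Any, List


def drain(q: queue.Queue) -> List[Any]:
    """Remove and return everything currently waiting in the queue."""
    batch = []
    while True:
        try:
            batch.append(q.get_nowait())
        except queue.Empty:
            return batch


class BatchLoop:
    """Hypothetical stand-in that calls process() with one batch per interval."""

    def __init__(self, interval: float = 0.001):
        self.interval = interval
        self.inbox = queue.Queue()
        self.batches = []
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def process(self, messages: List[Any]) -> None:
        # A real subclass would do its work here; this sketch records the batch.
        self.batches.append(messages)

    def _run(self) -> None:
        while not self._stop.is_set():
            time.sleep(self.interval)
            batch = drain(self.inbox)  # everything that arrived since the last call
            if batch:
                self.process(batch)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()
        leftover = drain(self.inbox)  # flush anything that arrived after the last tick
        if leftover:
            self.process(leftover)


loop = BatchLoop(interval=0.01)
loop.start()
for i in range(5):
    loop.inbox.put(i)
time.sleep(0.05)
loop.stop()

flat = [m for batch in loop.batches for m in batch]
print(loop.batches)
```

How the five messages are split into batches depends on timing, but their overall order is preserved because a single thread drains a FIFO queue.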

__init__(interval=0.001, internal_buffer_size=10000, verbose=True)
Parameters
  • interval (float, optional) – Time interval at which batches of messages will be processed at once. Defaults to 0.001.

  • internal_buffer_size (int, optional) – Size of the internal buffer in which messages are held until they are processed. Defaults to 10000.

  • verbose (bool, optional) – _description_. Defaults to True.

process(messages: Optional[List[Message]] = None)

Process the input messages. A batch of messages is passed in once every time interval.

Parameters

messages (List[Message], optional) – messages to be processed. Defaults to None.

Raises

NotImplementedError – This function must be implemented in subclasses

startup()

Function called first, before processing runs. Can be overridden for customized startup behaviour.

SpikeSortProcessor

class pyneurode.processor_node.SpikeSortProcessor.SpikeSortProcessor(interval=None, internal_buffer_size=1000, min_num_spikes=2000, do_pca=True, time_bin=0.1)
__init__(interval=None, internal_buffer_size=1000, min_num_spikes=2000, do_pca=True, time_bin=0.1)
Parameters
  • interval (float, optional) – Time interval at which batches of messages will be processed at once. Defaults to None.

  • internal_buffer_size (int, optional) – Size of the internal buffer in which messages are held until they are processed. Defaults to 1000.

  • verbose (bool, optional) – _description_. Defaults to True.

process(msgs)

Process the input messages. A batch of messages is passed in once every time interval.

Parameters

msgs (List[Message]) – messages to be processed.

Raises

NotImplementedError – This function must be implemented in subclasses

startup()

Function called first, before processing runs. Can be overridden for customized startup behaviour.

SyncDataProcessor

class pyneurode.processor_node.SyncDataProcessor.SyncDataProcessor(interval=0.001, internal_buffer_size=10000, verbose=True)

Synchronize two sources of time series data

process(data)

Process the input messages. A batch of messages is passed in once every time interval.

Parameters

data (List[Message]) – messages to be processed.

Raises

NotImplementedError – This function must be implemented in subclasses

startup()

Function called first, before processing runs. Can be overridden for customized startup behaviour.

GUIProcessor

class pyneurode.processor_node.GUIProcessor.GUIProcessor(internal_buffer_size=1000, frame_rate=60)

A GUIProcessor is a processor that receives messages for plotting and displays the GUI.

Note

Its process() function is called frequently. Avoid time-consuming processing there.

__init__(internal_buffer_size=1000, frame_rate=60)
Parameters

internal_buffer_size (int, optional) – size of the internal buffer to hold messages before they are displayed. Defaults to 1000.

process(messages: Optional[List[Message]] = None)

Process the input messages. A batch of messages is passed in once every time interval.

Parameters

messages (List[Message], optional) – messages to be processed. Defaults to None.

Raises

NotImplementedError – This function must be implemented in subclasses

register_visualizer(visualizer: Visualizer, filters: List[str])

Register a Visualizer object with the GUIProcessor. Only messages of the types specified in the filter list will be passed to the visualizer.

Parameters
  • visualizer (Visualizer) – visualizer that creates the GUI and is called during each frame update

  • filters (List[str]) – names of the message types to be passed to the visualizer.
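The filtering described for register_visualizer() can be sketched as follows. Router, RecordingVisualizer and Message are hypothetical stand-ins written for this example. The sketch assumes that each message exposes its type name as a type attribute and that filters are matched against it exactly, which may differ from the actual GUIProcessor.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Message:
    """Stand-in for pyneurode's Message; the real class may differ."""
    type: str
    data: Any


class RecordingVisualizer:
    """Hypothetical visualizer that simply stores the messages it receives."""

    def __init__(self, name: str):
        self.name = name
        self.received = []

    def update(self, messages: List[Message]) -> None:
        self.received.extend(messages)


class Router:
    """Hypothetical stand-in for the routing part of a GUI processor."""

    def __init__(self):
        self._registry = []  # list of (visualizer, set of accepted type names)

    def register_visualizer(self, visualizer, filters: List[str]) -> None:
        self._registry.append((visualizer, set(filters)))

    def process(self, messages: List[Message]) -> None:
        for visualizer, wanted in self._registry:
            # Pass on only the messages whose type is in this visualizer's filters.
            selected = [m for m in messages if m.type in wanted]
            if selected:
                visualizer.update(selected)


router = Router()
spikes = RecordingVisualizer("spikes")
latency = RecordingVisualizer("latency")
router.register_visualizer(spikes, ["spike"])
router.register_visualizer(latency, ["metrics"])
router.process([Message("spike", 1), Message("metrics", 2), Message("adc", 3)])
print(spikes.received, latency.received)
```

The "adc" message reaches neither visualizer because no registered filter list names that type.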

shutdown()

Close the queue and flush all the messages

startup()

Function called first, before processing runs. Can be overridden for customized startup behaviour.

Visualizers

Visualizers are responsible for visualizing messages on the user interface using dearpygui

Visualizer

class pyneurode.processor_node.Visualizer.Visualizer(name: str)

The base Visualizer class. Intended to be extended by subclasses. The UI should be constructed using the init_gui() function and updated based on the messages passed to it in the update() function.

Example:

This is an example showing how it works.
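A minimal sketch of the init_gui()/update() lifecycle, assuming simplified stand-in classes rather than the pyneurode ones. To stay runnable without a GUI, the hypothetical LastValueVisualizer keeps its display text in a plain attribute. A real visualizer would create and update dearpygui widgets at the commented points instead.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Message:
    """Stand-in for pyneurode's Message; the real class may differ."""
    type: str
    data: Any


class Visualizer:
    """Stand-in base class that mirrors only the documented interface."""

    def __init__(self, name: str):
        self.name = name

    def init_gui(self) -> None:
        raise NotImplementedError("Must be overridden by subclasses.")

    def update(self, messages: List[Message]) -> None:
        raise NotImplementedError("Must be overridden by subclasses.")


class LastValueVisualizer(Visualizer):
    """Hypothetical subclass that shows the most recent value it received."""

    def init_gui(self) -> None:
        # A real visualizer would build its dearpygui widgets here.
        self.label = f"{self.name}: (no data)"

    def update(self, messages: List[Message]) -> None:
        # A real visualizer would push the new value into its widgets here.
        if messages:
            self.label = f"{self.name}: {messages[-1].data}"


viz = LastValueVisualizer("adc")
viz.init_gui()
viz.update([Message("adc", 0.1), Message("adc", 0.7)])
print(viz.label)
```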
__init__(name: str)
Parameters

name (str) – Name of the visualizer, for identification purpose

init_gui()

Build the GUI using dearpygui here

Raises

NotImplementedError – Must be overridden by subclasses.

update(messages: List[Message])

Update the interface.

Parameters

messages (List[Message]) – list of messages to be displayed

Raises

NotImplementedError – Must be overridden by subclasses.

LatencyVisualizer

class pyneurode.processor_node.LatencyVisualizer.LatencyVisualizer(name: str, subplot_size=(3, 3))
__init__(name: str, subplot_size=(3, 3)) None
Parameters

name (str) – Name of the visualizer, for identification purpose

init_gui()

Build the GUI using dearpygui here

Raises

NotImplementedError – Must be overridden by subclasses.

update(messages: List[Message])
  1. Check whether the metric is already in the current dictionary.

  2. If not, add it to the existing dictionary and update the x-axis ticks.

  3. Set the value accordingly.
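The three steps above can be sketched without any GUI code. LatencyTable is a hypothetical helper written for this example, and it assumes that each metric arrives as a (name, value) pair. The way LatencyVisualizer actually stores metrics and ticks is not shown in this reference.

```python
from typing import Dict, List, Tuple


class LatencyTable:
    """Hypothetical helper that tracks the latest value of each named metric."""

    def __init__(self):
        self.values: Dict[str, float] = {}
        self.x_ticks: List[Tuple[str, int]] = []  # (label, x position) pairs

    def update(self, metrics: List[Tuple[str, float]]) -> None:
        for name, value in metrics:
            # 1. Check whether the metric is already in the dictionary.
            if name not in self.values:
                # 2. If not, give it the next free position on the x axis.
                self.x_ticks.append((name, len(self.x_ticks)))
            # 3. Set (or overwrite) the value.
            self.values[name] = value


table = LatencyTable()
table.update([("sort", 1.5), ("sync", 0.2)])
table.update([("sort", 2.0)])
print(table.values, table.x_ticks)
```

A metric seen for the second time keeps its x-axis position and only has its value replaced.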

AnalogVisualizer

class pyneurode.processor_node.AnalogVisualizer.AnalogVisualizer(name: str, buffer_length: int = 500, scale=1, time_scale=None)

Display an analog value

__init__(name: str, buffer_length: int = 500, scale=1, time_scale=None) None
Parameters

name (str) – Name of the visualizer, for identification purpose

init_gui()

Build the GUI using dearpygui here

Raises

NotImplementedError – Must be overridden by subclasses.

update(messages: List[Message])

Update the interface.

Parameters

messages (List[Message]) – list of messages to be displayed

Raises

NotImplementedError – Must be overridden by subclasses.

SpikeClusterVisualizer

class pyneurode.processor_node.SpikeClusterVisualizer.SpikeClusterVisualizer(name, num_channel=4, max_spikes=500, max_plot_per_cluster=80)
__init__(name, num_channel=4, max_spikes=500, max_plot_per_cluster=80) None
Parameters

name (str) – Name of the visualizer, for identification purpose

init_gui()

Build the GUI using dearpygui here

Raises

NotImplementedError – Must be overridden by subclasses.

update(messages: List[Message])

Update the interface.

Parameters

messages (List[Message]) – list of messages to be displayed

Raises

NotImplementedError – Must be overridden by subclasses.