Algorithms in Nengo: Advanced Examples#
This tutorial covers the following:
Read 1D time-varying data from an external source into Nengo
Learn decoders in real-time
Perform matrix multiplication using ensemble arrays (neural representations with more than 2 dimensions)
Set up#
Ensure you are using your 495 Virtual Environment before you begin!
Import Nengo and other supporting libraries into your program to get started:
import matplotlib.pyplot as plt
import numpy as np
import nengo
from nengo.processes import Piecewise
from nengo.processes import WhiteSignal
from nengo.solvers import LstsqL2
Continuous Time Data#
We’ve talked a lot about how neurons are well suited to processing continuous time data, but we’ve yet to see an example where we actually read in that data. Let’s do that now.
We are going to create some bogus time-varying data, saved off somewhere, that we then want to pull into Nengo for processing. Think of an EKG - our heart rhythm is constantly changing, i.e. the data recorded from our heart is time-varying, a.k.a. continuous time data.
Imagine we want to connect the EKG recorder directly to neuromorphic hardware to detect cardiac events in real time at low power. Why? A hospital might have plenty of power, but what if you want to record events for a patient going about their normal life? We need something low power that can record for a while! For the record, such devices already exist - but we are in a hypothetical situation where we want to further reduce the power consumed by implementing the processing with neuromorphic hardware.
OKAY. We need a proof of concept prior to strapping a neuromorphic hardware architecture to a person, right? Here are the steps:
Record data
See if the data can be read by the architecture (import into Nengo simulator!)
Build a processing algorithm that uses neurons as computational units (Nengo!)
Run the algorithm on data with real hardware (one that uses Nengo as the software architecture)
Integrate the EKG recorder and the neuromorphic chip with the algorithm to ensure it works on hardware end-to-end
Beta test it on users
In this tutorial, we will simulate step 1 (with bogus time-varying data) and implement step 2.
Step 1#
Here is where we create bogus data, but you could instead load your own.
# Time axis for 2 seconds of simulated heartbeats (dt=.001, matching Nengo's default time step)
time_vector = np.arange(0, 2, 1e-3)
# ChatGPT helped me simulate EKG data
# EKG parameters
heart_rate = 80 # beats per minute
bpm_to_hz = heart_rate / 60 # Convert BPM to Hz
signal_frequency = bpm_to_hz * np.pi * 2
# Make some data over the course of time
p_wave = np.sin(signal_frequency * time_vector) * np.exp(-np.power(time_vector - 0.25, 2) / (2 * 0.1**2))
qrs_wave = np.piecewise(time_vector,
[time_vector < 0.45, time_vector >= 0.45],
[lambda t: -np.sin(5 * signal_frequency * t),
lambda t: 0.8 * np.sin(2 * signal_frequency * t)])
t_wave = 0.5 * np.sin(signal_frequency * time_vector) * np.exp(-np.power(time_vector - 0.7, 2) / (2 * 0.1**2))
# Combine waves to create an artificial EKG signal
ekg_signal = p_wave + qrs_wave + t_wave
### Could also just create a sine wave outside of Nengo
# time_vector = np.arange(0, 60, 1e-3)
# ekg_signal = np.sin(time_vector)
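Before handing this signal to Nengo, it’s worth a quick offline sanity check - a minimal plot of the raw samples (nothing here is Nengo-specific):
# Quick offline sanity check: plot the raw simulated EKG before it ever
# touches Nengo. You should see P, QRS, and T features in the 2 s window.
plt.figure()
plt.plot(time_vector, ekg_signal)
plt.xlabel("time [s]")
plt.title("Simulated EKG (raw samples)")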
Step 2#
We will build a Nengo node that brings the data from outside of Nengo into a form that can be represented and processed using Nengo neurons. At each time step, the node calls the step function we define to get the input value for that time.
class readData:
    def __init__(self, data):
        # Buffer of pre-recorded samples, one per simulation time step
        self.vals = data

    def step(self, t):
        # Return the current sample, then rotate the buffer so the next call
        # returns the next sample. np.roll wraps around, so the recording
        # loops if the simulation runs longer than the data.
        val = self.vals[0]
        self.vals = np.roll(self.vals, -1)
        return val
inp = readData(ekg_signal)
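To see what the class does, here’s a tiny standalone check (the three-sample buffer is made up for illustration): each call to `step` returns the next sample and wraps around when the recording ends.
# Standalone check with a made-up 3-sample buffer: step() returns one
# sample per call and wraps around at the end of the recording.
demo = readData(np.array([1.0, 2.0, 3.0]))
print([demo.step(t) for t in range(5)])  # [1.0, 2.0, 3.0, 1.0, 2.0]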
Now, we will connect this node to a neuron ensemble and see if the neuron ensemble correctly represents the input data.
model = nengo.Network(label="ReadData")
# Create a model to encode the input data with neurons
with model:
# Add input node
input_node = nengo.Node(inp.step)
# Add an ensemble of neurons to encode the input
input_neuron = nengo.Ensemble(n_neurons=100, dimensions=1, radius=2)
# Connect the node to the encoding neuron
inp_conn = nengo.Connection(input_node, input_neuron)
## Add probes to see outputs throughout the model
# This probe captures the non-spiking input value
input_probe_node = nengo.Probe(input_node)
# This probe displays the decoded neural representation of the input
input_probe_neuron = nengo.Probe(input_neuron, synapse=0.01)
# Build the simulator to run the model containing just input encoding
with nengo.Simulator(model) as sim:
# Run it for 5 seconds (the 2 s recording wraps around via np.roll)
sim.run(5)
# Plot the input signals and decoded ensemble values
plt.figure()
plt.plot(sim.trange(), sim.data[input_probe_node], label="Input EKG data")
plt.plot(sim.trange(), sim.data[input_probe_neuron], label="Decoded Estimate of EKG")
plt.legend()
plt.xlabel("time [s]")
plt.title("Decoded Spikes = Neural Representation of Input")
Discussion#
The key distinction here is that when we’ve had time-varying functions as inputs via Nengo nodes in the past, they’ve been functions that we’ve built within the node! Go back and look at our tutorials that utilize `piecewise`, `whitenoise`, and `lambda` functions for reference. An easy example can be found here.
For our EKG data, we are taking (simulated) data that’s never been seen by our model - collected (generated) entirely offline - and are now processing it with Nengo. That’s why we need the `readData` class. Make sure that makes sense!
What this means for you: if you plan to pull offline continuous time data into your final project, you’ll likely need a class like this.
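As a sketch of that workflow (the filename here is made up - substitute your own recording), a save-and-reload round trip looks like this:
# Hypothetical round trip: save the simulated signal, then load it back
# exactly the way you would load a real offline recording.
np.savetxt("ekg_recording.csv", ekg_signal, delimiter=",")  # made-up filename
samples = np.loadtxt("ekg_recording.csv", delimiter=",")
inp_from_file = readData(samples)  # drop into nengo.Node(inp_from_file.step)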
Online Learning#
Normally, if you have a function you would like to compute across a connection, you would specify it with the `function` argument in the Connection constructor. However, it is also possible to use error-driven learning to learn to compute a function online (i.e. while you’re actively receiving inputs). This is relevant to edge computing, which we discussed quite a bit in our Neuroscience and Neural Nets lectures. Suppose you’ve trained a DNN to classify certain data, but now you get new data while in the field and want to update your network in real time. This is NOT an easy task with the current state of the art - in fact, a lot of dollars are going into trying to solve this problem. Our brains, however, have already solved it. These neurons are inspired by our brains. So let’s learn in real time with these neurons.
First, here’s our model without learning - i.e. Nengo computes all of the weights for the connections in advance. In this case, we have two ensembles acting as a communication channel (we built a simple version of this in `neuron_transformations.ipynb`). In a comms channel, you want the output of the first ensemble fed into the second, and you want the second ensemble’s output to match that of the original input.
[Input] —> (A) —> (B) —> [Output]
To demonstrate online learning, we add a function between the two ensembles (i.e. the output of the second ensemble is a function of the first - which is why we have weights: to compute that f(x)). NOTE: in a comms channel, our f(x) = x. BUT we want to mess that up so that we have to relearn the weights. SO, we add a weird function in between that we will train away by the end of this demo.
Notice both ensembles are 2D - meaning we send two white noise signals into the two input dimensions of the first ensemble, and we get a two-dimensional signal out. THEN, we compute a bizarre function where we just return two random numbers out of the second ensemble. To recap: we send two white noise signals into ensemble A, connect it to ensemble B, and B outputs two random values using pretrained Nengo decoders.
model = nengo.Network(label="Pretrained Comms Channel")
with model:
# input signal, passed from node to ensemble A
input = nengo.Node(WhiteSignal(60, high=5), size_out=2)
A = nengo.Ensemble(60, dimensions=2)
nengo.Connection(input, A)
# passed output of ensemble A to ensemble B
# train weights with a complicated function (send in a value, return something random)
B = nengo.Ensemble(60, dimensions=2)
conn = nengo.Connection(A, B, function=lambda x: np.random.random(2))
inp_p = nengo.Probe(input)
pre_p = nengo.Probe(A, synapse=0.01)
post_p = nengo.Probe(B, synapse=0.01)
with nengo.Simulator(model) as sim:
sim.run(10.0)
plt.figure(figsize=(12, 8))
plt.subplot(2, 1, 1)
plt.plot(sim.trange(), sim.data[inp_p].T[0], c="k", label="Input")
plt.plot(sim.trange(), sim.data[pre_p].T[0], c="b", label="Pre")
plt.plot(sim.trange(), sim.data[post_p].T[0], c="r", label="Post")
plt.ylabel("Dimension 1")
plt.legend(loc="best")
plt.subplot(2, 1, 2)
plt.plot(sim.trange(), sim.data[inp_p].T[1], c="k", label="Input")
plt.plot(sim.trange(), sim.data[pre_p].T[1], c="b", label="Pre")
plt.plot(sim.trange(), sim.data[post_p].T[1], c="r", label="Post")
plt.ylabel("Dimension 2")
plt.legend(loc="best")
Add in Learning#
Now, we will add an ensemble to compute our error. We know that with a proper comms channel, the output of ensemble B should match that of ensemble A. The error is therefore the difference between the two, and we will use an ensemble to compute it. The error ensemble will subtract ensemble A’s output from ensemble B’s output using the `transform=-1` argument on the A connection (i.e. error \(= B - A =\) how far off we are from where we want to be).
We will then employ a learning rule on our connection between A and B, which we called `conn` in the last segment of code. The learning rule we use is called the prescribed error sensitivity (PES) learning rule. The PES learning rule is a biologically plausible supervised learning rule that is frequently used with the Neural Engineering Framework (NEF) – this is Nengo’s framework! PES modifies the connection weights between populations of neurons to minimize an external error signal (here, the error ensemble we’ve created). This page describes it succinctly in the “How does this work?” section if you want the math behind how the decoders are updated.
This means we compute an error and use that in our learning rule between ensembles A and B to push the weights toward a solution where \(B-A=0\), i.e. \(A=B\).
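For reference, the PES decoder update has a compact form (as described in the Nengo documentation, up to internal scaling conventions):
\[
\Delta d_i = -\kappa \, E \, a_i
\]
where \(\kappa\) is the learning rate, \(E\) is the error signal from our error ensemble, and \(a_i\) is the filtered activity of neuron \(i\). In words: each neuron’s decoders move opposite the error, in proportion to how active that neuron is.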
with model:
error = nengo.Ensemble(60, dimensions=2)
error_probe = nengo.Probe(error, synapse=0.03)
# Error = actual - target = post - pre
nengo.Connection(B, error)
connA = nengo.Connection(A, error, transform=-1)
# Add the learning rule to the connection
conn.learning_rule_type = nengo.PES()
# Connect the error into the learning rule
nengo.Connection(error, conn.learning_rule)
# probe the decoders to see how those change over time to reduce error
weights_p = nengo.Probe(conn, "weights", synapse=0.01, sample_every=0.01)
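One knob worth knowing about: `nengo.PES` accepts a `learning_rate` argument (Nengo’s default is 1e-4). A minimal sketch of the variation - swap this into the assignment above if you want slower, smoother learning:
# Variation (sketch): the PES learning rate is configurable; Nengo's
# default is 1e-4. Smaller rates track less noise but converge slower.
slow_pes = nengo.PES(learning_rate=1e-5)
# ...then, inside the model: conn.learning_rule_type = slow_pes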
Run it, plot it.#
with nengo.Simulator(model) as sim:
sim.run(10.0)
plt.figure(figsize=(12, 12))
plt.subplot(4, 1, 1)
plt.plot(sim.trange(), sim.data[inp_p].T[0], c="k", label="Input")
plt.plot(sim.trange(), sim.data[pre_p].T[0], c="b", label="Pre")
plt.plot(sim.trange(), sim.data[post_p].T[0], c="r", label="Post")
plt.ylabel("Dimension 1")
plt.legend(loc="best")
plt.subplot(4, 1, 2)
plt.plot(sim.trange(), sim.data[inp_p].T[1], c="k", label="Input")
plt.plot(sim.trange(), sim.data[pre_p].T[1], c="b", label="Pre")
plt.plot(sim.trange(), sim.data[post_p].T[1], c="r", label="Post")
plt.ylabel("Dimension 2")
plt.legend(loc="best")
plt.subplot(4, 1, 3)
plt.plot(sim.trange(), sim.data[error_probe], c="b")
plt.ylim(-1, 1)
plt.legend(("Error[0]", "Error[1]"), loc="best")
plt.subplot(4, 1, 4)
plt.plot(sim.trange(sample_every=0.01), sim.data[weights_p][..., 10])
plt.ylabel("Decoding weight")
plt.legend(("Decoder 10[0]", "Decoder 10[1]"), loc="best")
Discussion#
We can see here that toward the end of our simulation, both dimensions start to look the same. We can see from the third plot that the error in each dimension is reduced as we go. Once the error is reduced to near zero, the weights should keep working regardless of the input to the network (i.e. if we keep those decoders between the two ensembles, we can feed in any input and the output should match it). How? Decoders! You can see in the fourth plot how the decoders change and then converge to stable values over time. That means that instead of computing that nasty random output function, the weights have adjusted to take the same spikes and compute \(f(x)=x\). If you’re lost, please work back through `neuron_transformations.ipynb`.
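To back the plots with numbers, you can compare the error magnitude early and late in the run (a quick sketch using the probes defined above):
# Quick numeric check of the learning trend: mean |error| over the first
# second vs. the last second of the simulation should drop noticeably.
t = sim.trange()
early = np.abs(sim.data[error_probe][t < 1.0]).mean()
late = np.abs(sim.data[error_probe][t > 9.0]).mean()
print(f"mean |error|: first second {early:.3f}, last second {late:.3f}")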
Matrix Multiplication using Ensemble Arrays (On your own)#
This example demonstrates how to perform general matrix multiplication using Nengo. The matrix can change during the computation, which makes it distinct from doing static matrix multiplication with neural connection weights (as done in all neural networks).
Let’s compute \(A \cdot B\), which is equal to \((B^T \cdot A^T)^T\), where
\(A = \begin{bmatrix} .5 & -.5 \\ -.2 & .3 \end{bmatrix}\) and \(B = \begin{bmatrix} .58 & -1 \\ .7 & .1 \end{bmatrix}\)
Big differences here:
We are using an EnsembleArray (i.e. an array of ensembles of neurons) instead of a single Ensemble of neurons
We specifically probe the `output` node of each EnsembleArray since we need the decoded output of each ensemble (not the array object itself). This results in 4 outputs per array since each array contains 4 ensembles of neurons
Amat = np.array([[0.5, -0.5], [-0.2, 0.3]])
Bmat = np.array([[0.58, -1.0], [0.7, 0.1]])
model = nengo.Network(label="Matrix Multiplication", seed=123)
with model:
# Make 2 EnsembleArrays to encode the input
# Amat.size and Bmat.size = 4, therefore each array holds 4 one-dimensional ensembles
# Values should stay within the range (-radius, radius)
A = nengo.networks.EnsembleArray(n_neurons = 100, n_ensembles=Amat.size, radius=1)
B = nengo.networks.EnsembleArray(n_neurons = 100, n_ensembles=Bmat.size, radius=1)
# Create 2 nodes to send in matrix values
inputA = nengo.Node(Amat.ravel()) # ravel is equivalent to reshape(-1) i.e. flatten!
inputB = nengo.Node(Bmat.ravel())
# connect input nodes to neuron ensembles
nengo.Connection(inputA, A.input)
nengo.Connection(inputB, B.input)
# Probe to check we've connected correctly
# Note: must probe the output of each ensemble because
# we are using an array of ensembles!
#
# sample_every determines how many timesteps you plot
# in this case, it's fewer times than the simulation would plot
# default simulation dt = .001
A_probe = nengo.Probe(A.output, sample_every=0.01, synapse=0.01)
B_probe = nengo.Probe(B.output, sample_every=0.01, synapse=0.01)
Run and Plot
with nengo.Simulator(model) as sim:
sim.run(1)
plt.figure()
plt.subplot(1, 2, 1)
plt.title("A")
plt.plot(sim.trange(sample_every=0.01), sim.data[A_probe])
plt.legend(['1','2','3','4'], loc="best")
plt.subplot(1, 2, 2)
plt.title("B")
plt.plot(sim.trange(sample_every=0.01), sim.data[B_probe])
plt.legend(['1','2','3','4'], loc="best")
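Optionally, compare the settled decoded values to the matrix entries directly (a quick sanity check):
# The inputs are constant, so once the synapse settles, the last decoded
# sample should sit near the flattened matrix entries.
print("A decoded:", sim.data[A_probe][-1], " target:", Amat.ravel())
print("B decoded:", sim.data[B_probe][-1], " target:", Bmat.ravel())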
Inputs look good, but now we need to actually perform each multiplication that occurs in matrix multiplication.
First, we will use one of Nengo’s reusable networks, `Product`, instead of manually creating multiplication neurons as we did in `neuron_transformations.ipynb` (although we could definitely do it that way!). The `Product` network computes the element-wise product of two equally sized vectors, where `input_a` is the first vector and `input_b` is the second. Read more about different reusable networks here.
You’ll notice we pass an argument called `transform`. It’s way easier to use a transform than a function when it comes to matrix multiplication!
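As a quick reminder of what `transform` does, here is a standalone numpy sketch (not part of the model): a connection with `transform=T` delivers \(y = Tx\) to its target.
# Standalone illustration: a Connection with transform=T decodes x from
# the pre ensemble and delivers y = T @ x to the post ensemble.
T = np.array([[1.0, 0.0],
              [0.0, 1.0],
              [1.0, 1.0]])  # 2D input -> 3D output; last row sums both inputs
x = np.array([0.3, -0.2])
print(T @ x)  # [ 0.3 -0.2  0.1]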
with model:
# The C matrix is composed of populations that each contain
# one element of A and one element of B.
# These elements will be multiplied together in the next step.
# For two 2x2 matrices to be multiplied together, there will be
# 8 total multiplications that will occur (think through this!)
c_size = Amat.size * Bmat.shape[1] # 4*2 = 8
C = nengo.networks.Product(100, dimensions=c_size)
# Determine the transformation matrices to get the correct pairwise
# products computed.
transformA = np.zeros((c_size, Amat.size))
transformB = np.zeros((c_size, Bmat.size))
for i in range(Amat.shape[0]):
for j in range(Amat.shape[1]):
for k in range(Bmat.shape[1]):
c_index = j + k * Amat.shape[1] + i * Bmat.size
transformA[c_index][j + i * Amat.shape[1]] = 1
transformB[c_index][k + j * Bmat.shape[1]] = 1
print("A->C")
print(transformA)
print("B->C")
print(transformB)
with model:
conn = nengo.Connection(A.output, C.input_a, transform=transformA)
nengo.Connection(B.output, C.input_b, transform=transformB)
C_probe = nengo.Probe(C.output, sample_every=0.01, synapse=0.01)
We printed the weights. Recall, we have two 2x2 matrices. Each matrix was flattened into an array of 4 values (indices 0 to 3). If there is a `1` in the transform matrix, that means the element is “active” because it is weighted 1 instead of 0. So, for the first multiplication that must be performed, we have \(a_0\) multiplied by \(b_0\).
If you take a look at each transform matrix row by row, they line up to perform each multiplication required when two 2x2 matrices are multiplied.
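You can verify this pairing without any neurons - applying the transforms to the flattened matrices in plain numpy yields the 8 pairwise products the `Product` network is about to compute:
# Plain-numpy check of the pairing: select elements with the transforms,
# then multiply elementwise. The first entry is a0*b0 = 0.5 * 0.58 = 0.29.
pairs = (transformA @ Amat.ravel()) * (transformB @ Bmat.ravel())
print(pairs)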
Now, let’s view the outputs of our 8-dimensional network C (created from Nengo’s pre-built `Product` network). This will show each of the individual multiplications. Note: no addition has occurred yet!
# Run it!
with nengo.Simulator(model) as sim:
sim.run(1)
# Plot it!
plt.figure()
plt.plot(sim.trange(sample_every=0.01), sim.data[C_probe])
plt.title("C")
plt.legend(['1','2','3','4','5','6','7','8'], loc="best")
Now we have to add the appropriate elements together to complete the matrix multiplication. We use the same `transform` argument (i.e. a matrix of connection weights) to choose which of the outputs from C should be added together.
with model:
# Now do the appropriate summing
D = nengo.networks.EnsembleArray(
100, n_ensembles=Amat.shape[0] * Bmat.shape[1], radius=1
) # sanity check that this is the right size for any matrix multiplication!
# The mapping for this transformation is much easier, since we want to
# combine pairs from C within D.
transformC = np.zeros((D.dimensions, c_size))
for i in range(c_size):
transformC[i // Bmat.shape[0]][i] = 1
print("C->D")
print(transformC)
with model:
nengo.Connection(C.output, D.input, transform=transformC)
D_probe = nengo.Probe(D.output, sample_every=0.01, synapse=0.01)
We have 8 outputs from C. We add two at a time to compute the four outputs of our two 2x2 matrices being multiplied together.
Recall: with the flattened indices from above, \(A \cdot B = \begin{bmatrix} a_0 b_0 + a_1 b_2 & a_0 b_1 + a_1 b_3 \\ a_2 b_0 + a_3 b_2 & a_2 b_1 + a_3 b_3 \end{bmatrix}\).
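You can also confirm the bookkeeping offline: chaining both transforms in plain numpy reproduces `np.dot(Amat, Bmat)` (a sanity check, not part of the neural model):
# Offline check: pairwise products followed by the summing transform
# reproduce ordinary matrix multiplication, flattened row-major.
pairs = (transformA @ Amat.ravel()) * (transformB @ Bmat.ravel())
print((transformC @ pairs).reshape(Amat.shape[0], Bmat.shape[1]))
print(np.dot(Amat, Bmat))  # the two should match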
Now, let’s run it and see our final answer!
# Run it!
with nengo.Simulator(model) as sim:
sim.run(1)
# Plot it!
plt.figure()
plt.plot(sim.trange(sample_every=0.01), sim.data[D_probe])
for d in np.dot(Amat, Bmat).flatten():
plt.axhline(d, color="k")
plt.title("D")
Discussion#
We could have written matrix multiplication manually using our own multiplication function from last time. That may have made it more intuitive, and we could have possibly found a faster way to do it!
BUT, now you have experience using a pre-built Nengo network and a generalized form of matrix multiplication, which may come in handy for your final project.