# Standard library imports
from collections import defaultdict
from itertools import product, combinations
import math
from typing import List, Optional, Union
import os
import gc
from functools import reduce
# Third-party package imports
import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.express as px
import scipy as sp
import seaborn as sns
import torch
from IPython.display import display
from scipy.sparse import csr_matrix, issparse, csc_matrix, coo_matrix, spmatrix, vstack
from tqdm import tqdm
from .utils import (
dynamic_representation,
tensor_to_csc,
to_nparray,
torch_sparse_where,
arrayable,
scipy_sparse_to_pytorch,
)
from .path_finding import (
find_paths_of_length,
group_paths,
remove_excess_neurons,
filter_paths,
)
[docs]
def compress_paths(
A: spmatrix,
step_number: int,
threshold: float = 0,
output_threshold: float = 1e-4,
root: bool = False,
chunkSize=2000,
device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
save_to_disk: bool = False,
save_path: str = "./",
save_prefix: str = "step_",
return_results: bool = True,
high_cpu_ram: bool = True,
output_dtype: np.dtype = np.float32,
density_threshold: float = 0.2, # Save as dense if density exceeds this
):
"""
Computes A^0 to A^n by chunking the computation to save memory.
Results are thresholded and returned as a list of sparse scipy matrices or numpy
arrays (if above density_threshold). If it's too slow, try changing chunkSize.
Args:
A (scipy.sparse.matrix): The connectivity matrix as a scipy
sparse matrix.
step_number (int): Power to raise A to.
threshold (float): Threshold to apply to the matrix after each multiplication.
If 0, no thresholding is applied.
output_threshold (float): Threshold to apply to the output (>=), but not during
matrix multiplication.
root (bool): If True, take the n-th root of the result in output.
chunkSize (int): Size of the chunks to process at a time. This determines
the memory usage (on GPU if available). It needs < 3 times the size of the
dense chunk which is `A.shape[0] * chunkSize * float32 / 8 bit per byte`
bytes.
device (torch.device): Device to use for computation. Uses GPU if available.
save_to_disk (bool): If True, save the results to disk.
save_path (str): Path to save the results.
save_prefix (str): Prefix to use for the saved files.
return_results (bool, optional): Whether to return the results as a list
of sparse matrices. Defaults to True. If False, returns an empty list.
high_cpu_ram (bool): if high_cpu_ram, keep all the resulting chunked sparse
matrices in memory, before combining and writing to disk altogether. if
False, write each chunk to disk to a temporary directory as soon as it is
computed, and combine them at the end.
output_dtype (np.dtype): Data type to use for the output matrices. Defaults
to np.float32.
density_threshold (float): If the density of the matrix exceeds this threshold,
the matrix is saved as dense. Defaults to 0.2.
Returns:
List[scipy.sparse.csr_matrix or numpy.ndarray]:
List of matrices representing A^0 to A^n.
"""
assert A.shape[0] == A.shape[1], "Matrix A must be square"
assert step_number > 0, "Power n must be positive"
# Turn A into a torch sparse tensor
A = scipy_sparse_to_pytorch(A).to(device)
matrix_size = A.shape[0]
num_chunks = (matrix_size + chunkSize - 1) // chunkSize # Ceiling division
# Create save directory if needed
if save_to_disk and not os.path.exists(save_path):
os.makedirs(save_path)
# Prepare temporary storage method
if high_cpu_ram:
# Initialize dictionaries to collect COO data
rows: dict[int, list[npt.NDArray]] = {idx: [] for idx in range(step_number)}
cols: dict[int, list[npt.NDArray]] = {idx: [] for idx in range(step_number)}
data: dict[int, list[npt.NDArray]] = {idx: [] for idx in range(step_number)}
else:
# Create temporary directory for chunks
temp_dir = os.path.join(os.getcwd(), "temp_chunks")
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# Process each chunk
for chunk_idx in tqdm(range(num_chunks)):
# Define the range for this chunk
start_col = chunk_idx * chunkSize
end_col = min((chunk_idx + 1) * chunkSize, matrix_size)
current_chunk_size = end_col - start_col
for power in range(step_number):
# Compute the result for this chunk and power
if power == 0:
# For A^1, extract the relevant columns from A
indices = A._indices()
values = A._values()
# Filter to include only columns in current chunk
col_mask = (indices[1, :] >= start_col) & (indices[1, :] < end_col)
result_indices = indices[:, col_mask].clone()
result_values = values[col_mask].clone()
# Adjust column indices to be relative to start_col
result_indices[1, :] -= start_col
# Create sparse tensor for this column chunk
result = torch.sparse_coo_tensor(
result_indices,
result_values,
(matrix_size, current_chunk_size),
device=device,
).to_dense()
if threshold > 0:
result = torch.where(
torch.abs(result) < threshold,
torch.tensor(0.0).to(device),
result,
)
else:
# Matrix multiplication for higher powers
result = torch.sparse.mm(A, result)
if threshold > 0:
result = torch.where(
torch.abs(result) < threshold,
torch.tensor(0.0).to(device),
result,
)
# Force garbage collection
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Apply root if requested
if root:
result_root = torch.sign(result) * torch.abs(result) ** (
1 / (power + 1)
)
# Apply output threshold and collect non-zero entries
chunk_rows, chunk_cols = torch.nonzero(
torch.abs(result) > output_threshold, as_tuple=True
)
# Get values based on whether root was applied
chunk_data: npt.NDArray
if root:
chunk_data = (
result_root[chunk_rows, chunk_cols]
.cpu()
.numpy()
.astype(output_dtype)
)
del result_root
else:
chunk_data = (
result[chunk_rows, chunk_cols].cpu().numpy().astype(output_dtype)
)
torch.cuda.empty_cache()
# Convert to CPU
chunk_cols: npt.NDArray = chunk_cols.cpu().numpy()
chunk_rows: npt.NDArray = chunk_rows.cpu().numpy()
# Store the data based on RAM usage preference
if high_cpu_ram:
# Adjust column indices to global coordinates
global_cols = chunk_cols + start_col
# Store in memory
rows[power].append(chunk_rows)
cols[power].append(global_cols)
data[power].append(chunk_data)
else:
# Save chunk to disk
sparse_chunk = sp.sparse.coo_matrix(
(chunk_data, (chunk_rows, chunk_cols)),
shape=(matrix_size, current_chunk_size),
)
sparse_chunk.eliminate_zeros()
sp.sparse.save_npz(
os.path.join(temp_dir, f"step_{power}_chunk_{chunk_idx}.npz"),
sparse_chunk,
)
# Clear memory
del result, chunk_rows, chunk_cols, chunk_data
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Combine chunks and save results
print("Combining data from all chunks")
results = []
for power in tqdm(range(step_number)):
# Get all data for current power
if high_cpu_ram:
# Combine data from memory
if rows[power]: # Check if data was collected
all_rows = np.concatenate(rows[power])
all_cols = np.concatenate(cols[power])
all_data = np.concatenate(data[power])
else:
all_rows = np.array([])
all_cols = np.array([])
all_data = np.array([])
else:
# Combine data from disk
all_rows, all_cols, all_data = [], [], []
for chunk_idx in range(num_chunks):
# Load chunk from disk
sparse_chunk = sp.sparse.load_npz(
os.path.join(temp_dir, f"step_{power}_chunk_{chunk_idx}.npz")
)
# Adjust column indices
sparse_chunk.col += chunk_idx * chunkSize
# Append data
all_rows.append(sparse_chunk.row)
all_cols.append(sparse_chunk.col)
all_data.append(sparse_chunk.data)
# Clean up
del sparse_chunk
gc.collect()
# Combine all chunks
all_rows = np.concatenate(all_rows)
all_cols = np.concatenate(all_cols)
all_data = np.concatenate(all_data)
# Calculate density from the data directly
nnz = len(all_data)
density = nnz / (matrix_size**2)
print(
f"Matrix {power} has {nnz} non-zero elements ({density*100:.6f}% of full matrix)"
)
# Choose the appropriate format based on density
if density > density_threshold:
print(
f"Creating matrix {power} in dense format (density: {density*100:.6f}%)"
)
# Create dense matrix directly instead of converting from sparse
dense_result = np.zeros((matrix_size, matrix_size), dtype=output_dtype)
dense_result[all_rows, all_cols] = all_data
# Save to disk if requested
if save_to_disk:
np.save(
os.path.join(save_path, f"{save_prefix}{power}.npy"), dense_result
)
# Add to results if requested
if return_results:
results.append(dense_result)
# Clean up
del dense_result
else:
# Create sparse matrix
sparse_result = sp.sparse.coo_matrix(
(all_data, (all_rows, all_cols)), shape=(matrix_size, matrix_size)
).tocsc()
sparse_result.eliminate_zeros()
# Save sparse matrix if requested
if save_to_disk:
sp.sparse.save_npz(
os.path.join(save_path, f"{save_prefix}{power}.npz"), sparse_result
)
# Add to results if requested
if return_results:
results.append(sparse_result)
# Clean up
del sparse_result
# Clean up
del all_rows, all_cols, all_data
gc.collect()
# Clean up temporary files if using disk storage
if not high_cpu_ram:
for power in range(step_number):
for chunk_idx in range(num_chunks):
os.remove(os.path.join(temp_dir, f"step_{power}_chunk_{chunk_idx}.npz"))
os.rmdir(temp_dir)
# Final cleanup
torch.cuda.empty_cache()
gc.collect()
return results
[docs]
def compress_paths_dense_chunked(
inprop: csc_matrix,
step_number: int,
threshold: float = 0,
output_threshold: float = 1e-4,
root: bool = False,
chunkSize: int = 10000,
dtype: torch.dtype = torch.float32,
device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
save_to_disk: bool = False,
save_path: str = "./",
save_prefix: str = "step_",
) -> list:
"""Performs iterative multiplication of a sparse matrix `inprop` for a specified
number of steps, applying thresholding to filter out values below a certain
`threshold` to optimize memory usage and computation speed.
The function is optimized to run on GPU if available. It needs >=
size_of_inprop.to_dense() * 3 amount of GPU memory, for matrix multiplication, and
thresholding.
This function multiplies the connectivity matrix (input in rows; output in columns)
`inprop` with itself `step_number` times, with each step's result being thresholded
to zero out elements below a given threshold. The function stores each step's result
in a list, where each result is further processed to drop values below the
`output_threshold` to save memory.
Args:
inprop (scipy.sparse.matrix): The connectivity matrix as a scipy
sparse matrix.
step_number (int): The number of iterations to perform the matrix
multiplication.
threshold (float, optional): The threshold value to apply after each
multiplication. Values below this threshold are set to zero.
Defaults to 0.
output_threshold (float, optional): The threshold value to apply to
the final output, used to reduce output size. Defaults to 1e-4.
root (bool, optional): Whether to take the nth root of the output.
This can be understood as "the average direct connection strength"
(when root=True), as opposed to "the proportion of influence among
all partners n steps away" (when root=False). Defaults to False.
chunkSize (int, optional): The size of the chunks to split the matrix
into for matrix multiplication. Defaults to 10000.
dtype (torch.dtype, optional): The data type to use for the tensor
calculations. Defaults to torch.float32.
device (torch.device, optional): The device to use for the tensor
calculations. Defaults to torch.device("cuda" if
torch.cuda.is_available() else "cpu").
save_to_disk (bool, optional): Whether to save the output matrices to
disk. Defaults to False.
save_path (str, optional): The path to save the output matrices to.
Defaults to "./" (the current folder).
save_prefix (str, optional): The prefix to use for the output matrix
filenames. Defaults to ``"step_"``.
Returns:
list:
A list of scipy.sparse.csc_matrix objects, each representing connectivity
from all neurons to all neurons n steps away.
Note:
This function requires PyTorch and is designed to automatically
utilize CUDA-enabled GPU devices if available to accelerate
computations. The input matrix `inprop` is converted to a dense tensor
before processing.
Example:
>>> from scipy.sparse import csc_matrix
>>> import numpy as np
>>> inprop = csc_matrix(np.array([[0.1, 0.2], [0.3, 0.4]]))
>>> step_number = 2
>>> compressed_paths = compress_paths(inprop, step_number,
threshold=0.1,
output_threshold=0.01)
>>> print(compressed_paths)
"""
steps_fast: List[csc_matrix] = []
if not isinstance(inprop, csc_matrix):
inprop = inprop.tocsc()
# check that step_number>0
if step_number < 1:
raise ValueError("step_number should be greater than 0")
size = inprop.shape[0]
chunks = math.ceil(size / chunkSize)
with torch.no_grad():
for i in tqdm(range(step_number)):
if i == 0:
out_tensor = torch.tensor(inprop.toarray(), dtype=dtype)
else:
out_tensor_new = torch.zeros(size, size, dtype=dtype)
colLow = 0
colHigh = chunkSize
for colChunk in range(chunks): # iterate chunks colwise
rowLow = 0
rowHigh = chunkSize
in_col = torch.tensor(
inprop[:, colLow:colHigh].toarray(), dtype=dtype
).to(device)
# shape: size x chunkSize; on GPU
for rowChunk in range(chunks): # iterate chunks rowwise
in_rows = out_tensor[rowLow:rowHigh, :].to(device)
# shape: chunkSize x size; on GPU
out_tensor_new[rowLow:rowHigh, colLow:colHigh] = torch.matmul(
in_rows, in_col
).to("cpu")
# shape: chunkSize x chunkSize; on CPU
rowLow += chunkSize
rowHigh += chunkSize
rowHigh = min(rowHigh, size)
del in_rows
del in_col
torch.cuda.empty_cache()
colLow += chunkSize
colHigh += chunkSize
colHigh = min(colHigh, size)
out_tensor = out_tensor_new
del out_tensor_new
# Clear PyTorch CUDA cache
torch.cuda.empty_cache()
# Thresholding during matmul
if threshold != 0:
out_tensor = torch.where(
out_tensor >= threshold,
out_tensor,
torch.tensor(0.0, dtype=dtype),
)
# Convert to csc for output
out_csc = tensor_to_csc(out_tensor)
out_csc.eliminate_zeros()
if root:
out_csc.data = np.power(out_csc.data, 1 / (i + 1))
if output_threshold > 0:
out_csc.data = np.where(
out_csc.data >= output_threshold, out_csc.data, 0
)
out_csc.eliminate_zeros()
if save_to_disk:
sp.sparse.save_npz(
os.path.join(save_path, f"{save_prefix}{i}.npz"), out_csc
)
else:
steps_fast.append(out_csc)
del out_csc
# remove all variables
del out_tensor
torch.cuda.empty_cache()
return steps_fast
# below: not chunked version
[docs]
def compress_paths_not_chunked(
inprop,
step_number,
threshold=0,
output_threshold=1e-4,
root=False,
device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
):
"""As above, but without chunking.
This would be more demanding for GPU RAM.
"""
steps_fast = []
if device is None:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inprop_tensor = torch.tensor(inprop.toarray(), device=device)
with torch.no_grad():
for i in tqdm(range(step_number)):
if i == 0:
out_tensor = inprop_tensor.clone()
else:
out_tensor = torch.matmul(out_tensor, inprop_tensor)
# Thresholding during matmul
if threshold != 0:
out_tensor = torch.where(
out_tensor >= threshold,
out_tensor,
torch.tensor(0.0, device=device),
)
# Convert to csc for output
out_csc = tensor_to_csc(out_tensor.to("cpu"))
out_csc.eliminate_zeros()
if root:
out_csc.data = np.power(out_csc.data, 1 / (i + 1))
if output_threshold > 0:
out_csc.data = np.where(
out_csc.data >= output_threshold, out_csc.data, 0
)
out_csc.eliminate_zeros()
steps_fast.append(out_csc)
torch.cuda.empty_cache()
return steps_fast
[docs]
def compress_paths_signed(
inprop,
idx_to_sign: dict,
target_layer_number: int,
threshold: float = 0,
output_threshold: float = 1e-4,
root: bool = False,
chunkSize: int = 2000,
save_to_disk: bool = False,
save_path: str = "./",
return_results: bool = True,
):
"""Calculates the excitatory and inhibitory influences across specified layers of a
neural network, using PyTorch for GPU acceleration. Even numbers of inhibition is
treated as excitation.
Args:
inprop (scipy.sparse.csc_matrix): The initial connectivity matrix representing
direct connections between all neurons, shape (n_neurons, n_neurons).
idx_to_sign (dict): A dictionary mapping neuron indices to the sign of output
(1 for excitatory, -1 for inhibitory).
target_layer_number (int): The number of layers through which to calculate
influences. 1 for direct connections, 2 for one step away, etc.
threshold (float, optional): A value to threshold the influences during
calculation; influences below this value are set to zero, and not passed on.
Defaults to 0.
output_threshold (float, optional): A threshold for the final output to reduce
output file size, with values below this threshold set to zero. Defaults to
1e-4.
root (bool, optional): Whether to take the nth root of the output. This can be
understood as "the average direct connection strength" (when root=True), as
opposed to "the proportion of influence among all partners n steps away"
(when root=False). Defaults to False.
chunkSize (int, optional): The size of the chunks to split the matrix into for
matrix multiplication. Defaults to 5000. A chunk is a dense matrix of size
(chunkSize, n_neurons). This determines the memory usage: e.g. 2000 * 160000
* float32 / 8 bytes / 1e9 = 1.28 GB. We need two copies (one excitation, one
inhibition), plus the sparse representations of the (n_neurons, n_neurons)
matrix.
save_to_disk (bool, optional): Whether to save the output matrices to disk.
save_path (str, optional): Path to save the results. Defaults to "./".
return_results (bool, optional): Whether to return the results as a list
of sparse matrices. Defaults to True.
Returns:
Tuple[List[scipy.sparse.csc_matrix], List[scipy.sparse.csc_matrix]]:
Two lists of sparse matrices representing the excitatory and inhibitory
influences, respectively, up to the specified target layer.
"""
# if inprop is not a coo matrix, convert it
if not issparse(inprop):
# raise error
raise ValueError("inprop must be a scipy sparse matrix")
if not isinstance(inprop, coo_matrix):
inprop = inprop.tocoo()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
e_idx = [i for i in range(len(idx_to_sign)) if idx_to_sign[i] == 1]
i_idx = [i for i in range(len(idx_to_sign)) if idx_to_sign[i] == -1]
matrix_size = inprop.shape[0]
num_chunks = (matrix_size + chunkSize - 1) // chunkSize # Ceiling division
# make a temporary directory to save the chunks
temp_dir = os.path.join(os.getcwd(), "temp_chunks")
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# Filter out e/i rows directly in COO format
e_mask = np.isin(inprop.row, e_idx)
i_mask = np.isin(inprop.row, i_idx)
direct_e_sender = sp.sparse.coo_matrix(
(
inprop.data[e_mask],
(inprop.row[e_mask], inprop.col[e_mask]),
),
shape=inprop.shape,
).tocsr()
direct_i_sender = sp.sparse.coo_matrix(
(
inprop.data[i_mask],
(inprop.row[i_mask], inprop.col[i_mask]),
),
shape=inprop.shape,
).tocsr()
if target_layer_number == 1:
return [direct_e_sender.tocsc()], [direct_i_sender.tocsc()]
# shape: (n_neurons, n_neurons)
direct_e_sender_tensor = scipy_sparse_to_pytorch(direct_e_sender, device=device)
# shape: (n_neurons, n_neurons)
direct_i_sender_tensor = scipy_sparse_to_pytorch(direct_i_sender, device=device)
for chunk_idx in tqdm(range(num_chunks)):
# Define the range for this chunk
start_row = chunk_idx * chunkSize
end_row = min((chunk_idx + 1) * chunkSize, matrix_size)
current_chunk_size = end_row - start_row
for layer in range(target_layer_number):
if layer == 0:
# For inprop^1, just take the corresponding rows, and turn into tensor
# shape: (chunkSize, n_neurons)
dense_e = torch.tensor(
direct_e_sender[start_row:end_row, :].toarray(),
dtype=torch.float32,
device=device,
)
dense_i = torch.tensor(
direct_i_sender[start_row:end_row, :].toarray(),
dtype=torch.float32,
device=device,
)
else:
past_dense_e = dense_e.clone()
# E = ee and ii added
# shape: (chunkSize, n_neurons)
dense_e = torch.sparse.mm(
dense_e, direct_e_sender_tensor
) + torch.sparse.mm(dense_i, direct_i_sender_tensor)
# I = E * I + I * E
# shape: (chunkSize, n_neurons)
dense_i = torch.sparse.mm(
past_dense_e, direct_i_sender_tensor
) + torch.sparse.mm(dense_i, direct_e_sender_tensor)
# Apply thresholding
if threshold > 0:
dense_e = torch.where(
dense_e >= threshold,
dense_e,
torch.tensor(0.0, device=device),
)
dense_i = torch.where(
dense_i >= threshold,
dense_i,
torch.tensor(0.0, device=device),
)
# force garbage collection after each multiplication
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Convert to csc for output
chunk_rows_e, chunk_cols_e = torch.nonzero(
torch.abs(dense_e) > output_threshold, as_tuple=True
)
chunk_rows_i, chunk_cols_i = torch.nonzero(
torch.abs(dense_i) > output_threshold, as_tuple=True
)
if root:
chunk_data_e = (
(
torch.sign(dense_e[chunk_rows_e, chunk_cols_e])
* torch.abs(dense_e[chunk_rows_e, chunk_cols_e])
** (1 / (layer + 1))
)
.cpu()
.numpy()
)
chunk_data_i = (
(
torch.sign(dense_i[chunk_rows_i, chunk_cols_i])
* torch.abs(dense_i[chunk_rows_i, chunk_cols_i])
** (1 / (layer + 1))
)
.cpu()
.numpy()
)
else:
chunk_data_e = dense_e[chunk_rows_e, chunk_cols_e].cpu().numpy()
chunk_data_i = dense_i[chunk_rows_i, chunk_cols_i].cpu().numpy()
# save the chunked data to disk as scipy sparse matrices
sparse_e = sp.sparse.coo_matrix(
(
chunk_data_e,
(chunk_rows_e.cpu().numpy(), chunk_cols_e.cpu().numpy()),
),
shape=(current_chunk_size, matrix_size),
)
sparse_i = sp.sparse.coo_matrix(
(
chunk_data_i,
(chunk_rows_i.cpu().numpy(), chunk_cols_i.cpu().numpy()),
),
shape=(current_chunk_size, matrix_size),
)
sparse_e.eliminate_zeros()
sparse_i.eliminate_zeros()
# save the sparse matrices to disk
sp.sparse.save_npz(
os.path.join(temp_dir, f"step_{layer}_e_{chunk_idx}.npz"), sparse_e
)
sp.sparse.save_npz(
os.path.join(temp_dir, f"step_{layer}_i_{chunk_idx}.npz"), sparse_i
)
del sparse_e, sparse_i
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Combine all data -----------------
print("Combining data from all chunks")
stepse = []
stepsi = []
# if save_path isn't already a directory, make it
if save_to_disk and not os.path.exists(save_path):
os.makedirs(save_path)
for layer in tqdm(range(target_layer_number)):
# first combine the es ----
rows = []
cols = []
data = []
for chunk_idx in range(num_chunks):
# load the chunked data from disk
sparse_e = sp.sparse.load_npz(
os.path.join(temp_dir, f"step_{layer}_e_{chunk_idx}.npz")
)
# adjust the row indices to account for the chunk offset
sparse_e.row += chunk_idx * chunkSize
# append the data to the lists
rows.append(sparse_e.row)
cols.append(sparse_e.col)
data.append(sparse_e.data)
# delete the sparse matrix to save memory
del sparse_e
gc.collect()
# combine the data
rows = np.concatenate(rows)
cols = np.concatenate(cols)
data = np.concatenate(data)
# create the sparse matrix
sparse_e = sp.sparse.coo_matrix(
(data, (rows, cols)), shape=(matrix_size, matrix_size)
).tocsc()
sparse_e.eliminate_zeros()
# save the sparse matrix to disk
if save_to_disk:
sp.sparse.save_npz(os.path.join(save_path, f"step_{layer}_e.npz"), sparse_e)
if return_results:
stepse.append(sparse_e)
# delete the sparse matrix to save memory
del sparse_e
gc.collect()
# now combine the is ----
rows = []
cols = []
data = []
for chunk_idx in range(num_chunks):
# load the chunked data from disk
sparse_i = sp.sparse.load_npz(
os.path.join(temp_dir, f"step_{layer}_i_{chunk_idx}.npz")
)
# adjust the row indices to account for the chunk offset
sparse_i.row += chunk_idx * chunkSize
# append the data to the lists
rows.append(sparse_i.row)
cols.append(sparse_i.col)
data.append(sparse_i.data)
# delete the sparse matrix to save memory
del sparse_i
gc.collect()
# combine the data
rows = np.concatenate(rows)
cols = np.concatenate(cols)
data = np.concatenate(data)
# create the sparse matrix
sparse_i = sp.sparse.coo_matrix(
(data, (rows, cols)), shape=(matrix_size, matrix_size)
).tocsc()
sparse_i.eliminate_zeros()
if return_results:
stepsi.append(sparse_i)
# save the sparse matrix to disk
if save_to_disk:
sp.sparse.save_npz(os.path.join(save_path, f"step_{layer}_i.npz"), sparse_i)
# delete the sparse matrix to save memory
del sparse_i
gc.collect()
# remove the temporary directory
for layer in range(target_layer_number):
for chunk_idx in range(num_chunks):
os.remove(os.path.join(temp_dir, f"step_{layer}_e_{chunk_idx}.npz"))
os.remove(os.path.join(temp_dir, f"step_{layer}_i_{chunk_idx}.npz"))
os.rmdir(temp_dir)
# remove all variables
del direct_e_sender, direct_i_sender
torch.cuda.empty_cache()
# clear the cache
gc.collect()
return stepse, stepsi
[docs]
def compress_paths_signed_no_chunking(
inprop,
idx_to_sign,
target_layer_number,
threshold=0,
output_threshold=1e-4,
root=False,
):
"""Calculates the excitatory and inhibitory influences across specified
layers of a neural network, using PyTorch for GPU acceleration. This
function processes a connectivity matrix (where presynaptic neurons are
represented by rows and postsynaptic neurons by columns) to distinguish and
compute the influence of excitatory and inhibitory neurons at each layer.
Args:
inprop (scipy.sparse.csc_matrix): The initial connectivity matrix
representing direct connections between adjacent layers.
idx_to_sign (dict): A dictionary mapping neuron indices to their types
(1 for excitatory, -1 for inhibitory), used to differentiate between
excitatory and inhibitory influences.
target_layer_number (int): The number of layers through which to
calculate influences, starting from the second layer (with the first
layer's influence, the direct connectivity, being defined by `inprop`).
threshold (float, optional): A value to threshold the influences;
influences below this value are set to zero, and not passed on in
future layers. Defaults to 0.
output_threshold (float, optional): A threshold for the final output
to reduce memory usage, with values below this threshold set to zero.
Defaults to 1e-4.
root (bool, optional): Whether to take the nth root of the output.
This can be understood as "the average direct connection strength"
(when root=True), as opposed to "the proportion of influence among all
partners n steps away" (when root=False). Defaults to False.
Returns:
Tuple[List[scipy.sparse.csc_matrix], List[scipy.sparse.csc_matrix]]:
Two lists of sparse matrices representing the excitatory and
inhibitory influences, respectively, up to the specified target
layer.
Note:
This function is ideal with GPU support. Ensure your environment
supports CUDA and that PyTorch is correctly installed.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inprop_tensor = torch.tensor(inprop.toarray(), device=device, dtype=torch.float32)
# Create masks for excitatory and inhibitory neurons
n_neurons = inprop_tensor.shape[0]
excitatory_mask = torch.tensor(
[1 if idx_to_sign[i] == 1 else 0 for i in range(n_neurons)],
dtype=torch.float32,
device=device,
)
inhibitory_mask = torch.tensor(
[1 if idx_to_sign[i] == -1 else 0 for i in range(n_neurons)],
dtype=torch.float32,
device=device,
)
print(
"Number of excitatory neurons: "
+ str(excitatory_mask.unique(return_counts=True)[1][1].cpu().numpy())
)
print(
"Number of inhibitory neurons: "
+ str(inhibitory_mask.unique(return_counts=True)[1][1].cpu().numpy())
)
lne = torch.matmul(torch.diag(excitatory_mask), inprop_tensor)
lni = torch.matmul(torch.diag(inhibitory_mask), inprop_tensor)
lne_initial = lne.clone()
lni_initial = lni.clone()
steps_excitatory = []
steps_inhibitory = []
for layer in tqdm(range(target_layer_number)):
if layer != 0:
# if layer ==0, return the direct excitatory and inhibitory
# connections separately
lne_new = torch.matmul(lne, lne_initial) + torch.matmul(lni, lni_initial)
lni = torch.matmul(lne, lni_initial) + torch.matmul(lni, lne_initial)
# Apply thresholding
if threshold > 0:
lne_new = torch_sparse_where(lne_new, threshold)
lni = torch_sparse_where(lni, threshold)
# Dynamic representation based on density
lne = dynamic_representation(lne_new)
lni = dynamic_representation(lni)
torch.cuda.empty_cache()
# possible alternative implementation: first root and threshold, then
# move to CPU. But not sure if can root sparse tensor. So:
stepe_csc = tensor_to_csc(lne.to("cpu"))
stepi_csc = tensor_to_csc(lni.to("cpu"))
if root:
stepe_csc.data = np.power(stepe_csc.data, 1 / (layer + 1))
stepi_csc.data = np.power(stepi_csc.data, 1 / (layer + 1))
# then threshold
if output_threshold > 0:
stepe_csc.data = np.where(
stepe_csc.data >= output_threshold, stepe_csc.data, 0
)
stepi_csc.data = np.where(
stepi_csc.data >= output_threshold, stepi_csc.data, 0
)
stepe_csc.eliminate_zeros()
stepi_csc.eliminate_zeros()
steps_excitatory.append(stepe_csc)
steps_inhibitory.append(stepi_csc)
torch.cuda.empty_cache()
return steps_excitatory, steps_inhibitory
[docs]
def add_first_n_matrices(matrices, n):
"""Adds the first N connectivity matrices from a list, supporting different
path lengths. This function is designed to work with scipy sparse matrices,
and dense numpy matrices. Each matrix in the list represents connectivity
information for a specific path length.
Args:
matrices (list): A list of connectivity matrices of different path
lengths. The matrices can be scipy sparse matrices, or dense numpy
arrays. The function expects all matrices in the list to be of the
same type and shape.
n (int): The number of initial matrices in the list to be summed. This
number must not exceed the length of the matrices list.
Returns:
matrix:
The resulting matrix after summing the first N matrices. The type of the
returned matrix matches the type of the input matrices (scipy sparse matrix,
or numpy array).
Raises:
ValueError: If the list of matrices is empty or if n is larger than
the number of matrices available in the list.
Example:
>>> from scipy.sparse import csc_matrix
>>> matrices = [csc_matrix([[1, 2], [3, 4]]),
csc_matrix([[5, 6], [7, 8]]),
csc_matrix([[9, 10], [11, 12]])]
>>> n = 2
>>> result_matrix = add_first_n_matrices(matrices, n)
>>> print(result_matrix.toarray())
[[ 6 8]
[10 12]]
Note:
Ensure that all matrices in the list are of compatible types and
shapes before using this function.
"""
if not matrices:
raise ValueError("The list of matrices is empty")
if n > len(matrices):
raise ValueError("n is larger than the number of matrices available")
sum_matrix = matrices[0].copy()
for i in range(1, n):
sum_matrix += matrices[i]
return sum_matrix
[docs]
def connectivity_summary(
stepsn: Union[spmatrix, np.ndarray],
inidx: arrayable,
outidx: arrayable,
inidx_map: dict | None = None,
outidx_map: dict | None = None,
display_output: bool = True,
display_threshold: float = 1e-3,
threshold_axis: str = "row",
sort_within: str = "column",
sort_names: str | List | None = None,
pre_in_column: bool = False,
include_undefined_groups: bool = False,
outprop: bool = False,
combining_method: str = "mean",
return_long: bool = False,
):
"""
Generates a summary of connectivity from `inidx` to `outidx`, grouped by `inidx_map`
and `outidx_map`, respectively. By default, it displays the total input across
single cells of the same type, to an average post-synaptic cell.
Args:
stepsn (scipy.sparse matrix or numpy.ndarray): Matrix representing the synaptic
strengths between neurons, can be dense or sparse. Pres are in the rows.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of indices
representing the input (presynaptic) neurons, used to subset stepsn. nan
values are removed.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of
indices representing the output (postsynaptic) neurons.
inidx_map (dict, optional): Mapping from indices to neuron groups for the input
neurons. Defaults to None, in which case neurons are not grouped.
outidx_map (dict, optional): Mapping from indices to neuron groups for the
output neurons. Defaults to None, in which case it is set to be the same as
inidx_map.
display_output (bool, optional): Whether to display the output in a coloured
dataframe. Defaults to True.
display_threshold (float, optional): The minimum threshold for displaying the
output. Defaults to 0.
threshold_axis (str, optional): The axis to apply the display_threshold to.
Defaults to 'row' (removing entire rows if no value exceeds
display_threshold).
sort_within (str, optional): The axis to sort the output in. Defaults to
'column'.
sort_names (str or list, optional): the column/row name(s) to sort the result by.
If none is provided, then sort by the first column/row.
pre_in_column (bool, optional): Whether to have the presynaptic neuron groups as
columns. Defaults to False (pre in rows, post: columns).
include_undefined_groups (bool, optional): Whether to include undefined groups
in the output. Defaults to False.
outprop (bool, optional): If True, get the summed output proportion (across
recipient single cells in the same cell type) for each average sender. If
False (default), get the summed input proportion across all senders for
each average recipient.
combining_method (str, optional): Method to combine inputs (outprop=False)
or outputs (outprop=True). Can be 'sum', 'mean', or 'median'.
Defaults to 'mean'.
return_long (bool, optional): Whether to return the connectivity summary in long
format (in which case display no longer works). Defaults to False.
Returns:
pd.DataFrame:
A dataframe representing the summed synaptic input from presynaptic neuron
groups to an average neuron in each postsynaptic neuron group. This
dataframe is always returned, regardless of the value of display_output.
Displays:
If display_output is True, the function will display a styled version
of the resulting dataframe.
"""
# ---------------------------------------------------------------------#
# Sanity checks & defaults
# ---------------------------------------------------------------------#
assert combining_method in {"mean", "median", "sum"}
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# make sure inidx and outidx only contain integers
inidx = np.array([int(x) for x in inidx])
outidx = np.array([int(x) for x in outidx])
if inidx_map is None:
inidx_map = {idx: idx for idx in range(stepsn.shape[0])}
if outidx_map is None:
outidx_map = inidx_map
if include_undefined_groups:
inidx_map = {
k: (v if pd.notna(v) else "undefined") for k, v in inidx_map.items()
}
outidx_map = {
k: (v if pd.notna(v) else "undefined") for k, v in outidx_map.items()
}
# ---------------------------------------------------------------------#
# Ensure sparse and slice
# ---------------------------------------------------------------------#
if issparse(stepsn):
submat = stepsn.tocsc()[inidx, :][:, outidx].tocoo()
# map sub-indices back to original ids
# submat.row is the local indices of inidx
pre_ids = inidx[submat.row]
post_ids = outidx[submat.col]
weights = submat.data
edgelist = pd.DataFrame(
{
"pre_id": pre_ids,
"post_id": post_ids,
"weight": weights,
}
)
else:
submat = stepsn[inidx, :][:, outidx]
edgelist = pd.DataFrame(
{
"pre_id": np.repeat(inidx, submat.shape[1]),
"post_id": np.tile(outidx, submat.shape[0]),
"weight": submat.flatten(),
}
)
edgelist["pre_group"] = edgelist["pre_id"].map(inidx_map).astype(str)
edgelist["post_group"] = edgelist["post_id"].map(outidx_map).astype(str)
pre_group_df = pd.DataFrame(
{"pre_id": list(inidx_map.keys()), "pre_group": list(inidx_map.values())}
)
pre_group_df["pre_group"] = pre_group_df["pre_group"].astype(str)
post_group_df = pd.DataFrame(
{"post_id": list(outidx_map.keys()), "post_group": list(outidx_map.values())}
)
post_group_df["post_group"] = post_group_df["post_group"].astype(str)
# unique (pre_group, post_group) pairs that exist in the data
pregrp_postgrp = edgelist[["pre_group", "post_group"]].drop_duplicates()
# ---------------------------------------------------------------------#
# Aggregate according to outprop / combining_method
# ---------------------------------------------------------------------#
if not outprop:
# INPUT-centred: average neuron in *post* group
# total input from that type, for the e.g. average/median postsynaptic neuron
per_post_neuron = (
edgelist.groupby(["pre_group", "post_id"], sort=False)["weight"]
.sum()
.reset_index()
)
per_post_neuron["post_group"] = (
per_post_neuron["post_id"].map(outidx_map).astype(str)
)
# for each such pair, expand to all post_ids in that post_group
scaffold = pregrp_postgrp.merge(post_group_df, on="post_group", how="left")
per_post_neuron = scaffold.merge(
per_post_neuron, on=["pre_group", "post_group", "post_id"], how="left"
).fillna(0)
# combine with post_group_df to make sure all post_ids from the post_groups are included
# per_post_neuron = (
# post_group_df[post_group_df.post_group.isin(per_post_neuron.post_group)]
# .merge(per_post_neuron, how="outer", on=["post_group", "post_id"])
# .fillna(0)
# )
result = per_post_neuron.groupby(["pre_group", "post_group"], sort=False)[
"weight"
].agg(combining_method)
else:
# OUTPUT-centred: average neuron in *pre* group
# output proportion from an average source to a target type
per_pre_neuron = (
edgelist.groupby(["pre_id", "post_group"], sort=False)["weight"]
.sum()
.reset_index()
)
per_pre_neuron["pre_group"] = (
per_pre_neuron["pre_id"].map(inidx_map).astype(str)
)
# for each such pair, expand to all post_ids in that post_group
scaffold = pregrp_postgrp.merge(pre_group_df, on="pre_group", how="left")
# left-join actual weights
per_pre_neuron = scaffold.merge(
per_pre_neuron, on=["pre_group", "post_group", "pre_id"], how="left"
).fillna(0)
result = per_pre_neuron.groupby(["pre_group", "post_group"], sort=False)[
"weight"
].agg(combining_method)
if not return_long:
result = result.unstack(fill_value=0)
# ---------------------------------------------------------------------#
# Layout adjustments
# ---------------------------------------------------------------------#
if pre_in_column:
result = result.T
# threshold filtering
if display_threshold > 0:
if threshold_axis == "row":
result = result[(np.abs(result) >= display_threshold).any(axis=1)]
elif threshold_axis == "column":
result = result.loc[
:, (np.abs(result) >= display_threshold).any(axis=0)
]
else:
raise ValueError("threshold_axis must be 'column' or 'row'.")
if result.empty:
raise ValueError(
"No values left after applying the display_threshold; lower the threshold."
)
# sorting
if sort_within == "column":
if sort_names is None:
result = result.sort_values(by=result.columns[0], ascending=False)
else:
sort_names = [sort_names] if isinstance(sort_names, str) else sort_names
if set(sort_names).issubset(result.columns):
result = result.sort_values(by=sort_names, ascending=False)
else:
raise ValueError("sort_names must be present in outidx_map values.")
elif sort_within == "row":
result = result.loc[
np.abs(result).mean(axis=1).sort_values(ascending=False).index
]
if sort_names is None:
result = result.sort_values(by=result.index[0], axis=1, ascending=False)
else:
sort_names = [sort_names] if isinstance(sort_names, str) else sort_names
if set(sort_names).issubset(result.index):
result = result.sort_values(by=sort_names, axis=1, ascending=False)
else:
raise ValueError("sort_names must be present in inidx_map values.")
else:
raise ValueError("sort_within must be 'column' or 'row'.")
# ---------------------------------------------------------------------#
# Display (only if wide)
# ---------------------------------------------------------------------#
if display_output and not return_long:
styled = result.style.background_gradient(
cmap="Blues",
vmin=np.abs(result).min().min(),
vmax=np.abs(result).max().max(),
)
display(styled)
else: # return_long
result = (
result[result > display_threshold]
.reset_index()
.rename(columns={"weight": "value"})
)
if result.empty:
raise ValueError(
"No values left after applying the display_threshold; lower the threshold."
)
return result
[docs]
def result_summary(
stepsn,
inidx: arrayable,
outidx: arrayable,
inidx_map: dict | None = None,
outidx_map: dict | None = None,
display_output: bool = True,
display_threshold: float = 1e-3,
threshold_axis: str = "row",
sort_within: str = "column",
sort_names: str | List | None = None,
pre_in_column: bool = False,
include_undefined_groups: bool = False,
outprop: bool = False,
combining_method: str = "mean",
):
"""Generates a summary of connections between different types of neurons,
represented by their input and output indexes. The function calculates the
total synaptic input from presynaptic neuron groups to an average neuron in
each postsynaptic neuron group.
Args:
stepsn (scipy.sparse matrix or numpy.ndarray): Matrix representing the
synaptic strengths between neurons, can be dense or sparse.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing the input (presynaptic) neurons, used to
subset stepsn. nan values are removed.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing the output (postsynaptic) neurons.
inidx_map (dict, optional): Mapping from indices to neuron groups for
the input neurons. Defaults to None, in which case neurons are not
grouped.
outidx_map (dict, optional): Mapping from indices to neuron groups for
the output neurons. Defaults to None, in which case it is set to
be the same as inidx_map.
display_output (bool, optional): Whether to display the output in a
coloured dataframe. Defaults to True.
display_threshold (float, optional): The minimum threshold for
displaying the output. Defaults to 0.
threshold_axis (str, optional): The axis to apply the display_threshold to.
Defaults to 'row' (removing entire rows if no value exceeds
display_threshold).
sort_within (str, optional): The axis to sort the output in. Defaults
to 'column'.
sort_names (str or list, optional): the column/row name(s) to sort the
result by. If none is provided, then sort by the first column/row.
pre_in_column (bool, optional): Whether to have the presynaptic neuron
groups as columns. Defaults to False (pre in rows, post: columns).
include_undefined_groups (bool, optional): Whether to include
undefined groups in the output. Defaults to False.
outprop (bool, optional): If True, get the summed output proportion (across
recipient single cells in the same cell type) for each average sender. If
False (default), get the summed input proportion across all senders for
each average recipient.
combining_method (str, optional): Method to combine inputs (outprop=False)
or outputs (outprop=True). Can be 'sum', 'mean', or 'median'.
Defaults to 'mean'.
Returns:
pd.DataFrame:
A dataframe representing the summed synaptic input from presynaptic neuron
groups to an average neuron in each postsynaptic neuron group. This
dataframe is always returned, regardless of the value of display_output.
Displays:
If display_output is True, the function will display a styled version
of the resulting dataframe.
"""
assert combining_method in ["mean", "median", "sum"], (
"The combining_method should be either 'mean', 'median', or 'sum'. "
f"Currently it is {combining_method}."
)
print(
"Feel free to try `connectivity_summary()` - the same function with less memory usage!"
)
if inidx_map is None:
inidx_map = {idx: idx for idx in range(stepsn.shape[0])}
if outidx_map is None:
outidx_map = inidx_map
# remove nan values in inidx and outidx
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# make sure inidx and outidx only contain integers
inidx = np.array([int(x) for x in inidx])
outidx = np.array([int(x) for x in outidx])
if issparse(stepsn):
# if stepsn is coo, turn into csc
if stepsn.format == "coo":
stepsn = stepsn.tocsc()
matrix = stepsn[:, outidx][inidx, :].toarray()
else:
matrix = stepsn[inidx, :][:, outidx]
if include_undefined_groups:
# fill the nan values in inidx_map (e.g. 17726: nan) and outidx_map
# with 'undefined'
inidx_map = {k: v if pd.notna(v) else "undefined" for k, v in inidx_map.items()}
outidx_map = {
k: v if pd.notna(v) else "undefined" for k, v in outidx_map.items()
}
# Create the dataframe
df = pd.DataFrame(
data=matrix,
# choose what to group by here
# if idx is mapped to root_id, if root_id is kept as int64, the
# root_ids seem a bit messed up
index=[str(inidx_map[key]) for key in inidx],
columns=[str(outidx_map[key]) for key in outidx],
)
if not outprop:
# Sum across rows: presynaptic neuron is in the rows
# summing across neurons of the same type: total amount of input from that
# type for the postsynaptic neuron
summed_df = df.groupby(df.index).sum()
# Average across columns and transpose back
# averaging across columns of the same type:
# on average, a neuron of that type receives x% input from a presynaptic type
if combining_method == "sum":
result_df = summed_df.T.groupby(level=0).sum().T
elif combining_method == "mean":
result_df = summed_df.T.groupby(level=0).mean().T
elif combining_method == "median":
result_df = summed_df.T.groupby(level=0).median().T
else:
# calculate the output proportion from an average source neuron to a target type
# so first sum across columns
summed_df = df.T.groupby(level=0).sum().T
# then average across rows
if combining_method == "sum":
result_df = summed_df.groupby(summed_df.index).sum()
elif combining_method == "mean":
result_df = summed_df.groupby(summed_df.index).mean()
elif combining_method == "median":
result_df = summed_df.groupby(summed_df.index).median()
if pre_in_column:
result_df = result_df.T
if display_threshold > 0:
if threshold_axis == "row":
# only display rows where any value >= display_threshold
result_df = result_df[(np.abs(result_df) >= display_threshold).any(axis=1)]
elif threshold_axis == "column":
# only display columns where any value >= display_threshold
result_df = result_df.loc[
:, (np.abs(result_df) >= display_threshold).any(axis=0)
]
else:
raise ValueError("threshold_axis must be either 'column' or 'row'.")
# if there is nothing left
if result_df.empty:
raise ValueError(
"No values left after applying the display_threshold. "
"Try setting display_threshold to 0."
)
if sort_within == "column":
if sort_names is None:
# sort result_df by the values in the first column, in descending
# order
result_df = result_df.sort_values(
by=np.abs(result_df).columns[0], ascending=False
)
elif isinstance(sort_names, str):
sort_names = [sort_names]
if sort_names is not None:
if set(sort_names).issubset(result_df.columns):
result_df = result_df.sort_values(by=sort_names, ascending=False)
else:
raise ValueError(
"sort_names must be present in the values of outidx_map."
)
elif sort_within == "row":
# first sort rows by average row value in descending order
result_df = result_df.loc[
np.abs(result_df).mean(axis=1).sort_values(ascending=False).index
]
if sort_names is None:
# sort result_df by the values in the first column, in descending
# order
result_df = result_df.sort_values(
by=result_df.index[0], axis=1, ascending=False
)
elif isinstance(sort_names, str):
sort_names = [sort_names]
if sort_names is not None:
if set(sort_names).issubset(result_df.index):
result_df = result_df.sort_values(
by=sort_names, axis=1, ascending=False
)
else:
raise ValueError(
"sort_names must be present in the values of inidx_map."
)
else:
raise ValueError("sort_within must be either 'column' or 'row'.")
if display_output:
result_dp = result_df.style.background_gradient(
cmap="Blues",
vmin=np.abs(result_df).min().min(),
vmax=np.abs(result_df).max().max(),
)
display(result_dp)
return result_df
[docs]
def contribution_by_path_lengths_data(
steps,
inidx: arrayable,
outidx: arrayable,
outidx_map: dict | None = None,
inidx_map: dict | None = None,
):
"""Calculates the contribution from all of inidx (grouped by inidx_map) to an
average outidx (grouped by outidx_map) over different path lengths. Either inidx_map
or outidx_map, but not both, should be provided. If neither is provided, presynaptic
neurons are grouped taoogether. Direct connections are in path_length 1.
Args:
steps (list of scipy.sparse matrices or numpy.array): List of sparse matrices,
each representing synaptic strengths for a specific path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of indices
representing input (presynaptic) neurons.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of
indices representing output (postsynaptic) neurons.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups. Only one
of inidx_map and outidx_map should be specified.
inidx_map (dict): Mapping from indices to presynaptic neuron groups. Only one of
inidx_map and outidx_map should be specified.
Returns:
pd.DataFrame:
A DataFrame containing the contributions from presynaptic neurons to
postsynaptic neurons over different path lengths. The DataFrame has three
columns: 'path_length', 'presynaptic_type' (or 'postsynaptic_type'), and
'value'.
"""
# remove nan values in inidx and outidx
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# check if both inidx_map and outidx_map are provided
if inidx_map is not None and outidx_map is not None:
raise ValueError(
"Only one of inidx_map and outidx_map should be specified. "
"If you want to keep both, use "
"contribution_by_path_lengths_heatmap()."
)
if inidx_map is None and outidx_map is None:
outidx_map = {idx: idx for idx in outidx}
# give message that pres are grouped together
print(
"Neither inidx_map nor outidx_map provided. By default "
"presynaptic neurons are grouped together."
)
rows = []
for step in steps:
if hasattr(step, "toarray"):
data = step[:, outidx][inidx, :].toarray()
else:
data = step[inidx, :][:, outidx]
if inidx_map is not None:
df = pd.DataFrame(
data=data,
index=[inidx_map[key] for key in inidx],
)
# average of all columns
# then groupby index, and take the sum
# average post from all pre
df = df.mean(axis=1).groupby(level=0).sum().to_frame().T
rows.append(df)
elif outidx_map is not None:
df = pd.DataFrame(
data=data,
columns=[outidx_map[key] for key in outidx],
)
# sum all rows
# then groupby column, and take the mean
# average post from all pre
df = df.sum().groupby(level=0).mean().to_frame().T
rows.append(df)
# first have a dataframe where: each row is a path length, each column is
# a postsynaptic cell type
# then pivot_wide to long
# index is the path length
# variable is postynaptic cell type
# value is y axis
contri = pd.concat(rows, ignore_index=True).melt(ignore_index=False).reset_index()
if inidx_map is not None:
contri.columns = ["path_length", "presynaptic_type", "value"]
else:
contri.columns = ["path_length", "postsynaptic_type", "value"]
contri.path_length = contri.path_length + 1
return contri
[docs]
def contribution_by_path_lengths(
steps,
inidx: arrayable,
outidx: arrayable,
outidx_map: dict | None = None,
inidx_map: dict | None = None,
width: int = 800,
height: int = 400,
):
"""Plots the connection strength from all of inidx (grouped by inidx_map) to an
average outidx (grouped by outidx_map) over different path lengths. Either inidx_map
or outidx_map, but not both, should be provided. If neither is provided, presynaptic
neurons are grouped together. Direct connections are in path_length 1.
Args:
steps (list of scipy.sparse matrices or numpy.array): List of sparse matrices,
each representing synaptic strengths for a specific path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of indices
representing input (presynaptic) neurons.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of
indices representing output (postsynaptic) neurons.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups. Only one
of inidx_map and outidx_map should be specified.
inidx_map (dict): Mapping from indices to presynaptic neuron groups. Only one of
inidx_map and outidx_map should be specified.
width (int, optional): The width of the plot. Defaults to 800.
height (int, optional): The height of the plot. Defaults to 400.
Returns:
None:
Displays an interactive line plot showing the connection strength from all
of inidx to an average outidx over different path lengths.
"""
contri = contribution_by_path_lengths_data(
steps,
inidx,
outidx,
outidx_map=outidx_map,
inidx_map=inidx_map,
)
fig = px.line(
contri,
x="path_length",
y="value",
color=contri.columns[1],
width=width,
height=height,
)
fig.update_layout(
xaxis=dict(tickmode="linear", tick0=1, dtick=1),
yaxis=dict(tickmode="linear", tick0=0, dtick=contri.value.max() / 5),
)
# fig.show()
return fig
[docs]
def contribution_by_path_lengths_heatmap(
steps,
inidx,
outidx,
inidx_map=None,
outidx_map=None,
sort_by_index=True,
sort_names=None,
pre_in_column=False,
display_threshold=0,
cmap="viridis",
figsize=(30, 15),
):
"""Display the contribution from inidx to outidx, grouped by inidx_map and
outidx_map, across different path lengths.
Args:
steps (list of scipy.sparse matrices): List of sparse matrices, each
representing synaptic strengths for a specific path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing input (presynaptic) neurons.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing output (postsynaptic) neurons.
inidx_map (dict, optional): Mapping from indices to input neuron
groups. Defaults to None, in which case neurons are not grouped.
outidx_map (dict, optional): Mapping from indices to output neuron
groups. Defaults to None, in which case it is set to be the same
as inidx_map.
sort_by_index (bool, optional): Whether to sort the output by index.
Defaults to True.
sort_names (str or list, optional): the column name(s) to sort the
result by. If none is provided, then sort by the first column.
pre_in_column (bool, optional): Whether to have the presynaptic neuron
groups as columns. Defaults to False (pre in rows, post: columns).
display_threshold (float, optional): The threshold for displaying the
output. Defaults to 0.
cmap (str, optional): The colormap to use for the heatmap. Defaults to
'viridis'.
figsize (tuple, optional): The size of the figure to display. Defaults
to (30, 15).
Returns:
None:
Displays an interactive heatmap showing the contribution from inidx to
outidx, grouped by inidx_map and outidx_map, across different path lengths.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
if inidx_map is None:
inidx_map = {idx: idx for idx in inidx}
if outidx_map is None:
outidx_map = inidx_map
heatmaps = []
for step in tqdm(steps):
df = connectivity_summary(
step,
inidx,
outidx,
inidx_map,
outidx_map,
display_output=False,
sort_names=sort_names,
pre_in_column=pre_in_column,
display_threshold=display_threshold,
)
if sort_by_index:
df.sort_index(inplace=True)
heatmaps.append(df)
slider = widgets.IntSlider(
value=1,
min=1,
max=len(heatmaps),
step=1,
description="Path length",
continuous_update=True,
)
def plot_heatmap(index):
plt.figure(figsize=figsize)
# plt.imshow(heatmaps[index], cmap='viridis', aspect = 'auto')
# Use seaborn's heatmap function which is a higher-level API for
# Matplotlib's imshow
sns.heatmap(
heatmaps[index - 1],
annot=True,
fmt=".2f",
linewidths=0.5,
cmap=cmap,
)
# Rotate the tick labels for the columns to show them better
plt.xticks(rotation=90)
plt.yticks(rotation=0)
# Show the heatmap
plt.show()
# Link the slider to the plotting function
display(widgets.interactive(plot_heatmap, index=slider))
[docs]
def conn_by_path_length_data(
inprop: spmatrix,
inidx: arrayable,
outidx: arrayable,
n: int,
outidx_map: Optional[dict] = None,
inidx_map: Optional[dict] = None,
intermediate_group: Optional[dict] = None,
combining_method: str = "mean",
wide: bool = False,
chunk_size: int = 2000,
):
"""Calculates the connectivity from all of inidx (grouped by inidx_map) to outidx
(grouped by outidx_map) within `n` hops, aggregated by `combining_method`. If
neither is provided, nothing is grouped together. Direct connections are in
path_length 1.
Args:
inprop (spmatrix): The connectivity matrix, with presynaptic in the rows.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The source
indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The target
indices.
n (int): The maximum number of hops. n=1 for direct connections.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups. Only one
of inidx_map and outidx_map should be specified.
inidx_map (dict): Mapping from indices to presynaptic neuron groups. Only one of
inidx_map and outidx_map should be specified.
intermediate_group (dict, optional): Mapping from indices to neuron groups for
intermediate neurons. If None, it will be inherited from one of inidx_map
or outidx_map (whichever is not None).
combining_method (str, optional): Method to combine inputs or outputs. Can be
'mean', 'median', or 'sum'. Defaults to 'mean'.
wide (bool, optional): Whether to return the result in wide format. If False,
the result will be in long format. Defaults to False.
chunk_size (int, optional): The chunk size to use when processing large
datasets. Defaults to 2000.
Returns:
List[pd.DataFrame] | pd.DataFrame:
If one of outidx_map and inidx_map is provided, a DataFrame containing the
three columns: 'path_length', 'post' (or 'pre'), and 'weight'. If both are
provided, a list of DataFrames, where each one is the connectivity of a
specific path length.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# if no grouping dict provided, group all together
if inidx_map is None:
inidx_map = {idx: "group" for idx in inidx}
inidx_map_is_none = True
else:
inidx_map_is_none = False
if outidx_map is None:
outidx_map = {idx: "group" for idx in outidx}
outidx_map_is_none = True
else:
outidx_map_is_none = False
# but if neither is provided, then don't group anything
if inidx_map_is_none and outidx_map_is_none:
inidx_map = {idx: idx for idx in range(inprop.shape[0])}
outidx_map = {idx: idx for idx in range(inprop.shape[0])}
if intermediate_group is None:
# inherit from the one that's not None
if inidx_map_is_none and not outidx_map_is_none:
intermediate_group = outidx_map
elif outidx_map_is_none and not inidx_map_is_none:
intermediate_group = inidx_map
# if not inidx_map_is_none and not outidx_map_is_none:
# # if both are provided, then follow pre_group - already specified in group_paths
# intermediate_group = inidx_map
# if neither are provided, taken care of above
rows = []
for i in tqdm(range(n)):
paths = find_paths_of_length(inprop, inidx, outidx, i + 1)
paths = group_paths(
paths,
inidx_map,
outidx_map,
intermediate_group=intermediate_group,
combining_method=combining_method,
)
if paths is not None and not paths.empty:
# has colunms: pre, post, weight
df = effective_conn_from_paths(
paths,
wide=False,
chunk_size=chunk_size,
)
df.loc[:, ["path_length"]] = i + 1
rows.append(df)
# Handle case when no paths exist at all
if not rows:
# Create empty DataFrame with appropriate structure
if not inidx_map_is_none and not outidx_map_is_none:
# Both maps: return list of empty DataFrames
if wide:
# Return empty DataFrames with appropriate index/columns structure
return [pd.DataFrame() for _ in range(n)]
else:
# Add path_length column for long format
return [
pd.DataFrame(columns=["pre", "post", "weight", "path_length"])
for _ in range(n)
]
else:
# Single map: return empty DataFrame with path_length column
if inidx_map_is_none:
df = pd.DataFrame(columns=["path_length", "post", "weight"])
else:
df = pd.DataFrame(columns=["path_length", "pre", "weight"])
if wide:
# Return empty pivoted DataFrame
return pd.DataFrame()
return df
contri = pd.concat(rows, ignore_index=True)
if inidx_map_is_none:
contri = contri[["path_length", "post", "weight"]]
elif outidx_map_is_none:
contri = contri[["path_length", "pre", "weight"]]
if contri.shape[1] == 3:
# fill the empty path_length-pre/post weight with 0
# first make a df of all combinations of path_length and pre/post
contri_full = pd.DataFrame(
product(range(1, n + 1), contri.iloc[:, 1].unique()),
columns=["path_length", contri.columns[1]],
)
contri = pd.merge(
contri_full,
contri,
on=["path_length", contri.columns[1]],
how="left",
)
contri.fillna(0, inplace=True)
if wide:
# make wider
contri = contri.pivot(
index="path_length", columns=contri.columns[1], values="weight"
)
return contri
else:
contri_all = []
for plength in range(1, n + 1):
contri_plength = contri[contri["path_length"] == plength]
contri_plength_allcombo = pd.DataFrame(
product(contri.pre.unique(), contri_plength.post.unique()),
columns=["pre", "post"],
)
contri_plength = pd.merge(
contri_plength_allcombo,
contri_plength,
on=["pre", "post"],
how="left",
)
contri_plength.fillna(0, inplace=True)
# make wider
if wide:
contri_plength = contri_plength.pivot(
index="pre", columns="post", values="weight"
)
# if not wide, has columns: pre, post, weight, path_length
contri_all.append(contri_plength)
return contri_all
[docs]
def conn_by_path_length(
inprop: spmatrix,
inidx: arrayable,
outidx: arrayable,
n: int,
outidx_map: Optional[dict] = None,
inidx_map: Optional[dict] = None,
intermediate_group: Optional[dict] = None,
combining_method: str = "mean",
width: int = 800,
height: int = 400,
):
"""Plots the connectivity from all of inidx (grouped by inidx_map) to outidx
(grouped by outidx_map) within `n` hops, aggregated by `combining_method`. Either
inidx_map or outidx_map, but not both, should be provided. If neither is provided,
presynaptic neurons are grouped together. Direct connections are in path_length 1.
Args:
inprop (spmatrix): The connectivity matrix, with presynaptic in the rows.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The source indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The target
indices.
n (int): The maximum number of hops. n=1 for direct connections.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups. Only one
of inidx_map and outidx_map should be specified.
inidx_map (dict): Mapping from indices to presynaptic neuron groups. Only one of
inidx_map and outidx_map should be specified.
intermediate_group (dict, optional): Mapping from indices to neuron groups for
intermediate neurons. If None, it will be inherited from one of inidx_map or
outidx_map (whichever is not None).
combining_method (str, optional): Method to combine inputs or outputs. Can be
'mean', 'median', or 'sum'. Defaults to 'mean'.
width (int, optional): The width of the plot. Defaults to 800.
height (int, optional): The height of the plot. Defaults to 400.
Returns:
None:
Displays an interactive line plot showing the connection strength from all
of inidx to an average outidx over different path lengths.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# check if both inidx_map and outidx_map are provided
if inidx_map is not None and outidx_map is not None:
raise ValueError(
"Only one of inidx_map and outidx_map should be specified. "
"If you want to keep both, use "
"contribution_by_path_lengths_heatmap()."
)
if inidx_map is None and outidx_map is None:
outidx_map = {idx: idx for idx in outidx}
# give message that pres are grouped together
print(
"Neither inidx_map nor outidx_map provided. By default "
"presynaptic neurons are grouped together."
)
contri = conn_by_path_length_data(
inprop,
inidx,
outidx,
n,
inidx_map=inidx_map,
outidx_map=outidx_map,
intermediate_group=intermediate_group,
combining_method=combining_method,
)
fig = px.line(
contri,
x="path_length",
y="weight",
color=contri.columns[1],
width=width,
height=height,
)
fig.update_layout(
xaxis=dict(tickmode="linear", tick0=1, dtick=1),
yaxis=dict(tickmode="linear", tick0=0, dtick=contri.weight.max() / 5),
)
return fig
[docs]
def conn_by_path_length_heatmap(
inprop: spmatrix,
inidx: arrayable,
outidx: arrayable,
n: int,
outidx_map: Optional[dict] = None,
inidx_map: Optional[dict] = None,
threshold: float = 0,
combining_method: str = "mean",
cmap="viridis",
annot=True,
figsize=(30, 15),
):
"""Display the connectivity from all of inidx (grouped by inidx_map) to outidx
(grouped by outidx_map) within `n` hops, aggregated by `combining_method`.
Args:
inprop (spmatrix): The connectivity matrix, with presynaptic in the rows.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The source indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The target
indices.
n (int): The maximum number of hops. n=1 for direct connections.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups. Only one
of inidx_map and outidx_map should be specified.
inidx_map (dict): Mapping from indices to presynaptic neuron groups. Only one of
inidx_map and outidx_map should be specified.
combining_method (str, optional): Method to combine inputs or outputs. Can be
'mean', 'median', or 'sum'. Defaults to 'mean'.
cmap (str, optional): The colormap to use for the heatmap. Defaults to 'viridis'.
annot (bool, optional): Whether to annotate the heatmap with the values.
Defaults to True.
figsize (tuple, optional): The size of the figure to display. Defaults to
(30, 15).
Returns:
None:
Displays a heatmap showing the connection strength from all of inidx to an
average outidx over different path lengths with a slider.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
if inidx_map is None and outidx_map is None:
outidx_map = {idx: idx for idx in outidx}
# give message that pres are grouped together
print(
"Neither inidx_map nor outidx_map provided. By default "
"presynaptic neurons are grouped together."
)
contri = conn_by_path_length_data(
inprop,
inidx,
outidx,
n,
inidx_map=inidx_map,
outidx_map=outidx_map,
combining_method=combining_method,
) # list of dataframes with columns: path_length, pre, post, weight
if threshold != 0:
thresholded = pd.concat([df[df.weight >= threshold] for df in contri])
contri = [
df[(df.pre.isin(thresholded.pre)) & (df.post.isin(thresholded.post))]
for df in contri
]
# make wider
contri = [df.pivot(index="pre", columns="post", values="weight") for df in contri]
slider = widgets.IntSlider(
value=1,
min=1,
max=len(contri),
step=1,
description="Path length",
continuous_update=True,
)
def plot_heatmap(index):
plt.figure(figsize=figsize)
# plt.imshow(heatmaps[index], cmap='viridis', aspect = 'auto')
# Use seaborn's heatmap function which is a higher-level API for
# Matplotlib's imshow
sns.heatmap(
contri[index - 1],
annot=annot,
fmt=".2f",
linewidths=0.5,
cmap=cmap,
)
# Rotate the tick labels for the columns to show them better
plt.xticks(rotation=90)
plt.yticks(rotation=0)
# Show the heatmap
plt.show()
# Link the slider to the plotting function
display(widgets.interactive(plot_heatmap, index=slider))
[docs]
def signed_conn_by_path_length_data(
inprop: spmatrix,
inidx: arrayable,
outidx: arrayable,
n: int,
group_to_sign: dict = None,
inidx_map: Optional[dict] = None,
outidx_map: Optional[dict] = None,
intermediate_map: Optional[dict] = None,
combining_method: str = "mean",
wide: bool = True,
):
"""Calculates the signed connectivity from all of inidx to outidx within `n` hops,
grouped by inidx_map and outidx_map, aggregated by `combining_method`. The sign of
each connection is determined by `group_to_sign`. Direct connections are in
path_length 1.
Args:
inprop (spmatrix): The connectivity matrix, with presynaptic in the rows.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The source
indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The target
indices.
n (int): The maximum number of hops. n=1 for direct connections.
group_to_sign (dict): Mapping from group names (the values in inidx_map and
outidx_map, or the indices themselves if no mapping is provided) to their
signs (+1 or -1).
inidx_map (dict): Mapping from indices to presynaptic neuron groups.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups.
intermediate_map (dict, optional): Mapping from indices to intermediate neuron
groups. Defaults to None.
combining_method (str, optional): Method to combine inputs or outputs. Can be
'mean', 'median', or 'sum'. Defaults to 'mean'.
wide (bool, optional): Whether to return the result in wide format. If False,
the result will be in long format. Defaults to True.
Returns:
List[pd.DataFrame], List[pd.DataFrame]:
Two lists of DataFrames. The first list contains the excitatory effective
connectivity DataFrames for each path length, and the second list contains
the inhibitory effective connectivity DataFrames for each path length.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
eees = []
iiis = []
# group_paths() turns groups into strings. So let's also make groups string here
group_to_sign = {str(k): v for k, v in group_to_sign.items()}
for path_length in tqdm(range(n)):
paths = find_paths_of_length(inprop, inidx, outidx, path_length + 1)
paths = group_paths(
paths,
inidx_map,
outidx_map,
intermediate_group=intermediate_map,
combining_method=combining_method,
)
if paths is None or paths.empty:
# make empty dataframe
exc = pd.DataFrame()
inh = pd.DataFrame()
else:
exc, inh = signed_effective_conn_from_paths(
paths,
wide=wide,
idx_to_nt=group_to_sign,
)
eees.append(exc)
iiis.append(inh)
return eees, iiis
[docs]
def effective_conn_from_paths(
paths: pd.DataFrame,
wide: bool = True,
chunk_size: int = 2000, # rows per chunk
density_threshold: float = 0.2,
use_gpu: bool = True,
root: bool = False,
quiet: bool = False,
):
"""
Calculate the effective connectivity from the paths Dataframe (which could be an
output of `find_paths_of_length()`), from the 'pre' in the earliest layer, to the
'post' in the latest layer, grouped by `pre_group`, `post_group`, and
`intermediate_group`.
Args:
paths (pd.DataFrame): DataFrame containing the paths with columns: 'pre', 'post',
'weight', and 'layer'. The 'pre' and 'post' columns represent the
presynaptic and postsynaptic neurons, respectively. The 'weight' column
represents the strength of the connection, and the 'layer' column indicates
the layer of the connection (starting from 1).
wide (bool, optional): If True, returns the result in wide format. If False,
returns in long format. Defaults to True.
chunk_size (int, optional): Number of rows to process in each chunk when using
GPU. Defaults to 2000.
density_threshold (float, optional): Threshold for converting sparse matrices to
dense. If the density of the matrix exceeds this value, it will be converted
to a dense matrix to save memory. Defaults to 0.2.
use_gpu (bool, optional): Whether to use GPU for computation. Defaults to True.
If GPU is not available, it will fall back to CPU.
Returns:
pd.DataFrame:
A DataFrame summarizing the effective connectivity between presynaptic and
postsynaptic groups.
"""
# --------------------------------------------------------------------- #
# 0. preliminaries
# --------------------------------------------------------------------- #
if len(paths) == 0:
if not quiet:
print("No paths found, returning None.")
return None
local_idx_dict = {
idx: i for i, idx in enumerate(set(paths.pre).union(set(paths.post)))
}
local_to_global_idx = {i: idx for idx, i in local_idx_dict.items()}
paths.loc[:, ["pre_idx"]] = paths.pre.map(local_idx_dict)
paths.loc[:, ["post_idx"]] = paths.post.map(local_idx_dict)
m = len(local_idx_dict)
if use_gpu:
if torch is not None and torch.cuda.is_available():
pass
else:
use_gpu = False
if not quiet:
print("GPU not available, using CPU instead.")
if not use_gpu:
for i, layer in enumerate(sorted(paths.layer.unique())):
if i == 0:
initial_el = paths[paths.layer == layer] # edgelist
csr = csr_matrix(
(
initial_el.weight,
(initial_el.pre_idx.values, initial_el.post_idx.values),
),
shape=(m, m),
dtype=np.float32,
) # make sparse matrix of the shape all_elements, all_elements
else:
el = paths[paths.layer == layer]
csr = csr @ csr_matrix(
(el.weight, (el.pre_idx.values, el.post_idx.values)),
shape=(m, m),
dtype=np.float32,
)
coo = csr.tocoo()
result_el = pd.DataFrame(
{"pre_idx": coo.row, "post_idx": coo.col, "weight": coo.data}
)
result_el.loc[:, ["pre"]] = result_el.pre_idx.map(local_to_global_idx)
result_el.loc[:, ["post"]] = result_el.post_idx.map(local_to_global_idx)
else:
device = torch.device("cuda")
with torch.no_grad():
chunk_size = min(chunk_size, m)
num_of_chunks = math.ceil(len(local_idx_dict) / chunk_size)
# pre-define sparse matrices for each layer
layer_mats = {}
for layer in sorted(paths.layer.unique()):
layer_el = paths[paths.layer == layer]
idx = torch.as_tensor(
np.vstack((layer_el.pre_idx.values, layer_el.post_idx.values)),
device=device,
dtype=torch.long,
)
val = torch.as_tensor(
layer_el.weight.values, device=device, dtype=torch.float32
)
layer_mat = torch.sparse_coo_tensor(
idx, val, (m, m), dtype=torch.float32, device=device
).coalesce()
layer_mats[layer] = layer_mat
chunk_els = []
for i in range(num_of_chunks):
start_idx = i * chunk_size
end_idx = min((i + 1) * chunk_size, len(local_idx_dict))
local_to_input_idx = {
idx: i for i, idx in enumerate(range(start_idx, end_idx))
}
input_idx_to_local = {v: k for k, v in local_to_input_idx.items()}
for layer in sorted(paths.layer.unique()):
layer_el = paths[paths.layer == layer]
if layer == paths.layer.min():
layer_el_chunk = layer_el[
(layer_el.pre_idx >= start_idx)
& (layer_el.pre_idx < end_idx)
]
layer_el_chunk.loc[:, ["pre_idx"]] = layer_el_chunk.pre_idx.map(
local_to_input_idx
)
if layer_el_chunk.empty:
mat = None
break # break out of the iteration through layers
idx = torch.as_tensor(
np.vstack(
(
# now pre_idx between 0 and chunk_size
layer_el_chunk.pre_idx.values,
# post_idx between 0 and m, which is the number of
# nodes in the paths
layer_el_chunk.post_idx.values,
)
),
device=device,
dtype=torch.long,
)
val = torch.as_tensor(
layer_el_chunk.weight.values,
device=device,
dtype=torch.float32,
)
mat = torch.sparse_coo_tensor(
idx,
val,
(len(local_to_input_idx), m),
dtype=torch.float32,
device=device,
).coalesce()
if (
mat._nnz() / (len(local_to_input_idx) * m)
) > density_threshold:
mat = mat.to_dense()
else:
mat = torch.sparse.mm(
layer_mats[layer].t().to(device), mat.t().to(device)
).t() # shape (len(local_to_input_idx), m)
# if mat isn't dense:
if mat.is_sparse:
if (
mat._nnz() / (len(local_to_input_idx) * m)
) > density_threshold:
mat = mat.to_dense()
torch.cuda.empty_cache()
if mat is None:
continue
mat = mat.to("cpu")
if mat.is_sparse:
mat = mat.coalesce().to_sparse_coo()
rows, cols = mat.indices()
vals = mat.values()
chunk_el = pd.DataFrame(
{
"pre_idx": rows.numpy(),
"post_idx": cols.numpy(),
"weight": vals.numpy(),
}
)
chunk_el.loc[:, ["pre_idx"]] = chunk_el.pre_idx.map(
input_idx_to_local
)
chunk_els.append(chunk_el)
else:
chunk_el = pd.DataFrame(
data=mat.numpy(),
index=input_idx_to_local.values(),
columns=local_to_global_idx.keys(),
)
# make longer
chunk_el = chunk_el.melt(ignore_index=False).reset_index()
chunk_el.columns = ["pre_idx", "post_idx", "weight"]
chunk_el = chunk_el[chunk_el.weight != 0]
chunk_el.loc[:, ["pre"]] = chunk_el.pre_idx.map(input_idx_to_local)
chunk_els.append(chunk_el)
del mat
# free up memory
torch.cuda.empty_cache()
result_el = pd.concat(chunk_els, ignore_index=True)
result_el["pre"] = result_el.pre_idx.map(local_to_global_idx)
result_el["post"] = result_el.post_idx.map(local_to_global_idx)
if root:
result_el["weight"] = result_el.weight ** (1 / len(paths.layer.unique()))
# --------------------------------------------------------------------- #
# 4. back to edge-list, group, pivot
# --------------------------------------------------------------------- #
if wide:
result_el = result_el.pivot(index="pre", columns="post", values="weight")
result_el.fillna(0, inplace=True)
return result_el
[docs]
def effective_conn_from_paths_cpu(
paths,
wide=True,
):
"""Calculate the effective connectivity between (groups of) neurons based
only on the provided `paths` between neurons. This function runs on CPU,
and doesn't expect a big connectivity matrix as input.
Args:
paths (pd.DataFrame): A dataframe representing the paths between neurons, with
columns 'pre', 'post', 'weight', and 'layer'.
wide (bool, optional): Whether to pivot the output dataframe to a wide format.
Defaults to True.
Returns:
pd.DataFrame:
A dataframe representing the effective connectivity between groups of
neurons.
"""
# it will get confusing if we didn't use the same mapping for all layers
# though it is true that this would increase the size of the matrix being
# multiplied
local_idx_dict = {
idx: i for i, idx in enumerate(set(paths.pre).union(set(paths.post)))
} # give one index for each element in path; map from element to index
# map from index to element
local_to_global_idx = {i: idx for idx, i in local_idx_dict.items()}
paths.loc[:, ["pre_idx"]] = paths.pre.map(local_idx_dict)
paths.loc[:, ["post_idx"]] = paths.post.map(local_idx_dict)
# matmul with sparse matrices
for i, layer in enumerate(sorted(paths.layer.unique())):
if i == 0:
initial_el = paths[paths.layer == layer] # edgelist
csr = csr_matrix(
(
initial_el.weight,
(initial_el.pre_idx.values, initial_el.post_idx.values),
),
shape=(len(local_idx_dict), len(local_idx_dict)),
dtype=np.float32,
) # make sparse matrix of the shape all_elements, all_elements
else:
el = paths[paths.layer == layer]
csr = csr @ csr_matrix(
(el.weight, (el.pre_idx.values, el.post_idx.values)),
shape=(len(local_idx_dict), len(local_idx_dict)),
dtype=np.float32,
)
coo = csr.tocoo()
result_el = pd.DataFrame(
{"pre_idx": coo.row, "post_idx": coo.col, "weight": coo.data}
)
result_el.loc[:, ["pre"]] = result_el.pre_idx.map(local_to_global_idx)
result_el.loc[:, ["post"]] = result_el.post_idx.map(local_to_global_idx)
# pivot wider
if wide:
result_el = result_el.pivot(index="pre", columns="post", values="weight")
result_el.fillna(0, inplace=True)
return result_el
[docs]
def signed_effective_conn_from_paths(
paths,
wide: bool = True,
idx_to_nt: dict = None,
):
"""Calculate the *signed* effective connectivity between (groups of)
neurons based only on the provided `paths` between neurons. This function
runs on CPU, and doesn't expect a big connectivity matrix as input.
Args:
paths (pd.DataFrame): A dataframe representing the paths between
neurons, with columns 'pre', 'post', 'weight', 'layer', and
optionally 'sign'.
wide (bool, optional): Whether to pivot the output dataframe to a wide
format. Defaults to True.
idx_to_nt (dict, optional): A dictionary mapping neuron indices
(values in columns `pre` and `post`) to 1 (excitatory) / -1
(inhibitory). Defaults to None.
Returns:
list:
A list of two dataframes representing the effective connectivity between
groups of neurons, one for effective excitation, the other inhibition.
"""
if ("sign" not in paths.columns) & (idx_to_nt is None):
raise ValueError(
"Either 'sign' column must be present in paths or "
"idx_to_nt must be provided."
)
# setting local indices
# it will get confusing if we didn't use the same mapping for all layers
# though it is true that this would increase the size of the matrix being
# multiplied
local_idx_dict = {
idx: i for i, idx in enumerate(set(paths.pre).union(set(paths.post)))
}
local_to_global_idx = {i: idx for idx, i in local_idx_dict.items()}
paths.loc[:, ["pre_idx"]] = paths.pre.map(local_idx_dict)
paths.loc[:, ["post_idx"]] = paths.post.map(local_idx_dict)
# make sure sign is in the column:
if "sign" not in paths.columns:
if any(~paths.pre.isin(idx_to_nt)):
print(
"Warning: some neurons are not in idx_to_nt. Their outputs "
"will be ignored"
)
paths.loc[:, ["sign"]] = paths.pre.map(idx_to_nt)
# matmul with sparse matrices
for i, layer in enumerate(sorted(paths.layer.unique())):
if i == 0:
initial_el_e = paths[(paths.layer == layer) & (paths.sign == 1)]
initial_el_i = paths[(paths.layer == layer) & (paths.sign == -1)]
csr_e = csr_matrix(
(
initial_el_e.weight,
(
initial_el_e.pre_idx.values,
initial_el_e.post_idx.values,
),
),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
csr_i = csr_matrix(
(
initial_el_i.weight,
(
initial_el_i.pre_idx.values,
initial_el_i.post_idx.values,
),
),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
else:
el_e = paths[(paths.layer == layer) & (paths.sign == 1)]
el_i = paths[(paths.layer == layer) & (paths.sign == -1)]
this_csr_e = csr_matrix(
(el_e.weight, (el_e.pre_idx.values, el_e.post_idx.values)),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
this_csr_i = csr_matrix(
(el_i.weight, (el_i.pre_idx.values, el_i.post_idx.values)),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
# e = ee + ii
# make sure csr_e is not modified in place, so that we can use it
# for csr_i
csr_e_new = csr_e @ this_csr_e + csr_i @ this_csr_i
# i = ie + ei
csr_i = csr_e @ this_csr_i + csr_i @ this_csr_e
# now modify csr_e
csr_e = csr_e_new
coo_e = csr_e.tocoo()
coo_i = csr_i.tocoo()
# make dataframe based on the connectivity matrix
result_el_e = pd.DataFrame(
{"pre_idx": coo_e.row, "post_idx": coo_e.col, "weight": coo_e.data}
)
result_el_i = pd.DataFrame(
{"pre_idx": coo_i.row, "post_idx": coo_i.col, "weight": coo_i.data}
)
# change back to the global names
result_el_e.loc[:, ["pre"]] = result_el_e.pre_idx.map(local_to_global_idx)
result_el_e.loc[:, ["post"]] = result_el_e.post_idx.map(local_to_global_idx)
result_el_i.loc[:, ["pre"]] = result_el_i.pre_idx.map(local_to_global_idx)
result_el_i.loc[:, ["post"]] = result_el_i.post_idx.map(local_to_global_idx)
if wide:
result_el_e = result_el_e.pivot(index="pre", columns="post", values="weight")
result_el_e.fillna(0, inplace=True)
result_el_i = result_el_i.pivot(index="pre", columns="post", values="weight")
result_el_i.fillna(0, inplace=True)
return result_el_e, result_el_i
[docs]
def effconn_without_loops(
paths: pd.DataFrame,
pre_group: Optional[dict] = None,
post_group: Optional[dict] = None,
intermediate_group: Optional[dict] = None,
wide: bool = True,
combining_method: str = "mean",
chunk_size: int = 2000,
density_threshold: float = 0.2,
use_gpu: bool = True,
root: bool = False,
remove_loop_after_grouping: bool = True,
quiet: bool = False,
):
"""Calculate the effective connectivity from the paths DataFrame (which could be an
output of `find_paths_of_length()`), from the 'pre' in the earliest layer, to the
'post' in the latest layer, grouped by `pre_group`, `post_group`, and
`intermediate_group`, excluding paths that contain loops (i.e., paths that
revisit the same neuron). This function was written by Claude Sonnet 4.5.
This function:
1. Calculates total effective connectivity (with loops)
2. Enumerates paths to find those with loops
3. Accumulates effective connectivity contributions from looping paths
4. Subtracts loop contributions from total
Args:
paths (pd.DataFrame): DataFrame containing the paths with columns: 'pre', 'post',
'weight', and 'layer'. The 'pre' and 'post' columns represent the
presynaptic and postsynaptic neurons, respectively. The 'weight' column
represents the strength of the connection, and the 'layer' column indicates
the layer of the connection (starting from 1).
pre_group (dict): Mapping from presynaptic neuron indices (what's in the `pre`
column in paths) to their groups.
post_group (dict): Mapping from postsynaptic neuron indices to their groups.
intermediate_group (dict, optional): Mapping for intermediate neurons. Defaults
to None.
wide (bool, optional): If True, returns the result in wide format. If False,
returns in long format. Defaults to True.
combining_method (str, optional): Method to combine inputs or outputs based on
groups. Can be 'mean', 'median', or 'sum'. Defaults to 'mean'.
chunk_size (int, optional): Number of rows to process in each chunk when using
GPU. Defaults to 2000.
density_threshold (float, optional): Threshold for converting sparse matrices to
dense. If the density of the matrix exceeds this value, it will be converted
to a dense matrix to save memory. Defaults to 0.2.
use_gpu (bool, optional): Whether to use GPU for computation. Defaults to True.
If GPU is not available, it will fall back to CPU.
root (bool, optional): If True, take the nth root of the effective connectivity
values (where n is the number of steps/layers). This transforms the output
from "total influence" to "average connection strength per step". Defaults
to False.
remove_loop_after_grouping (bool, optional): Whether to remove loops after
grouping the paths. Defaults to True.
quiet (bool, optional): Whether to suppress progress output. Defaults to False.
Returns:
pd.DataFrame:
A DataFrame summarizing the effective connectivity between presynaptic and
postsynaptic groups, excluding loops.
"""
if len(paths) == 0:
return None
# Validate required columns
required_columns = {"layer", "pre", "post", "weight"}
missing = required_columns - set(paths.columns)
if missing:
raise ValueError(
f"paths DataFrame missing required columns: {missing}. "
f"Available columns: {list(paths.columns)}"
)
if remove_loop_after_grouping:
# group first
paths = group_paths(
paths,
pre_group,
post_group,
intermediate_group,
combining_method=combining_method,
)
# Calculate total effective connectivity (with loops)
if not quiet:
print("Calculating total effective connectivity (with loops)...")
effconn_total = effective_conn_from_paths(
paths,
wide=True,
chunk_size=chunk_size,
density_threshold=density_threshold,
use_gpu=use_gpu,
root=root,
)
# Enumerate paths using generator to identify loop contributions
if not quiet:
print("Enumerating paths to identify loops...")
# Build adjacency for path enumeration
max_layer = paths.layer.max()
min_layer = paths.layer.min()
# Dictionary to accumulate loop path contributions: (start_node, end_node) -> weight
loop_contributions = defaultdict(float)
# Use enumerate_paths generator to avoid materializing all paths
from .path_finding import enumerate_paths
path_gen = enumerate_paths(
paths, start_layer=min_layer, end_layer=max_layer, return_generator=True
)
total_paths = 0
loop_paths = 0
for path in tqdm(path_gen, desc="Processing paths", disable=quiet):
total_paths += 1
# Extract nodes from path - each tuple is (pre, post, weight)
# The nodes in order are: path[0][0] (start), then all path[i][1] (posts)
nodes_in_path = [path[0][0]] + [edge[1] for edge in path]
# Check for loops: does any node appear more than once?
# Allow start and end to be the same (that's specified by the user, not a loop)
# Only middle nodes matter for loop detection
start_node = nodes_in_path[0]
end_node = nodes_in_path[-1]
middle_nodes = nodes_in_path[1:-1] if len(nodes_in_path) > 2 else []
# Check if any middle node appears multiple times, or if start/end appear in middle
has_loop = False
seen_middle = set()
for node in middle_nodes:
if node in seen_middle or node == start_node or node == end_node:
has_loop = True
break
seen_middle.add(node)
if has_loop:
loop_paths += 1
# Calculate the contribution of this path
# Multiply all weights along the path
path_weight = 1.0
for _, _, weight in path:
path_weight *= weight
# Add to loop contributions for this (start, end) pair
loop_contributions[(start_node, end_node)] += path_weight
if not quiet:
print(f"Total paths: {total_paths}, Loop paths: {loop_paths}")
# Convert loop_contributions to DataFrame
if len(loop_contributions) == 0:
# No loops found, return total
if not quiet:
print("No loops found in paths.")
effconn_noloop = effconn_total
else:
# Build loop contribution matrix
loop_data = []
for (pre, post), weight in loop_contributions.items():
if root:
weight = weight ** (1 / (max_layer - min_layer + 1))
loop_data.append({"pre": pre, "post": post, "weight": weight})
loop_df = pd.DataFrame(loop_data)
loop_matrix = loop_df.pivot(index="pre", columns="post", values="weight")
loop_matrix.fillna(0, inplace=True)
# Ensure loop_matrix has same indices as effconn_total
loop_matrix = loop_matrix.reindex(
index=effconn_total.index, columns=effconn_total.columns, fill_value=0
)
# Subtract loop contributions
effconn_noloop = effconn_total - loop_matrix
# Handle numerical precision issues: clip small negative values to zero
min_value = effconn_noloop.min().min()
if min_value < 0:
if min_value < -1e-6:
# Significant negative values - warn user
import warnings
warnings.warn(
f"Loop subtraction resulted in significant negative values "
f"(min={min_value:.6e}). This may indicate an issue with the "
f"loop enumeration approach. Values will be clipped to zero.",
UserWarning,
)
# Clip to zero (handles both numerical errors and the warning case)
effconn_noloop = effconn_noloop.clip(lower=0)
# Convert to long format if requested
effconn_noloop_long = effconn_noloop.reset_index().melt(
id_vars="pre", var_name="post", value_name="weight"
)
if not remove_loop_after_grouping:
# Group now
effconn_noloop_long = group_paths(
effconn_noloop_long,
pre_group,
post_group,
intermediate_group,
combining_method=combining_method,
)
if wide:
effconn_noloop = effconn_noloop_long.pivot(
index="pre", columns="post", values="weight"
)
effconn_noloop.fillna(0, inplace=True)
if wide:
return effconn_noloop
else:
return effconn_noloop_long
[docs]
def read_precomputed(
prefix: str, file_path: str | None = None, first_n: int | None = None
) -> List:
"""Reads the precomputed compressed paths.
Args:
prefix (str): The prefix/folder name (expected to be the same) of the
files to read.
file_path (str, optional): The path to the files. Defaults to None. If
None, checks if running in Google Colab, and sets the path: if
running in Colab, sets the path to "/content/"; otherwise, sets the
path to "".
first_n (int, optional): Number of files to read. If None, reads all files.
Returns:
List:
A list of matrices (sparse or dense) representing the steps.
"""
if file_path is None:
# Check if running in Google Colab
if "COLAB_GPU" in os.environ:
# Running in Colab
file_path = "/content/"
else:
# Running locally
file_path = ""
steps_cpu = []
# Get all files with the prefix pattern
folder_path = os.path.join(file_path, prefix)
if not os.path.exists(folder_path):
raise FileNotFoundError(f"Directory {folder_path} does not exist")
files = os.listdir(folder_path)
# Filter files that match the pattern prefix_i.npz or prefix_i.npy
step_files = []
for f in files:
if f.startswith(f"{prefix}_") and (f.endswith(".npz") or f.endswith(".npy")):
# Extract the step number from the filename
try:
step_num = int(f.split("_")[-1].split(".")[0])
step_files.append((step_num, f))
except ValueError:
# Skip files that don't match the expected pattern
continue
# Sort by step number to ensure correct order
step_files.sort(key=lambda x: x[0])
if first_n is None:
first_n = len(step_files)
else:
first_n = min(first_n, len(step_files))
# Load the first_n files
for i in range(first_n):
step_num, filename = step_files[i]
file_full_path = os.path.join(folder_path, filename)
if filename.endswith(".npz"):
# Load sparse matrix
matrix = sp.sparse.load_npz(file_full_path)
elif filename.endswith(".npy"):
# Load dense matrix
matrix = np.load(file_full_path)
else:
# This shouldn't happen given our filtering, but just in case
continue
steps_cpu.append(matrix)
if len(steps_cpu) == 0:
raise FileNotFoundError(
f"No files found with pattern {prefix}_*.np[yz] in {folder_path}"
)
return steps_cpu