# Standard library imports
import math
from typing import List
import os
import gc
# Third-party package imports
import ipywidgets as widgets
import matplotlib.pyplot as plt
import numpy as np
import numpy.typing as npt
import pandas as pd
import plotly.express as px
import scipy as sp
import seaborn as sns
import torch
from IPython.display import display
from scipy.sparse import csr_matrix, issparse, csc_matrix, coo_matrix
from tqdm import tqdm
from .utils import (
dynamic_representation,
group_edge_by,
tensor_to_csc,
to_nparray,
torch_sparse_where,
arrayable,
scipy_sparse_to_pytorch,
)
[docs]
def compress_paths(
A: sp.sparse.spmatrix,
step_number: int,
threshold: float = 0,
output_threshold: float = 1e-4,
root: bool = False,
chunkSize=2000,
device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
save_to_disk: bool = False,
save_path: str = "./",
save_prefix: str = "step_",
return_results: bool = True,
high_cpu_ram: bool = True,
output_dtype: np.dtype = np.float32,
density_threshold: float = 0.2, # Save as dense if density exceeds this
):
"""
Computes A^0 to A^n by chunking the computation to save memory.
Results are thresholded and returned as a list of sparse scipy matrices or numpy
arrays (if above density_threshold). If it's too slow, try changing chunkSize.
Args:
A (scipy.sparse.matrix): The connectivity matrix as a scipy
sparse matrix.
step_number (int): Power to raise A to.
threshold (float): Threshold to apply to the matrix after each multiplication.
If 0, no thresholding is applied.
output_threshold (float): Threshold to apply to the output (>=), but not during
matrix multiplication.
root (bool): If True, take the n-th root of the result in output.
chunkSize (int): Size of the chunks to process at a time. This determines
the memory usage (on GPU if available). e.g. it might need A + (A.shape[0] *
chunkSize) * float32 / 8 btes.
device (torch.device): Device to use for computation. Uses GPU if available.
save_to_disk (bool): If True, save the results to disk.
save_path (str): Path to save the results.
save_prefix (str): Prefix to use for the saved files.
return_results (bool, optional): Whether to return the results as a list
of sparse matrices. Defaults to True. If False, returns an empty list.
high_cpu_ram (bool): if high_cpu_ram, keep all the resulting chunked sparse
matrices in memory, before combining and writing to disk altogether. if
False, write each chunk to disk to a temporary directory as soon as it is
computed, and combine them at the end.
output_dtype (np.dtype): Data type to use for the output matrices. Defaults
to np.float32.
density_threshold (float): If the density of the matrix exceeds this threshold,
the matrix is saved as dense. Defaults to 0.2.
Returns:
List[scipy.sparse.csr_matrix or numpy.ndarray]: List of matrices representing
A^0 to A^n.
"""
assert A.shape[0] == A.shape[1], "Matrix A must be square"
assert step_number > 0, "Power n must be positive"
# Turn A into a torch sparse tensor
A = scipy_sparse_to_pytorch(A).to(device)
matrix_size = A.shape[0]
num_chunks = (matrix_size + chunkSize - 1) // chunkSize # Ceiling division
# Create save directory if needed
if save_to_disk and not os.path.exists(save_path):
os.makedirs(save_path)
# Prepare temporary storage method
if high_cpu_ram:
# Initialize dictionaries to collect COO data
rows: dict[int, list[npt.NDArray]] = {idx: [] for idx in range(step_number)}
cols: dict[int, list[npt.NDArray]] = {idx: [] for idx in range(step_number)}
data: dict[int, list[npt.NDArray]] = {idx: [] for idx in range(step_number)}
else:
# Create temporary directory for chunks
temp_dir = os.path.join(os.getcwd(), "temp_chunks")
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# Process each chunk
for chunk_idx in tqdm(range(num_chunks)):
# Define the range for this chunk
start_col = chunk_idx * chunkSize
end_col = min((chunk_idx + 1) * chunkSize, matrix_size)
current_chunk_size = end_col - start_col
for power in range(step_number):
# Compute the result for this chunk and power
if power == 0:
# For A^1, extract the relevant columns from A
indices = A._indices()
values = A._values()
# Filter to include only columns in current chunk
col_mask = (indices[1, :] >= start_col) & (indices[1, :] < end_col)
result_indices = indices[:, col_mask].clone()
result_values = values[col_mask].clone()
# Adjust column indices to be relative to start_col
result_indices[1, :] -= start_col
# Create sparse tensor for this column chunk
result = torch.sparse_coo_tensor(
result_indices,
result_values,
(matrix_size, current_chunk_size),
device=device,
).to_dense()
if threshold > 0:
result = torch.where(
torch.abs(result) < threshold,
torch.tensor(0.0).to(device),
result,
)
else:
# Matrix multiplication for higher powers
result = torch.sparse.mm(A, result)
if threshold > 0:
result = torch.where(
torch.abs(result) < threshold,
torch.tensor(0.0).to(device),
result,
)
# Force garbage collection
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Apply root if requested
if root:
result_root = torch.sign(result) * torch.abs(result) ** (
1 / (power + 1)
)
# Apply output threshold and collect non-zero entries
mask = torch.abs(result) > output_threshold
chunk_rows, chunk_cols = torch.nonzero(mask, as_tuple=True)
# Get values based on whether root was applied
chunk_data: npt.NDArray
if root:
chunk_data = (
result_root[chunk_rows, chunk_cols]
.cpu()
.numpy()
.astype(output_dtype)
)
else:
chunk_data = (
result[chunk_rows, chunk_cols].cpu().numpy().astype(output_dtype)
)
# Convert to CPU
chunk_cols: npt.NDArray = chunk_cols.cpu().numpy()
chunk_rows: npt.NDArray = chunk_rows.cpu().numpy()
# Store the data based on RAM usage preference
if high_cpu_ram:
# Adjust column indices to global coordinates
global_cols = chunk_cols + start_col
# Store in memory
rows[power].append(chunk_rows)
cols[power].append(global_cols)
data[power].append(chunk_data)
else:
# Save chunk to disk
sparse_chunk = sp.sparse.coo_matrix(
(chunk_data, (chunk_rows, chunk_cols)),
shape=(matrix_size, current_chunk_size),
)
sparse_chunk.eliminate_zeros()
sp.sparse.save_npz(
os.path.join(temp_dir, f"step_{power}_chunk_{chunk_idx}.npz"),
sparse_chunk,
)
# Clear memory
del result, mask, chunk_rows, chunk_cols, chunk_data
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Combine chunks and save results
print("Combining data from all chunks")
results = []
for power in tqdm(range(step_number)):
# Get all data for current power
if high_cpu_ram:
# Combine data from memory
if rows[power]: # Check if data was collected
all_rows = np.concatenate(rows[power])
all_cols = np.concatenate(cols[power])
all_data = np.concatenate(data[power])
else:
all_rows = np.array([])
all_cols = np.array([])
all_data = np.array([])
else:
# Combine data from disk
all_rows, all_cols, all_data = [], [], []
for chunk_idx in range(num_chunks):
# Load chunk from disk
sparse_chunk = sp.sparse.load_npz(
os.path.join(temp_dir, f"step_{power}_chunk_{chunk_idx}.npz")
)
# Adjust column indices
sparse_chunk.col += chunk_idx * chunkSize
# Append data
all_rows.append(sparse_chunk.row)
all_cols.append(sparse_chunk.col)
all_data.append(sparse_chunk.data)
# Clean up
del sparse_chunk
gc.collect()
# Combine all chunks
all_rows = np.concatenate(all_rows)
all_cols = np.concatenate(all_cols)
all_data = np.concatenate(all_data)
# Calculate density from the data directly
nnz = len(all_data)
density = nnz / (matrix_size**2)
print(
f"Matrix {power} has {nnz} non-zero elements ({density*100:.6f}% of full matrix)"
)
# Choose the appropriate format based on density
if density > density_threshold:
print(
f"Creating matrix {power} in dense format (density: {density*100:.6f}%)"
)
# Create dense matrix directly instead of converting from sparse
dense_result = np.zeros((matrix_size, matrix_size), dtype=output_dtype)
dense_result[all_rows, all_cols] = all_data
# Save to disk if requested
if save_to_disk:
np.save(
os.path.join(save_path, f"{save_prefix}{power}.npy"), dense_result
)
# Add to results if requested
if return_results:
results.append(dense_result)
# Clean up
del dense_result
else:
# Create sparse matrix
sparse_result = sp.sparse.coo_matrix(
(all_data, (all_rows, all_cols)), shape=(matrix_size, matrix_size)
).tocsc()
sparse_result.eliminate_zeros()
# Save sparse matrix if requested
if save_to_disk:
sp.sparse.save_npz(
os.path.join(save_path, f"{save_prefix}{power}.npz"), sparse_result
)
# Add to results if requested
if return_results:
results.append(sparse_result)
# Clean up
del sparse_result
# Clean up
del all_rows, all_cols, all_data
gc.collect()
# Clean up temporary files if using disk storage
if not high_cpu_ram:
for power in range(step_number):
for chunk_idx in range(num_chunks):
os.remove(os.path.join(temp_dir, f"step_{power}_chunk_{chunk_idx}.npz"))
os.rmdir(temp_dir)
# Final cleanup
torch.cuda.empty_cache()
gc.collect()
return results
[docs]
def compress_paths_dense_chunked(
inprop: csc_matrix,
step_number: int,
threshold: float = 0,
output_threshold: float = 1e-4,
root: bool = False,
chunkSize: int = 10000,
dtype: torch.dtype = torch.float32,
device: torch.device = torch.device("cuda" if torch.cuda.is_available() else "cpu"),
save_to_disk: bool = False,
save_path: str = "./",
save_prefix: str = "step_",
) -> list:
"""Performs iterative multiplication of a sparse matrix `inprop` for a specified
number of steps, applying thresholding to filter out values below a certain
`threshold` to optimize memory usage and computation speed.
The function is optimized to run on GPU if available. It needs >=
size_of_inprop.to_dense() * 3 amount of GPU memory, for matrix multiplication, and
thresholding.
This function multiplies the connectivity matrix (input in rows; output in columns)
`inprop` with itself `step_number` times, with each step's result being thresholded
to zero out elements below a given threshold. The function stores each step's result
in a list, where each result is further processed to drop values below the
`output_threshold` to save memory.
Args:
inprop (scipy.sparse.matrix): The connectivity matrix as a scipy
sparse matrix.
step_number (int): The number of iterations to perform the matrix
multiplication.
threshold (float, optional): The threshold value to apply after each
multiplication. Values below this threshold are set to zero.
Defaults to 0.
output_threshold (float, optional): The threshold value to apply to
the final output, used to reduce output size. Defaults to 1e-4.
root (bool, optional): Whether to take the nth root of the output.
This can be understood as "the average direct connection strength"
(when root=True), as opposed to "the proportion of influence among
all partners n steps away" (when root=False). Defaults to False.
chunkSize (int, optional): The size of the chunks to split the matrix
into for matrix multiplication. Defaults to 10000.
dtype (torch.dtype, optional): The data type to use for the tensor
calculations. Defaults to torch.float32.
device (torch.device, optional): The device to use for the tensor
calculations. Defaults to torch.device("cuda" if
torch.cuda.is_available() else "cpu").
save_to_disk (bool, optional): Whether to save the output matrices to
disk. Defaults to False.
save_path (str, optional): The path to save the output matrices to.
Defaults to "./" (the current folder).
save_prefix (str, optional): The prefix to use for the output matrix
filenames. Defaults to ``"step_"``.
Returns:
list: A list of scipy.sparse.csc_matrix objects, each representing
connectivity from all neurons to all neurons n steps away.
Note:
This function requires PyTorch and is designed to automatically
utilize CUDA-enabled GPU devices if available to accelerate
computations. The input matrix `inprop` is converted to a dense tensor
before processing.
Example:
>>> from scipy.sparse import csc_matrix
>>> import numpy as np
>>> inprop = csc_matrix(np.array([[0.1, 0.2], [0.3, 0.4]]))
>>> step_number = 2
>>> compressed_paths = compress_paths(inprop, step_number,
threshold=0.1,
output_threshold=0.01)
>>> print(compressed_paths)
"""
steps_fast: List[csc_matrix] = []
if not isinstance(inprop, csc_matrix):
inprop = inprop.tocsc()
# check that step_number>0
if step_number < 1:
raise ValueError("step_number should be greater than 0")
size = inprop.shape[0]
chunks = math.ceil(size / chunkSize)
with torch.no_grad():
for i in tqdm(range(step_number)):
if i == 0:
out_tensor = torch.tensor(inprop.toarray(), dtype=dtype)
else:
out_tensor_new = torch.zeros(size, size, dtype=dtype)
colLow = 0
colHigh = chunkSize
for colChunk in range(chunks): # iterate chunks colwise
rowLow = 0
rowHigh = chunkSize
in_col = torch.tensor(
inprop[:, colLow:colHigh].toarray(), dtype=dtype
).to(device)
# shape: size x chunkSize; on GPU
for rowChunk in range(chunks): # iterate chunks rowwise
in_rows = out_tensor[rowLow:rowHigh, :].to(device)
# shape: chunkSize x size; on GPU
out_tensor_new[rowLow:rowHigh, colLow:colHigh] = torch.matmul(
in_rows, in_col
).to("cpu")
# shape: chunkSize x chunkSize; on CPU
rowLow += chunkSize
rowHigh += chunkSize
rowHigh = min(rowHigh, size)
del in_rows
del in_col
torch.cuda.empty_cache()
colLow += chunkSize
colHigh += chunkSize
colHigh = min(colHigh, size)
out_tensor = out_tensor_new
del out_tensor_new
# Clear PyTorch CUDA cache
torch.cuda.empty_cache()
# Thresholding during matmul
if threshold != 0:
out_tensor = torch.where(
out_tensor >= threshold,
out_tensor,
torch.tensor(0.0, dtype=dtype),
)
# Convert to csc for output
out_csc = tensor_to_csc(out_tensor)
out_csc.eliminate_zeros()
if root:
out_csc.data = np.power(out_csc.data, 1 / (i + 1))
if output_threshold > 0:
out_csc.data = np.where(
out_csc.data >= output_threshold, out_csc.data, 0
)
out_csc.eliminate_zeros()
if save_to_disk:
sp.sparse.save_npz(
os.path.join(save_path, f"{save_prefix}{i}.npz"), out_csc
)
else:
steps_fast.append(out_csc)
del out_csc
# remove all variables
del out_tensor
torch.cuda.empty_cache()
return steps_fast
# below: not chunked version
[docs]
def compress_paths_not_chunked(
inprop, step_number, threshold=0, output_threshold=1e-4, root=False
):
"""As above, but without chunking.
This would be more demanding for GPU RAM.
"""
steps_fast = []
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inprop_tensor = torch.tensor(inprop.toarray(), device=device)
with torch.no_grad():
for i in tqdm(range(step_number)):
if i == 0:
out_tensor = inprop_tensor.clone()
else:
out_tensor = torch.matmul(out_tensor, inprop_tensor)
# Thresholding during matmul
if threshold != 0:
out_tensor = torch.where(
out_tensor >= threshold,
out_tensor,
torch.tensor(0.0, device=device),
)
# Convert to csc for output
out_csc = tensor_to_csc(out_tensor.to("cpu"))
out_csc.eliminate_zeros()
if root:
out_csc.data = np.power(out_csc.data, 1 / (i + 1))
if output_threshold > 0:
out_csc.data = np.where(
out_csc.data >= output_threshold, out_csc.data, 0
)
out_csc.eliminate_zeros()
steps_fast.append(out_csc)
torch.cuda.empty_cache()
return steps_fast
[docs]
def compress_paths_signed(
inprop,
idx_to_sign: dict,
target_layer_number: int,
threshold: float = 0,
output_threshold: float = 1e-4,
root: bool = False,
chunkSize: int = 2000,
save_to_disk: bool = False,
save_path: str = "./",
return_results: bool = True,
):
"""Calculates the excitatory and inhibitory influences across specified layers of a
neural network, using PyTorch for GPU acceleration. Even numbers of inhibition is
treated as excitation.
Args:
inprop (scipy.sparse.csc_matrix): The initial connectivity matrix representing
direct connections between all neurons, shape (n_neurons, n_neurons).
idx_to_sign (dict): A dictionary mapping neuron indices to the sign of output
(1 for excitatory, -1 for inhibitory).
target_layer_number (int): The number of layers through which to calculate
influences. 1 for direct connections, 2 for one step away, etc.
threshold (float, optional): A value to threshold the influences during
calculation; influences below this value are set to zero, and not passed on.
Defaults to 0.
output_threshold (float, optional): A threshold for the final output to reduce
output file size, with values below this threshold set to zero. Defaults to
1e-4.
root (bool, optional): Whether to take the nth root of the output. This can be
understood as "the average direct connection strength" (when root=True), as
opposed to "the proportion of influence among all partners n steps away"
(when root=False). Defaults to False.
chunkSize (int, optional): The size of the chunks to split the matrix into for
matrix multiplication. Defaults to 5000. A chunk is a dense matrix of size
(chunkSize, n_neurons). This determines the memory usage: e.g. 2000 * 160000
* float32 / 8 bytes / 1e9 = 1.28 GB. We need two copies (one excitation, one
inhibition), plus the sparse representations of the (n_neurons, n_neurons)
matrix.
save_to_disk (bool, optional): Whether to save the output matrices to disk.
save_path (str, optional): Path to save the results. Defaults to "./".
return_results (bool, optional): Whether to return the results as a list
of sparse matrices. Defaults to True.
Returns:
Tuple[List[scipy.sparse.csc_matrix], List[scipy.sparse.csc_matrix]]:
Two lists of sparse matrices representing the excitatory and
inhibitory influences, respectively, up to the specified target
layer.
"""
# if inprop is not a coo matrix, convert it
if not issparse(inprop):
# raise error
raise ValueError("inprop must be a scipy sparse matrix")
if not isinstance(inprop, coo_matrix):
inprop = inprop.tocoo()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
e_idx = [i for i in range(len(idx_to_sign)) if idx_to_sign[i] == 1]
i_idx = [i for i in range(len(idx_to_sign)) if idx_to_sign[i] == -1]
matrix_size = inprop.shape[0]
num_chunks = (matrix_size + chunkSize - 1) // chunkSize # Ceiling division
# make a temporary directory to save the chunks
temp_dir = os.path.join(os.getcwd(), "temp_chunks")
if not os.path.exists(temp_dir):
os.makedirs(temp_dir)
# Filter out e/i rows directly in COO format
e_mask = np.isin(inprop.row, e_idx)
i_mask = np.isin(inprop.row, i_idx)
direct_e_sender = sp.sparse.coo_matrix(
(
inprop.data[e_mask],
(inprop.row[e_mask], inprop.col[e_mask]),
),
shape=inprop.shape,
).tocsr()
direct_i_sender = sp.sparse.coo_matrix(
(
inprop.data[i_mask],
(inprop.row[i_mask], inprop.col[i_mask]),
),
shape=inprop.shape,
).tocsr()
if target_layer_number == 1:
return [direct_e_sender.tocsc()], [direct_i_sender.tocsc()]
# shape: (n_neurons, n_neurons)
direct_e_sender_tensor = scipy_sparse_to_pytorch(direct_e_sender, device=device)
# shape: (n_neurons, n_neurons)
direct_i_sender_tensor = scipy_sparse_to_pytorch(direct_i_sender, device=device)
for chunk_idx in tqdm(range(num_chunks)):
# Define the range for this chunk
start_row = chunk_idx * chunkSize
end_row = min((chunk_idx + 1) * chunkSize, matrix_size)
current_chunk_size = end_row - start_row
for layer in range(target_layer_number):
if layer == 0:
# For inprop^1, just take the corresponding rows, and turn into tensor
# shape: (chunkSize, n_neurons)
dense_e = torch.tensor(
direct_e_sender[start_row:end_row, :].toarray(),
dtype=torch.float32,
device=device,
)
dense_i = torch.tensor(
direct_i_sender[start_row:end_row, :].toarray(),
dtype=torch.float32,
device=device,
)
else:
past_dense_e = dense_e.clone()
# E = ee and ii added
# shape: (chunkSize, n_neurons)
dense_e = torch.sparse.mm(
dense_e, direct_e_sender_tensor
) + torch.sparse.mm(dense_i, direct_i_sender_tensor)
# I = E * I + I * E
# shape: (chunkSize, n_neurons)
dense_i = torch.sparse.mm(
past_dense_e, direct_i_sender_tensor
) + torch.sparse.mm(dense_i, direct_e_sender_tensor)
# Apply thresholding
if threshold > 0:
dense_e = torch.where(
dense_e >= threshold,
dense_e,
torch.tensor(0.0, device=device),
)
dense_i = torch.where(
dense_i >= threshold,
dense_i,
torch.tensor(0.0, device=device),
)
# force garbage collection after each multiplication
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Convert to csc for output
mask_e = torch.abs(dense_e) > output_threshold
mask_i = torch.abs(dense_i) > output_threshold
chunk_rows_e, chunk_cols_e = torch.nonzero(mask_e, as_tuple=True)
chunk_rows_i, chunk_cols_i = torch.nonzero(mask_i, as_tuple=True)
if root:
chunk_data_e = (
(
torch.sign(dense_e[chunk_rows_e, chunk_cols_e])
* torch.abs(dense_e[chunk_rows_e, chunk_cols_e])
** (1 / (layer + 1))
)
.cpu()
.numpy()
)
chunk_data_i = (
(
torch.sign(dense_i[chunk_rows_i, chunk_cols_i])
* torch.abs(dense_i[chunk_rows_i, chunk_cols_i])
** (1 / (layer + 1))
)
.cpu()
.numpy()
)
else:
chunk_data_e = dense_e[chunk_rows_e, chunk_cols_e].cpu().numpy()
chunk_data_i = dense_i[chunk_rows_i, chunk_cols_i].cpu().numpy()
# save the chunked data to disk as scipy sparse matrices
sparse_e = sp.sparse.coo_matrix(
(
chunk_data_e,
(chunk_rows_e.cpu().numpy(), chunk_cols_e.cpu().numpy()),
),
shape=(current_chunk_size, matrix_size),
)
sparse_i = sp.sparse.coo_matrix(
(
chunk_data_i,
(chunk_rows_i.cpu().numpy(), chunk_cols_i.cpu().numpy()),
),
shape=(current_chunk_size, matrix_size),
)
sparse_e.eliminate_zeros()
sparse_i.eliminate_zeros()
# save the sparse matrices to disk
sp.sparse.save_npz(
os.path.join(temp_dir, f"step_{layer}_e_{chunk_idx}.npz"), sparse_e
)
sp.sparse.save_npz(
os.path.join(temp_dir, f"step_{layer}_i_{chunk_idx}.npz"), sparse_i
)
del sparse_e, sparse_i
gc.collect()
if torch.cuda.is_available():
torch.cuda.empty_cache()
# Combine all data -----------------
print("Combining data from all chunks")
stepse = []
stepsi = []
# if save_path isn't already a directory, make it
if save_to_disk and not os.path.exists(save_path):
os.makedirs(save_path)
for layer in tqdm(range(target_layer_number)):
# first combine the es ----
rows = []
cols = []
data = []
for chunk_idx in range(num_chunks):
# load the chunked data from disk
sparse_e = sp.sparse.load_npz(
os.path.join(temp_dir, f"step_{layer}_e_{chunk_idx}.npz")
)
# adjust the row indices to account for the chunk offset
sparse_e.row += chunk_idx * chunkSize
# append the data to the lists
rows.append(sparse_e.row)
cols.append(sparse_e.col)
data.append(sparse_e.data)
# delete the sparse matrix to save memory
del sparse_e
gc.collect()
# combine the data
rows = np.concatenate(rows)
cols = np.concatenate(cols)
data = np.concatenate(data)
# create the sparse matrix
sparse_e = sp.sparse.coo_matrix(
(data, (rows, cols)), shape=(matrix_size, matrix_size)
).tocsc()
sparse_e.eliminate_zeros()
# save the sparse matrix to disk
if save_to_disk:
sp.sparse.save_npz(os.path.join(save_path, f"step_{layer}_e.npz"), sparse_e)
if return_results:
stepse.append(sparse_e)
# delete the sparse matrix to save memory
del sparse_e
gc.collect()
# now combine the is ----
rows = []
cols = []
data = []
for chunk_idx in range(num_chunks):
# load the chunked data from disk
sparse_i = sp.sparse.load_npz(
os.path.join(temp_dir, f"step_{layer}_i_{chunk_idx}.npz")
)
# adjust the row indices to account for the chunk offset
sparse_i.row += chunk_idx * chunkSize
# append the data to the lists
rows.append(sparse_i.row)
cols.append(sparse_i.col)
data.append(sparse_i.data)
# delete the sparse matrix to save memory
del sparse_i
gc.collect()
# combine the data
rows = np.concatenate(rows)
cols = np.concatenate(cols)
data = np.concatenate(data)
# create the sparse matrix
sparse_i = sp.sparse.coo_matrix(
(data, (rows, cols)), shape=(matrix_size, matrix_size)
).tocsc()
sparse_i.eliminate_zeros()
if return_results:
stepsi.append(sparse_i)
# save the sparse matrix to disk
if save_to_disk:
sp.sparse.save_npz(os.path.join(save_path, f"step_{layer}_i.npz"), sparse_i)
# delete the sparse matrix to save memory
del sparse_i
gc.collect()
# remove the temporary directory
for layer in range(target_layer_number):
for chunk_idx in range(num_chunks):
os.remove(os.path.join(temp_dir, f"step_{layer}_e_{chunk_idx}.npz"))
os.remove(os.path.join(temp_dir, f"step_{layer}_i_{chunk_idx}.npz"))
os.rmdir(temp_dir)
# remove all variables
del direct_e_sender, direct_i_sender
torch.cuda.empty_cache()
# clear the cache
gc.collect()
return stepse, stepsi
[docs]
def compress_paths_signed_no_chunking(
inprop,
idx_to_sign,
target_layer_number,
threshold=0,
output_threshold=1e-4,
root=False,
):
"""Calculates the excitatory and inhibitory influences across specified
layers of a neural network, using PyTorch for GPU acceleration. This
function processes a connectivity matrix (where presynaptic neurons are
represented by rows and postsynaptic neurons by columns) to distinguish and
compute the influence of excitatory and inhibitory neurons at each layer.
Args:
inprop (scipy.sparse.csc_matrix): The initial connectivity matrix
representing direct connections between adjacent layers.
idx_to_sign (dict): A dictionary mapping neuron indices to their types
(1 for excitatory, -1 for inhibitory), used to differentiate between
excitatory and inhibitory influences.
target_layer_number (int): The number of layers through which to
calculate influences, starting from the second layer (with the first
layer's influence, the direct connectivity, being defined by `inprop`).
threshold (float, optional): A value to threshold the influences;
influences below this value are set to zero, and not passed on in
future layers. Defaults to 0.
output_threshold (float, optional): A threshold for the final output
to reduce memory usage, with values below this threshold set to zero.
Defaults to 1e-4.
root (bool, optional): Whether to take the nth root of the output.
This can be understood as "the average direct connection strength"
(when root=True), as opposed to "the proportion of influence among all
partners n steps away" (when root=False). Defaults to False.
Returns:
Tuple[List[scipy.sparse.csc_matrix], List[scipy.sparse.csc_matrix]]:
Two lists of sparse matrices representing the excitatory and
inhibitory influences, respectively, up to the specified target
layer.
Note:
This function is ideal with GPU support. Ensure your environment
supports CUDA and that PyTorch is correctly installed.
"""
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
inprop_tensor = torch.tensor(inprop.toarray(), device=device, dtype=torch.float32)
# Create masks for excitatory and inhibitory neurons
n_neurons = inprop_tensor.shape[0]
excitatory_mask = torch.tensor(
[1 if idx_to_sign[i] == 1 else 0 for i in range(n_neurons)],
dtype=torch.float32,
device=device,
)
inhibitory_mask = torch.tensor(
[1 if idx_to_sign[i] == -1 else 0 for i in range(n_neurons)],
dtype=torch.float32,
device=device,
)
print(
"Number of excitatory neurons: "
+ str(excitatory_mask.unique(return_counts=True)[1][1].cpu().numpy())
)
print(
"Number of inhibitory neurons: "
+ str(inhibitory_mask.unique(return_counts=True)[1][1].cpu().numpy())
)
lne = torch.matmul(torch.diag(excitatory_mask), inprop_tensor)
lni = torch.matmul(torch.diag(inhibitory_mask), inprop_tensor)
lne_initial = lne.clone()
lni_initial = lni.clone()
steps_excitatory = []
steps_inhibitory = []
for layer in tqdm(range(target_layer_number)):
if layer != 0:
# if layer ==0, return the direct excitatory and inhibitory
# connections separately
lne_new = torch.matmul(lne, lne_initial) + torch.matmul(lni, lni_initial)
lni = torch.matmul(lne, lni_initial) + torch.matmul(lni, lne_initial)
# Apply thresholding
if threshold > 0:
lne_new = torch_sparse_where(lne_new, threshold)
lni = torch_sparse_where(lni, threshold)
# Dynamic representation based on density
lne = dynamic_representation(lne_new)
lni = dynamic_representation(lni)
torch.cuda.empty_cache()
# possible alternative implementation: first root and threshold, then
# move to CPU. But not sure if can root sparse tensor. So:
stepe_csc = tensor_to_csc(lne.to("cpu"))
stepi_csc = tensor_to_csc(lni.to("cpu"))
if root:
stepe_csc.data = np.power(stepe_csc.data, 1 / (layer + 1))
stepi_csc.data = np.power(stepi_csc.data, 1 / (layer + 1))
# then threshold
if output_threshold > 0:
stepe_csc.data = np.where(
stepe_csc.data >= output_threshold, stepe_csc.data, 0
)
stepi_csc.data = np.where(
stepi_csc.data >= output_threshold, stepi_csc.data, 0
)
stepe_csc.eliminate_zeros()
stepi_csc.eliminate_zeros()
steps_excitatory.append(stepe_csc)
steps_inhibitory.append(stepi_csc)
torch.cuda.empty_cache()
return steps_excitatory, steps_inhibitory
[docs]
def add_first_n_matrices(matrices, n):
"""Adds the first N connectivity matrices from a list, supporting different
path lengths. This function is designed to work with scipy sparse matrices,
and dense numpy matrices. Each matrix in the list represents connectivity
information for a specific path length.
Args:
matrices (list): A list of connectivity matrices of different path
lengths. The matrices can be scipy sparse matrices, or dense numpy
arrays. The function expects all matrices in the list to be of the
same type and shape.
n (int): The number of initial matrices in the list to be summed. This
number must not exceed the length of the matrices list.
Returns:
matrix: The resulting matrix after summing the first N matrices. The
type of the returned matrix matches the type of the input matrices
(scipy sparse matrix, or numpy array).
Raises:
ValueError: If the list of matrices is empty or if n is larger than
the number of matrices available in the list.
Example:
>>> from scipy.sparse import csc_matrix
>>> matrices = [csc_matrix([[1, 2], [3, 4]]),
csc_matrix([[5, 6], [7, 8]]),
csc_matrix([[9, 10], [11, 12]])]
>>> n = 2
>>> result_matrix = add_first_n_matrices(matrices, n)
>>> print(result_matrix.toarray())
[[ 6 8]
[10 12]]
Note:
Ensure that all matrices in the list are of compatible types and
shapes before using this function.
"""
if not matrices:
raise ValueError("The list of matrices is empty")
if n > len(matrices):
raise ValueError("n is larger than the number of matrices available")
sum_matrix = matrices[0].copy()
for i in range(1, n):
sum_matrix += matrices[i]
return sum_matrix
[docs]
def result_summary(
stepsn,
inidx: arrayable,
outidx: arrayable,
inidx_map: dict | None = None,
outidx_map: dict | None = None,
display_output: bool = True,
display_threshold: float = 1e-3,
threshold_axis: str = "row",
sort_within: str = "column",
sort_names: str | List | None = None,
pre_in_column: bool = False,
include_undefined_groups: bool = False,
outprop: bool = False,
combining_method: str = "mean",
):
"""Generates a summary of connections between different types of neurons,
represented by their input and output indexes. The function calculates the
total synaptic input from presynaptic neuron groups to an average neuron in
each postsynaptic neuron group.
Args:
stepsn (scipy.sparse matrix or numpy.ndarray): Matrix representing the
synaptic strengths between neurons, can be dense or sparse.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing the input (presynaptic) neurons, used to
subset stepsn. nan values are removed.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing the output (postsynaptic) neurons.
inidx_map (dict, optional): Mapping from indices to neuron groups for
the input neurons. Defaults to None, in which case neurons are not
grouped.
outidx_map (dict, optional): Mapping from indices to neuron groups for
the output neurons. Defaults to None, in which case it is set to
be the same as inidx_map.
display_output (bool, optional): Whether to display the output in a
coloured dataframe. Defaults to True.
display_threshold (float, optional): The minimum threshold for
displaying the output. Defaults to 0.
threshold_axis (str, optional): The axis to apply the display_threshold to.
Defaults to 'row' (removing entire rows if no value exceeds
display_threshold).
sort_within (str, optional): The axis to sort the output in. Defaults
to 'column'.
sort_names (str or list, optional): the column/row name(s) to sort the
result by. If none is provided, then sort by the first column/row.
pre_in_column (bool, optional): Whether to have the presynaptic neuron
groups as columns. Defaults to False (pre in rows, post: columns).
include_undefined_groups (bool, optional): Whether to include
undefined groups in the output. Defaults to False.
outprop (bool, optional): If True, get the summed output proportion (across
recipient single cells in the same cell type) for each average sender. If
False (default), get the summed input proportion across all senders for
each average recipient.
combining_method (str, optional): Method to combine inputs (outprop=False)
or outputs (outprop=True). Can be 'sum', 'mean', or 'median'.
Defaults to 'mean'.
Returns:
pd.DataFrame: A dataframe representing the summed synaptic input from
presynaptic neuron groups to an average neuron in each postsynaptic
neuron group. This dataframe is always returned, regardless of the
value of display_output.
Displays:
If display_output is True, the function will display a styled version
of the resulting dataframe.
"""
assert combining_method in ["mean", "median", "sum"], (
"The combining_method should be either 'mean', 'median', or 'sum'. "
f"Currently it is {combining_method}."
)
if inidx_map is None:
inidx_map = {idx: idx for idx in range(stepsn.shape[0])}
if outidx_map is None:
outidx_map = inidx_map
# remove nan values in inidx and outidx
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# make sure inidx and outidx only contain integers
inidx = np.array([int(x) for x in inidx])
outidx = np.array([int(x) for x in outidx])
if issparse(stepsn):
# if stepsn is coo, turn into csc
if stepsn.format == "coo":
stepsn = stepsn.tocsc()
matrix = stepsn[:, outidx][inidx, :].toarray()
else:
matrix = stepsn[inidx, :][:, outidx]
if include_undefined_groups:
# fill the nan values in inidx_map (e.g. 17726: nan) and outidx_map
# with 'undefined'
inidx_map = {k: v if pd.notna(v) else "undefined" for k, v in inidx_map.items()}
outidx_map = {
k: v if pd.notna(v) else "undefined" for k, v in outidx_map.items()
}
# Create the dataframe
df = pd.DataFrame(
data=matrix,
# choose what to group by here
# if idx is mapped to root_id, if root_id is kept as int64, the
# root_ids seem a bit messed up
index=[str(inidx_map[key]) for key in inidx],
columns=[str(outidx_map[key]) for key in outidx],
)
if not outprop:
# Sum across rows: presynaptic neuron is in the rows
# summing across neurons of the same type: total amount of input from that
# type for the postsynaptic neuron
summed_df = df.groupby(df.index).sum()
# Average across columns and transpose back
# averaging across columns of the same type:
# on average, a neuron of that type receives x% input from a presynaptic type
if combining_method == "sum":
result_df = summed_df.T.groupby(level=0).sum().T
elif combining_method == "mean":
result_df = summed_df.T.groupby(level=0).mean().T
elif combining_method == "median":
result_df = summed_df.T.groupby(level=0).median().T
else:
# calculate the output proportion from an average source neuron to a target type
# so first sum across columns
summed_df = df.T.groupby(level=0).sum().T
# then average across rows
if combining_method == "sum":
result_df = summed_df.groupby(summed_df.index).sum()
elif combining_method == "mean":
result_df = summed_df.groupby(summed_df.index).mean()
elif combining_method == "median":
result_df = summed_df.groupby(summed_df.index).median()
if pre_in_column:
result_df = result_df.T
if display_threshold > 0:
if threshold_axis == "row":
# only display rows where any value >= display_threshold
result_df = result_df[(np.abs(result_df) >= display_threshold).any(axis=1)]
elif threshold_axis == "column":
# only display columns where any value >= display_threshold
result_df = result_df.loc[
:, (np.abs(result_df) >= display_threshold).any(axis=0)
]
else:
raise ValueError("threshold_axis must be either 'column' or 'row'.")
# if there is nothing left
if result_df.empty:
raise ValueError(
"No values left after applying the display_threshold. "
"Try setting display_threshold to 0."
)
if sort_within == "column":
if sort_names is None:
# sort result_df by the values in the first column, in descending
# order
result_df = result_df.sort_values(
by=np.abs(result_df).columns[0], ascending=False
)
elif isinstance(sort_names, str):
sort_names = [sort_names]
if sort_names is not None:
if set(sort_names).issubset(result_df.columns):
result_df = result_df.sort_values(by=sort_names, ascending=False)
else:
raise ValueError(
"sort_names must be present in the values of outidx_map."
)
elif sort_within == "row":
# first sort rows by average row value in descending order
result_df = result_df.loc[
np.abs(result_df).mean(axis=1).sort_values(ascending=False).index
]
if sort_names is None:
# sort result_df by the values in the first column, in descending
# order
result_df = result_df.sort_values(
by=result_df.index[0], axis=1, ascending=False
)
elif isinstance(sort_names, str):
sort_names = [sort_names]
if sort_names is not None:
if set(sort_names).issubset(result_df.index):
result_df = result_df.sort_values(
by=sort_names, axis=1, ascending=False
)
else:
raise ValueError(
"sort_names must be present in the values of inidx_map."
)
else:
raise ValueError("sort_within must be either 'column' or 'row'.")
if display_output:
result_dp = result_df.style.background_gradient(
cmap="Blues",
vmin=np.abs(result_df).min().min(),
vmax=np.abs(result_df).max().max(),
)
display(result_dp)
return result_df
[docs]
def contribution_by_path_lengths_data(
steps,
inidx: arrayable,
outidx: arrayable,
outidx_map: dict | None = None,
inidx_map: dict | None = None,
):
"""Calculates the contribution from all of inidx (grouped by inidx_map) to an
average outidx (grouped by outidx_map) over different path lengths. Either inidx_map
or outidx_map, but not both, should be provided. If neither is provided, presynaptic
neurons are grouped together. Direct connections are in path_length 1.
Args:
steps (list of scipy.sparse matrices or numpy.array): List of sparse matrices,
each representing synaptic strengths for a specific path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of indices
representing input (presynaptic) neurons.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of
indices representing output (postsynaptic) neurons.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups. Only one
of inidx_map and outidx_map should be specified.
inidx_map (dict): Mapping from indices to presynaptic neuron groups. Only one of
inidx_map and outidx_map should be specified.
Returns:
pd.DataFrame: A DataFrame containing the contributions from presynaptic neurons
to postsynaptic neurons over different path lengths. The DataFrame has three
columns: 'path_length', 'presynaptic_type' (or 'postsynaptic_type'), and 'value'.
"""
# remove nan values in inidx and outidx
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# check if both inidx_map and outidx_map are provided
if inidx_map is not None and outidx_map is not None:
raise ValueError(
"Only one of inidx_map and outidx_map should be specified. "
"If you want to keep both, use "
"contribution_by_path_lengths_heatmap()."
)
if inidx_map is None and outidx_map is None:
outidx_map = {idx: idx for idx in outidx}
# give message that pres are grouped together
print(
"Neither inidx_map nor outidx_map provided. By default "
"presynaptic neurons are grouped together."
)
rows = []
for step in steps:
if hasattr(step, "toarray"):
data = step[:, outidx][inidx, :].toarray()
else:
data = step[inidx, :][:, outidx]
if inidx_map is not None:
df = pd.DataFrame(
data=data,
index=[inidx_map[key] for key in inidx],
)
# average of all columns
# then groupby index, and take the sum
# average post from all pre
df = df.mean(axis=1).groupby(level=0).sum().to_frame().T
rows.append(df)
elif outidx_map is not None:
df = pd.DataFrame(
data=data,
columns=[outidx_map[key] for key in outidx],
)
# sum all rows
# then groupby column, and take the mean
# average post from all pre
df = df.sum().groupby(level=0).mean().to_frame().T
rows.append(df)
# first have a dataframe where: each row is a path length, each column is
# a postsynaptic cell type
# then pivot_wide to long
# index is the path length
# variable is postynaptic cell type
# value is y axis
contri = pd.concat(rows, ignore_index=True).melt(ignore_index=False).reset_index()
if inidx_map is not None:
contri.columns = ["path_length", "presynaptic_type", "value"]
else:
contri.columns = ["path_length", "postsynaptic_type", "value"]
contri.path_length = contri.path_length + 1
return contri
[docs]
def contribution_by_path_lengths(
steps,
inidx: arrayable,
outidx: arrayable,
outidx_map: dict | None = None,
inidx_map: dict | None = None,
width: int = 800,
height: int = 400,
):
"""Plots the connection strength from all of inidx (grouped by inidx_map) to an
average outidx (grouped by outidx_map) over different path lengths. Either inidx_map
or outidx_map, but not both, should be provided. If neither is provided, presynaptic
neurons are grouped together. Direct connections are in path_length 1.
Args:
steps (list of scipy.sparse matrices or numpy.array): List of sparse matrices,
each representing synaptic strengths for a specific path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of indices
representing input (presynaptic) neurons.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array of
indices representing output (postsynaptic) neurons.
outidx_map (dict): Mapping from indices to postsynaptic neuron groups. Only one
of inidx_map and outidx_map should be specified.
inidx_map (dict): Mapping from indices to presynaptic neuron groups. Only one of
inidx_map and outidx_map should be specified.
width (int, optional): The width of the plot. Defaults to 800.
height (int, optional): The height of the plot. Defaults to 400.
Returns:
None: Displays an interactive line plot showing the connection strength from all
of inidx to an average outidx over different path lengths.
"""
contri = contribution_by_path_lengths_data(
steps,
inidx,
outidx,
outidx_map=outidx_map,
inidx_map=inidx_map,
)
fig = px.line(
contri,
x="path_length",
y="value",
color=contri.columns[1],
width=width,
height=height,
)
fig.update_layout(
xaxis=dict(tickmode="linear", tick0=1, dtick=1),
yaxis=dict(tickmode="linear", tick0=0, dtick=contri.value.max() / 5),
)
# fig.show()
return fig
[docs]
def contribution_by_path_lengths_heatmap(
steps,
inidx,
outidx,
inidx_map=None,
outidx_map=None,
sort_by_index=True,
sort_names=None,
pre_in_column=False,
display_threshold=0,
cmap="viridis",
figsize=(30, 15),
):
"""Display the contribution from inidx to outidx, grouped by inidx_map and
outidx_map, across different path lengths.
Args:
steps (list of scipy.sparse matrices): List of sparse matrices, each
representing synaptic strengths for a specific path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing input (presynaptic) neurons.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): Array
of indices representing output (postsynaptic) neurons.
inidx_map (dict, optional): Mapping from indices to input neuron
groups. Defaults to None, in which case neurons are not grouped.
outidx_map (dict, optional): Mapping from indices to output neuron
groups. Defaults to None, in which case it is set to be the same
as inidx_map.
sort_by_index (bool, optional): Whether to sort the output by index.
Defaults to True.
sort_names (str or list, optional): the column name(s) to sort the
result by. If none is provided, then sort by the first column.
pre_in_column (bool, optional): Whether to have the presynaptic neuron
groups as columns. Defaults to False (pre in rows, post: columns).
display_threshold (float, optional): The threshold for displaying the
output. Defaults to 0.
cmap (str, optional): The colormap to use for the heatmap. Defaults to
'viridis'.
figsize (tuple, optional): The size of the figure to display. Defaults
to (30, 15).
Returns:
None: Displays an interactive heatmap showing the contribution from
inidx to outidx, grouped by inidx_map and outidx_map, across
different path lengths.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
if inidx_map is None:
inidx_map = {idx: idx for idx in inidx}
if outidx_map is None:
outidx_map = inidx_map
heatmaps = []
for step in tqdm(steps):
df = result_summary(
step,
inidx,
outidx,
inidx_map,
outidx_map,
display_output=False,
sort_names=sort_names,
pre_in_column=pre_in_column,
display_threshold=display_threshold,
)
if sort_by_index:
df.sort_index(inplace=True)
heatmaps.append(df)
slider = widgets.IntSlider(
value=1,
min=1,
max=len(heatmaps),
step=1,
description="Path length",
continuous_update=True,
)
def plot_heatmap(index):
plt.figure(figsize=figsize)
# plt.imshow(heatmaps[index], cmap='viridis', aspect = 'auto')
# Use seaborn's heatmap function which is a higher-level API for
# Matplotlib's imshow
sns.heatmap(
heatmaps[index - 1],
annot=True,
fmt=".2f",
linewidths=0.5,
cmap=cmap,
)
# Rotate the tick labels for the columns to show them better
plt.xticks(rotation=90)
plt.yticks(rotation=0)
# Show the heatmap
plt.show()
# Link the slider to the plotting function
display(widgets.interactive(plot_heatmap, index=slider))
[docs]
def effective_conn_from_paths(paths, group_dict=None, wide=True):
"""Calculate the effective connectivity between (groups of) neurons based
only on the provided `paths` between neurons. This function runs on CPU,
and doesn't expect a big connectivity matrix as input.
Args:
paths (pd.DataFrame): A dataframe representing the paths between
neurons, with columns 'pre', 'post', 'weight', and 'layer'.
group_dict (dict, optional): A dictionary mapping neuron indices
(values in columns `pre` and `post`) to groups. Defaults to None.
wide (bool, optional): Whether to pivot the output dataframe to a wide
format. Defaults to True.
Returns:
pd.DataFrame: A dataframe representing the effective connectivity
between groups of neurons.
"""
# it will get confusing if we didn't use the same mapping for all layers
# though it is true that this would increase the size of the matrix being
# multiplied
local_idx_dict = {
idx: i for i, idx in enumerate(set(paths.pre).union(set(paths.post)))
} # give one index for each element in path; map from element to index
# map from index to element
local_to_global_idx = {i: idx for idx, i in local_idx_dict.items()}
paths.loc[:, ["pre_idx"]] = paths.pre.map(local_idx_dict)
paths.loc[:, ["post_idx"]] = paths.post.map(local_idx_dict)
# matmul with sparse matrices
for i, layer in enumerate(sorted(paths.layer.unique())):
if i == 0:
initial_el = paths[paths.layer == layer] # edgelist
csr = csr_matrix(
(
initial_el.weight,
(initial_el.pre_idx.values, initial_el.post_idx.values),
),
shape=(len(local_idx_dict), len(local_idx_dict)),
) # make sparse matrix of the shape all_elements, all_elements
else:
el = paths[paths.layer == layer]
csr = csr @ csr_matrix(
(el.weight, (el.pre_idx.values, el.post_idx.values)),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
coo = csr.tocoo()
result_el = pd.DataFrame(
{"pre_idx": coo.row, "post_idx": coo.col, "weight": coo.data}
)
result_el.loc[:, ["pre"]] = result_el.pre_idx.map(local_to_global_idx)
result_el.loc[:, ["post"]] = result_el.post_idx.map(local_to_global_idx)
if group_dict is not None:
result_el = group_edge_by(result_el, group_dict)
# result_el.loc[:, ['group_pre']] = result_el.pre.map(group_dict)
# result_el.loc[:, ['group_post']] = result_el.post.map(group_dict)
# # group by pre, sum; group by post, average
# # e.g. if group is cell type: the input proportion from type A to an
# average neuron in type B
# result_el = result_el.groupby( # sum across pre
# ['group_pre', 'group_post', 'post']).weight.sum().reset_index()
# result_el = result_el.groupby( # average across post
# ['group_pre', 'group_post']).weight.mean().reset_index()
# result_el.columns = ['pre', 'post', 'weight']
# pivot wider
if wide:
result_el = result_el.pivot(index="pre", columns="post", values="weight")
result_el.fillna(0, inplace=True)
return result_el
[docs]
def signed_effective_conn_from_paths(paths, group_dict=None, wide=True, idx_to_nt=None):
"""Calculate the *signed* effective connectivity between (groups of)
neurons based only on the provided `paths` between neurons. This function
runs on CPU, and doesn't expect a big connectivity matrix as input.
Args:
paths (pd.DataFrame): A dataframe representing the paths between
neurons, with columns 'pre', 'post', 'weight', 'layer', and
optionally 'sign'.
group_dict (dict, optional): A dictionary mapping neuron indices
(values in columns `pre` and `post`) to groups. Defaults to None.
wide (bool, optional): Whether to pivot the output dataframe to a wide
format. Defaults to True.
idx_to_nt (dict, optional): A dictionary mapping neuron indices
(values in columns `pre` and `post`) to 1 (excitatory) / -1
(inhibitory). Defaults to None.
Returns:
list: A list of two dataframes representing the effective connectivity
between groups of neurons, one for effective excitation, the other
inhibition.
"""
if ("sign" not in paths.columns) & (idx_to_nt is None):
raise ValueError(
"Either 'sign' column must be present in paths or "
"idx_to_nt must be provided."
)
# setting local indices
# it will get confusing if we didn't use the same mapping for all layers
# though it is true that this would increase the size of the matrix being
# multiplied
local_idx_dict = {
idx: i for i, idx in enumerate(set(paths.pre).union(set(paths.post)))
}
local_to_global_idx = {i: idx for idx, i in local_idx_dict.items()}
paths.loc[:, ["pre_idx"]] = paths.pre.map(local_idx_dict)
paths.loc[:, ["post_idx"]] = paths.post.map(local_idx_dict)
# make sure sign is in the column:
if "sign" not in paths.columns:
if any(~paths.pre.isin(idx_to_nt)):
print(
"Warning: some neurons are not in idx_to_nt. Their outputs "
"will be ignored"
)
paths.loc[:, "sign"] = paths.pre.map(idx_to_nt)
# matmul with sparse matrices
for i, layer in enumerate(sorted(paths.layer.unique())):
if i == 0:
initial_el_e = paths[(paths.layer == layer) & (paths.sign == 1)]
initial_el_i = paths[(paths.layer == layer) & (paths.sign == -1)]
csr_e = csr_matrix(
(
initial_el_e.weight,
(
initial_el_e.pre_idx.values,
initial_el_e.post_idx.values,
),
),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
csr_i = csr_matrix(
(
initial_el_i.weight,
(
initial_el_i.pre_idx.values,
initial_el_i.post_idx.values,
),
),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
else:
el_e = paths[(paths.layer == layer) & (paths.sign == 1)]
el_i = paths[(paths.layer == layer) & (paths.sign == -1)]
this_csr_e = csr_matrix(
(el_e.weight, (el_e.pre_idx.values, el_e.post_idx.values)),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
this_csr_i = csr_matrix(
(el_i.weight, (el_i.pre_idx.values, el_i.post_idx.values)),
shape=(len(local_idx_dict), len(local_idx_dict)),
)
# e = ee + ii
# make sure csr_e is not modified in place, so that we can use it
# for csr_i
csr_e_new = csr_e @ this_csr_e + csr_i @ this_csr_i
# i = ie + ei
csr_i = csr_e @ this_csr_i + csr_i @ this_csr_e
# now modify csr_e
csr_e = csr_e_new
coo_e = csr_e.tocoo()
coo_i = csr_i.tocoo()
# make dataframe based on the connectivity matrix
result_el_e = pd.DataFrame(
{"pre_idx": coo_e.row, "post_idx": coo_e.col, "weight": coo_e.data}
)
result_el_i = pd.DataFrame(
{"pre_idx": coo_i.row, "post_idx": coo_i.col, "weight": coo_i.data}
)
# change back to the global names
result_el_e.loc[:, ["pre"]] = result_el_e.pre_idx.map(local_to_global_idx)
result_el_e.loc[:, ["post"]] = result_el_e.post_idx.map(local_to_global_idx)
result_el_i.loc[:, ["pre"]] = result_el_i.pre_idx.map(local_to_global_idx)
result_el_i.loc[:, ["post"]] = result_el_i.post_idx.map(local_to_global_idx)
if group_dict is not None:
result_el_e = group_edge_by(result_el_e, group_dict)
result_el_i = group_edge_by(result_el_i, group_dict)
if wide:
result_el_e = result_el_e.pivot(index="pre", columns="post", values="weight")
result_el_e.fillna(0, inplace=True)
result_el_i = result_el_i.pivot(index="pre", columns="post", values="weight")
result_el_i.fillna(0, inplace=True)
return result_el_e, result_el_i
[docs]
def read_precomputed(
prefix: str, file_path: str | None = None, first_n: int | None = None
) -> List:
"""Reads the precomputed compressed paths.
Args:
prefix (str): The prefix/folder name (expected to be the same) of the
files to read.
file_path (str, optional): The path to the files. Defaults to None. If
None, checks if running in Google Colab, and sets the path: if
running in Colab, sets the path to "/content/"; otherwise, sets the
path to "".
first_n (int, optional): Number of files to read. If None, reads all files.
Returns:
List: A list of matrices (sparse or dense) representing the steps.
"""
if file_path is None:
# Check if running in Google Colab
if "COLAB_GPU" in os.environ:
# Running in Colab
file_path = "/content/"
else:
# Running locally
file_path = ""
steps_cpu = []
# Get all files with the prefix pattern
folder_path = os.path.join(file_path, prefix)
if not os.path.exists(folder_path):
raise FileNotFoundError(f"Directory {folder_path} does not exist")
files = os.listdir(folder_path)
# Filter files that match the pattern prefix_i.npz or prefix_i.npy
step_files = []
for f in files:
if f.startswith(f"{prefix}_") and (f.endswith(".npz") or f.endswith(".npy")):
# Extract the step number from the filename
try:
step_num = int(f.split("_")[-1].split(".")[0])
step_files.append((step_num, f))
except ValueError:
# Skip files that don't match the expected pattern
continue
# Sort by step number to ensure correct order
step_files.sort(key=lambda x: x[0])
if first_n is None:
first_n = len(step_files)
else:
first_n = min(first_n, len(step_files))
# Load the first_n files
for i in range(first_n):
step_num, filename = step_files[i]
file_full_path = os.path.join(folder_path, filename)
if filename.endswith(".npz"):
# Load sparse matrix
matrix = sp.sparse.load_npz(file_full_path)
elif filename.endswith(".npy"):
# Load dense matrix
matrix = np.load(file_full_path)
else:
# This shouldn't happen given our filtering, but just in case
continue
steps_cpu.append(matrix)
if len(steps_cpu) == 0:
raise FileNotFoundError(
f"No files found with pattern {prefix}_*.np[yz] in {folder_path}"
)
return steps_cpu