# Standard library imports
import itertools
from dataclasses import dataclass
from typing import Dict, List, Union, Tuple, Optional, Generator
from collections import defaultdict
# Third-party package imports
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
from scipy.sparse import issparse, spmatrix, csr_matrix
from tqdm import tqdm
from scipy.sparse.csgraph import shortest_path
from .utils import (
arrayable,
check_consecutive_layers,
count_keys_per_value,
to_nparray,
)
[docs]
def find_path_once(
inprop,
steps_cpu,
inidx: arrayable,
outidx: arrayable,
target_layer_number,
top_n=-1,
threshold=0,
):
"""
Finds the path once between input and output, of distance
target_layer_number, returning indices of neurons in the previous layer
that connect the input with the output. This works by taking the top_n
direct upstream partners of the outidx neurons, and intersect those with
neurons 'effectively' connected (through steps_cpu) to the inidx neurons.
Args:
inprop (scipy.sparse.csc_matrix): The connectivity matrix in
Compressed Sparse Column format.
steps_cpu (list): A list of compressed connectivity matrices: one matrix
for each compressed path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
input neuron index/indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
output neuron index/indices.
target_layer_number (int): The target layer number to examine. Must be
>= 1. When target_layer_number = 1, we are looking at the direct
synaptic connectivity.
top_n (int, optional): The number of top connections to consider based
on direct connectivity from inprop_csc. If top_n = -1, all connections
are considered.
threshold (float, optional): The threshold of the direct connectivity
from inidx to an average outidx.
Returns:
np.ndarray:
An array of neuron indices in the previous layer that have significant
connectivity, connecting between the `inidx` and `outidx`.
"""
inprop_csc = inprop.copy()
inprop_csc.data = np.abs(inprop.data)
# make sure they are integers
inidx = [int(i) for i in inidx]
outidx = [int(i) for i in outidx]
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
if target_layer_number == 1:
# if the target layer is 1, we are looking at the direct synaptic
# connectivity
# so we just need to find the indices of the non-zero values in the
# inprop_csc matrix that correspond to the outidx, and intersect
# those with the inidx we are interested in.
colidx = inidx
else:
# first get the neurons that effectively connect to inidx, at layer
# number target_layer_number - 1.
# for example, in the ORN->PN->KC case, target_layer_number = 2 for KCs.
# top_n_row_indices are indices of the PNs that connect to the KCs. So
# in this case we should use direct connectivity from PNs to get the
# ORNs.
# so when target_layer_number == 2, we should use steps_cpu[0] (direct
# connectivity), that is, target_layer_number-2:
# subtract 1 for getting top_n_row_indices below which is going one
# step upstream; and another for 0-based indexing in steps_cpu.
# the next line gets the targets that receive non-zero compressed
# input from inidx
# when target_layer_number >= 2.
# .nonzero() returns row, column of the nonzero values.
colidx = steps_cpu[target_layer_number - 2][inidx, :].nonzero()[1]
# then go back one step from the outidx, based on the direct connectivity
# matrix (inprop)
us = inprop_csc[:, outidx].nonzero()
# intersect the non-zero upstream neurons with those effectively connected
# across layers (colidx)
# these are still row indices of inprop
intersect = np.intersect1d(us[0], colidx)
# direct connectivity between target_layer_number-1 and target_layer_number
submatrix = inprop_csc[intersect, :][:, outidx]
# in case outidx is more than one index, calculate the average of each row
# in the submatrix
# same length as len(intersect)
row_averages = np.array(submatrix.mean(axis=1)).flatten()
# thresholding
# these are indices of row_averages, which can also be used to index
# row_averages
thresholded_indices = np.where(row_averages >= threshold)[0]
thresholded_row_averages = row_averages[thresholded_indices]
# thresholded_intersect are a subset of of intersect, which are indices of
# inprop_csc
# intersect has the same length as row_averages
thresholded_intersect = intersect[thresholded_indices]
# Find the indices (of thresholded_row_averages) of the top n averages
if top_n == -1:
top_n_indices = np.argsort(thresholded_row_averages)
else:
top_n_indices = np.argsort(thresholded_row_averages)[-top_n:]
# Get the original row indices corresponding to these top n averages
top_n_row_indices = thresholded_intersect[top_n_indices]
# make the edgelist
submatrix = inprop_csc[top_n_row_indices, :][:, outidx]
# Convert submatrix to COO format to efficiently find non-zero elements
coo_submatrix = submatrix.tocoo()
# Create DataFrame directly from COO format data
df = pd.DataFrame(
{
# Map back to original indices
"pre": top_n_row_indices[coo_submatrix.row],
# Map back to original output indices
"post": outidx[coo_submatrix.col],
"weight": coo_submatrix.data,
}
)
return df
[docs]
def find_paths_of_length(
edgelist: Union[spmatrix, pd.DataFrame],
inidx: arrayable,
outidx: arrayable,
target_layer_number: int,
quiet: bool = False,
):
"""
Finds the path of length target_layer_number between inidx and outidx, returning the
edgelist in a DataFrame, including the pre and post indices, the layer (direct
connections from inidx: layer = 1), and the weight of the direct connection between
pre and post.
Args:
edgelist (Union[spmatrix, pd.DataFrame]): The edgelist of the entire graph. If a
DataFrame, it must contain columns "pre", "post", and "weight". If a sparse
matrix, the pre needs to be in the rows.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The source indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The target
indices.
target_layer_number (int): The target layer number to examine. Must be >= 1. When
target_layer_number = 1, we are looking at the direct synaptic connectivity.
Returns:
pd.DataFrame:
A DataFrame containing the path data, including the pre-synaptic and
post-synaptic neuron indices, the layer (direct connections from inidx: layer =
1), and the weight (input proportion of the postsynaptic neuron) of the direct
connection between pre and post. If no path is found, returns None.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
assert len(inidx) > 0, "inidx must not be empty"
assert len(outidx) > 0, "outidx must not be empty"
if isinstance(edgelist, pd.DataFrame):
# if edgelist is a DataFrame, convert it to a sparse matrix
if not all(col in edgelist.columns for col in ["pre", "post", "weight"]):
raise ValueError(
"edgelist DataFrame must contain 'pre', 'post', and 'weight' columns."
)
elif isinstance(edgelist, spmatrix):
edgelist = edgelist.tocoo()
edgelist = pd.DataFrame(
data={
"pre": edgelist.row,
"post": edgelist.col,
"weight": edgelist.data,
}
)
else:
raise TypeError("edgelist must be a pandas DataFrame or a scipy sparse matrix.")
assert target_layer_number >= 1, "target_layer_number must be >= 1"
if target_layer_number == 1:
# if target_layer_number is 1, we just return the direct connections
el = edgelist[
(edgelist["pre"].isin(inidx)) & (edgelist["post"].isin(outidx))
].copy()
if el.empty:
return None
el.loc[:, ["layer"]] = 1
return el
# first find the middle layer:
if target_layer_number % 2 == 0:
# when target_layer_number == 2, the middle layer is 1
middle_layer = target_layer_number // 2
else:
# when target_layer_number == 3, the middle layer is 2
middle_layer = (target_layer_number + 1) // 2
# list of empty lists, of length target_layer_number+1
layer_indices = [[] for _ in range(target_layer_number + 1)]
# first one is inidx
layer_indices[0] = inidx
# last one is outidx
layer_indices[-1] = outidx
# first go from inidx to middle layer
for layer in range(1, middle_layer + 1):
layer_indices[layer] = list(
edgelist[edgelist["pre"].isin(layer_indices[layer - 1])]["post"].unique()
)
if len(layer_indices[layer]) == 0:
if not quiet:
print(f"No neurons found in layer {layer}. Returning None.")
return None
# then go from outidx to middle layer
# stopping at the layer after middle_layer
# when target_layer_number is 2 or 3, this is skipped
# when target_layer_number == 4:
for layer in range(target_layer_number - 1, middle_layer, -1): # layer is 3
layer_indices[layer] = list(
edgelist[edgelist["post"].isin(layer_indices[layer + 1])]["pre"].unique()
)
if len(layer_indices[layer]) == 0:
if not quiet:
print(f"No neurons found in layer {layer}. Returning None.")
return None
# link middle layer with the one after
el = edgelist[
(edgelist["pre"].isin(layer_indices[middle_layer]))
& (edgelist["post"].isin(layer_indices[middle_layer + 1]))
]
layer_indices[middle_layer] = el["pre"].unique()
layer_indices[middle_layer + 1] = el["post"].unique()
if (
len(layer_indices[middle_layer]) == 0
or len(layer_indices[middle_layer + 1]) == 0
):
if not quiet:
print(
f"No neurons found in layer {middle_layer} or {middle_layer + 1}. Returning None."
)
return None
# now go from middle layer to the first layer
for layer in range(middle_layer - 1, -1, -1):
el = edgelist[
edgelist["pre"].isin(layer_indices[layer])
& edgelist["post"].isin(layer_indices[layer + 1])
]
layer_indices[layer] = el["pre"].unique()
if len(layer_indices[layer]) == 0:
if not quiet:
print(f"No neurons found in layer {layer}. Returning None.")
return None
# now go from middle layer to the last layer
for layer in range(middle_layer + 1, target_layer_number):
el = edgelist[
edgelist["pre"].isin(layer_indices[layer - 1])
& edgelist["post"].isin(layer_indices[layer])
]
layer_indices[layer] = el["post"].unique()
if len(layer_indices[layer]) == 0:
if not quiet:
print(f"No neurons found in layer {layer}. Returning None.")
return None
# add the weights
path = []
for layer in range(target_layer_number):
el = edgelist[
(edgelist["pre"].isin(layer_indices[layer]))
& (edgelist["post"].isin(layer_indices[layer + 1]))
]
el.loc[:, ["layer"]] = layer + 1
path.append(el)
return pd.concat(path, ignore_index=True)
[docs]
def enumerate_paths(
edgelist: pd.DataFrame,
start_layer: int = 1,
end_layer: Optional[int] = None,
return_generator: bool = False,
loop_mode: str = "allow",
) -> Union[
List[List[Tuple[Union[str, int], Union[str, int], float]]],
Generator[List[Tuple[Union[str, int], Union[str, int], float]], None, None],
]:
"""
Finds all paths that begin with an edge in start_layer and end with an edge in
end_layer, assuming valid paths proceed layer-by-layer without skipping. This
function was written by Claude.
Args:
edgelist (pd.DataFrame): The edgelist of the entire graph. Must contain columns:
"layer", "pre", "post", and "weight". Each row is a directed, weighted edge
from "pre" to "post" at a given layer.
start_layer (int): The layer from which all paths must begin. Must be <= end_layer.
end_layer (int): The layer at which all paths must terminate. If None, defaults to
the maximum layer in the edgelist.
return_generator (bool, optional): If True, returns a generator instead of a list.
Useful for large graphs to avoid memory issues. Defaults to False.
loop_mode (str, optional): How to handle loops in paths. Options:
- "allow" (default): Return all paths, including those that revisit a vertex.
- "exclude": Return only loop-free (simple) paths. A node appearing at both
the start and end of a path is allowed (e.g. A-B-C-A is kept, capturing the
s-t cycle use case), but a node reappearing in the middle is a loop and the
path is dropped (e.g. A-B-A-C). This matches the convention in
:func:`count_paths`.
Returns:
Union[List[List[Tuple]], Generator]:
If return_generator is False, returns a list of valid paths. If True, returns a
generator. Each path is a list of (pre, post, weight) tuples, ordered from start
to end.
"""
if end_layer is None:
end_layer = edgelist["layer"].max()
if start_layer > end_layer:
raise ValueError("start_layer must be less than or equal to end_layer.")
if loop_mode not in ("allow", "exclude"):
raise ValueError(f"loop_mode must be 'allow' or 'exclude', got '{loop_mode}'")
exclude_loops = loop_mode == "exclude"
# Build adjacency: adj[layer][pre] → list of (post, weight)
# pre and post can be either strings or integers
adj: Dict[int, Dict[Union[str, int], List[Tuple[Union[str, int], float]]]] = (
defaultdict(lambda: defaultdict(list))
)
for _, row in edgelist.iterrows():
adj[row["layer"]][row["pre"]].append((row["post"], row["weight"]))
if return_generator:
# Generator version - yields paths as they are found
def generate_paths():
def dfs(
node: Union[str, int],
layer: int,
path: List[Tuple[Union[str, int], Union[str, int], float]],
visited: set,
start_node: Union[str, int],
):
# Loop check (only active when loop_mode == "exclude"). A revisited
# node is rejected, except the start node reappearing exactly at the
# final position, which closes an s-t cycle and is kept.
if exclude_loops and node in visited:
if layer == end_layer and node == start_node:
yield path.copy()
return
if layer == end_layer:
yield path.copy() # yield a copy to avoid mutation issues
return
if exclude_loops:
visited.add(node)
for post, wt in adj.get(layer + 1, {}).get(node, []):
path.append((node, post, wt))
yield from dfs(post, layer + 1, path, visited, start_node)
path.pop()
if exclude_loops:
visited.remove(node)
for pre, edges in adj.get(start_layer, {}).items():
for post, wt in edges:
yield from dfs(post, start_layer, [(pre, post, wt)], {pre}, pre)
return generate_paths()
else:
# List version - uses mutable path for efficiency (append/pop instead of copying)
paths: List[List[Tuple[Union[str, int], Union[str, int], float]]] = []
def dfs(
node: Union[str, int],
layer: int,
path: List[Tuple[Union[str, int], Union[str, int], float]],
visited: set,
start_node: Union[str, int],
):
# Loop check (only active when loop_mode == "exclude"). A revisited node
# is rejected, except the start node reappearing exactly at the final
# position, which closes an s-t cycle and is kept.
if exclude_loops and node in visited:
if layer == end_layer and node == start_node:
paths.append(path.copy())
return
if layer == end_layer:
paths.append(path.copy()) # append a copy to avoid mutation
return
if exclude_loops:
visited.add(node)
for post, wt in adj.get(layer + 1, {}).get(node, []):
path.append((node, post, wt))
dfs(post, layer + 1, path, visited, start_node)
path.pop()
if exclude_loops:
visited.remove(node)
for pre, edges in adj.get(start_layer, {}).items():
for post, wt in edges:
dfs(post, start_layer, [(pre, post, wt)], {pre}, pre)
return paths
[docs]
def find_path_iteratively(
inprop_csc: spmatrix,
steps_cpu: list,
inidx: arrayable,
outidx: arrayable,
target_layer_number: int,
top_n: int = -1,
threshold: float = 0,
quiet: bool = False,
):
"""
Iteratively finds the path from the specified output (outidx) back to the
input (inidx) across multiple layers, using the `find_path_once` function
to traverse each layer.
Args:
inprop_csc (scipy.sparse matrix): The direct connectivity matrix in
Compressed Sparse Column format.
steps_cpu (list): A list of compressed connectivity matrices: one
matrix for each compressed path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
input neuron indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
output neuron indices to start the reverse path finding.
target_layer_number (int): The number of layers to traverse backwards
from the outidx. If target_layer_number = 1, we are looking at the
direct synaptic connectivity.
top_n (int, optional): The number of top connections to consider at each
layer based on direct connectivity from inprop_csc. If top_n = -1, all
connections are considered.
threshold (float, optional): The threshold for the average of the direct
connectivity from inidx to outidx.
quiet (bool, optional): If True, suppresses print statements. Defaults to False.
Returns:
pd.DataFrame:
A DataFrame containing the path data, including the pre-synaptic and
post-synaptic neuron indices, the layer (direct connections from inidx:
layer = 1), and the weight (input proportion of the postsynaptic neuron) of
the direct connection between pre and post.
"""
if not quiet:
print(
'Have you tried "find_paths_of_length()" instead? About the same speed, no need to pre-load `steps`, and more accurate!'
)
inprop_csc.data = np.abs(inprop_csc.data)
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
if len(inidx) == 0 or len(outidx) == 0:
# raise error
raise ValueError("The input or output indices are empty!")
if issparse(inprop_csc):
# if stepsn is coo, turn into csc
if inprop_csc.format == "coo":
inprop_csc = inprop_csc.tocsc()
# path_indices = [] # This will store the path data as a list of arrays
dfs = [] # This will store the path data as a list of DataFrames
current_outidx = outidx
# from target_layer_number to 1, go back one step at a time
for layer in range(target_layer_number, 0, -1):
# Find the indices in the current layer that connect to the next layer
df = find_path_once(
inprop_csc,
steps_cpu,
inidx,
current_outidx,
layer,
top_n,
threshold,
)
# If no indices are found, break the loop as no path can be formed
if len(df) == 0:
if not quiet:
print(f"Cannot trace back to the input in {target_layer_number} steps.")
if (top_n > -1) | (threshold > 0):
print("Try lowering the threshold or increasing top_n.")
return
df["layer"] = layer
dfs.append(df)
# Update the outidx for the next iteration to move backwards through
# layers
current_outidx = set(df["pre"])
return pd.concat(dfs)
[docs]
def create_layered_positions(
df: pd.DataFrame, priority_indices=None, sort_dict: dict | None = None
) -> dict:
"""
Creates a dictionary of positions for each neuron in the paths, so that
the paths can be visualized in a layered manner. It assumes that `df`
contains the columns 'layer', 'pre_layer', 'post_layer' (or 'layer', 'pre',
'post'). If a neuron exists in multiple layers, it is plotted multiple
times.
Args:
df (pd.DataFrame): The DataFrame containing the path data, including the layer
number, pre-synaptic index, and post-synaptic index.
priority_indices (list, set, pd.Series, numpy.ndarray optional): A list of
neuron indices that should be plotted on top of each layer. Defaults to None.
sort_dict (dict, optional): A dictionary of neuron indices as keys and their
sorting order as values (bigger value is higher in the plot). Defaults to
None.
Returns:
dict: A dictionary of positions for each neuron in the paths, with the
keys as the neuron indices and the values as the (x, y) coordinates.
"""
# check if layer numbers in 'layer' column are consecutive. If anyone is
# absent, raise an error
if not check_consecutive_layers(df):
raise ValueError("The layer numbers in 'layer' column are not consecutive. ")
# if post_layer and pre_layer are not present, create them
if "post_layer" not in df.columns:
df["post_layer"] = df["post"].astype(str) + "_" + df["layer"].astype(str)
if "pre_layer" not in df.columns:
df["pre_layer"] = df["pre"].astype(str) + "_" + (df["layer"] - 1).astype(str)
if priority_indices is not None:
priority_indices = set(priority_indices)
number_of_layers = df["layer"].nunique()
layer_width = 1.0 / (number_of_layers + 1)
positions = {}
name_to_idx = dict(zip(df.pre_layer, df["pre"]))
name_to_idx.update(dict(zip(df.post_layer, df["post"])))
global_to_local_layer_number = {
l: i for i, l in enumerate(sorted(df["layer"].unique()))
}
df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number)
for layer in range(0, number_of_layers + 1):
if layer != number_of_layers:
layer_name = list(
set(df.pre_layer[df.local_layer == layer]).union(
set(df.post_layer[df.local_layer == (layer - 1)])
)
)
else:
layer_name = list(set(df.post_layer[df.local_layer == layer - 1]))
if sort_dict is not None:
layer_name = sorted(
layer_name, key=lambda x: sort_dict.get(x, float("-inf"))
)
if priority_indices is not None:
layer_name_priority = [
item for item in layer_name if name_to_idx[item] in priority_indices
]
layer_name_not = [
item for item in layer_name if name_to_idx[item] not in priority_indices
]
layer_name = layer_name_not + layer_name_priority
for index, neuron in enumerate(layer_name, start=1):
positions[neuron] = (
layer * layer_width,
# the later in the list, the higher
index * 1.0 / (len(layer_name) + 1),
)
return positions
[docs]
def remove_excess_neurons(
df: pd.DataFrame,
keep=None,
target_indices=None,
keep_targets_in_middle: bool = False,
quiet: bool = False,
) -> pd.DataFrame:
"""After filtering, some neurons are no longer on the paths between the input and
output neurons. This function removes those neurons from the paths.
Args:
df (pd.Dataframe): a filtered dataframe with similar structure as the dataframe
returned by `find_paths_of_length()`. If df is empty, and `quiet` is False,
raises a ValueError. If `quiet` is True, returns None.
keep (list, set, pd.Series, numpy.ndarray, str, optional): A list of neuron
indices that should be kept in the paths, even if they don't connect between
input and target in the last layer. Defaults to None.
target_indices (list, set, pd.Series, numpy.ndarray, str, optional): A list of
target neuron indices that should be kept in the last layer. Defaults to
None, in which case all neurons in the last layer in `df` would be kept.
keep_targets_in_middle (bool, optional): If True, the target_indices are kept in
the middle layers as well, even if they don't connect between input and
target in the last layer. Defaults to False.
Returns:
pd.Dataframe:
A dataframe with similar structure as the result of `find_paths_of_length()`,
with the excess neurons removed. If no path is found, returns None.
"""
if df.shape[0] == 0:
if quiet:
return
else:
# raise error
raise ValueError(
"No connections found in the input of `remove_excess_neurons()`. "
)
max_layer_num = df["layer"].max()
if max_layer_num == 1:
return df
# if target_indices are provided, first use this to filter the last layer ----
if target_indices is not None:
# if it's a string or int, convert to list
if isinstance(target_indices, str) or (type(target_indices) == int):
target_indices = [target_indices]
target_indices = set(target_indices)
# check if datatype is the same, between target_indices and post
if not all([type(i) == type(df["post"].iloc[0]) for i in target_indices]):
raise ValueError(
f"The datatype in `target_indices` should be the same as the elements in `post` column of the DataFrame. Elements in `post` is {type(df.post.iloc[0])}."
)
if not target_indices.issubset(df[df["layer"] == df["layer"].max()]["post"]):
raise ValueError(
"The target indices are not in the post-synaptic neurons of the last layer. Here are the indices of the last layer: ",
", ".join(df[df["layer"] == df["layer"].max()]["post"].unique()),
". Your target_indices should be a subset.",
)
df = df[(df["layer"] != df["layer"].max()) | df["post"].isin(target_indices)]
# check if all layer numbers are consecutive ----
if not check_consecutive_layers(df):
if not quiet:
print(
"Warning: The layer numbers are not consecutive. Will only use the"
" consecutive layers from the last one."
)
selected = []
# get the consecutive layers from the last one
for l in range(df["layer"].max(), 0, -1):
if l in df["layer"].unique():
selected.append(l)
else:
break
df = df[df["layer"].isin(selected)]
global_to_local_layer_number = {
l: i for i, l in enumerate(sorted(df["layer"].unique()))
}
df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number)
elif df["layer"].min() != 1:
# if the layer numbers do not start from 1
if not quiet:
print(
"Warning: The layer numbers do not start from 1. Will only use the "
"consecutive layers from the last one."
)
global_to_local_layer_number = {
l: i for i, l in enumerate(sorted(df["layer"].unique()))
}
df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number)
else:
df.loc[:, ["local_layer"]] = df["layer"]
# while there is any layer whose post is not the same as the next layer's pre ----
while any(
[
set(df[df.local_layer == i]["post"])
!= set(df[df.local_layer == (i + 1)]["pre"])
for i in range(1, df.local_layer.max())
]
):
if keep is not None:
if isinstance(keep, str):
keep = [keep]
keep = set(keep)
if all(
[
set(df[df.local_layer == i]["post"]).union(keep)
== set(df[df.local_layer == (i + 1)]["pre"]).union(keep)
for i in range(1, df.local_layer.max())
]
):
break
else:
keep = set()
if keep_targets_in_middle:
if target_indices is not None:
keep = keep.union(target_indices)
# start adding each layer to df_layers_update ---
df_layers_update = []
# if there are only two layers
if df.local_layer.max() == 2:
df_layer = df[df.local_layer == 2]
df_prev_layer = df[df.local_layer == 1]
# filter by pre in the second layer
df_layers_update.append(
df_layer[df_layer["pre"].isin(set(df_prev_layer["post"]).union(keep))]
)
# filter by post in the first layer
df_layers_update.append(
df_prev_layer[
df_prev_layer["post"].isin(set(df_layer["pre"]).union(keep))
]
)
df = pd.concat(df_layers_update)
else:
for i in range(2, df.local_layer.max()):
df_layer = df[df.local_layer == i]
df_next_layer = df[df.local_layer == i + 1]
df_prev_layer = df[df.local_layer == i - 1]
# pre that should be in the current layer: an intersection of
# previous layer's post; and current layer's pre
df_pre = set.intersection(
set(df_prev_layer["post"]) & set(df_layer["pre"])
)
# post that should be in the current layer: an intersection of
# current layer's post; and next layer's pre
df_post = set.intersection(
set(df_layer["post"]), set(df_next_layer["pre"])
)
if i == 2:
# add edges in the first layer
df_prev_layer = df_prev_layer[
df_prev_layer["post"].isin(df_pre.union(keep))
]
df_layers_update.append(df_prev_layer)
df_layer = df_layer[
df_layer["pre"].isin(df_pre.union(keep))
& df_layer["post"].isin(df_post.union(keep))
]
if df_layer.shape[0] == 0:
if not quiet:
print(
"No path found. Try lowering the threshold for the edges to be included in the path."
)
return
df_layers_update.append(df_layer)
if i == (df.local_layer.max() - 1):
# add edges in the last layer
if target_indices is None:
df_next_layer = df_next_layer[
df_next_layer["pre"].isin(df_post.union(keep))
]
else:
df_next_layer = df_next_layer[
df_next_layer["pre"].isin(
df_post.union(keep).union(target_indices)
)
]
df_layers_update.append(df_next_layer)
df = pd.concat(df_layers_update)
# at this point, if no edges left: return None
if df.shape[0] == 0:
if not quiet:
print("No path found. Try relaxing the criteria for edge inclusion.")
return
df = df.loc[:, df.columns != "local_layer"]
# in case we removed all the connections in the last layer
if df["layer"].max() != max_layer_num:
if not quiet:
print(
"No path found. Try lowering the threshold for the edges to be included in the path."
)
return
return df
[docs]
def remove_excess_neurons_batched(
df: pd.DataFrame,
keep=None,
target_indices=None,
keep_targets_in_middle: bool = False,
quiet: bool = False,
) -> pd.DataFrame:
"""Does the same thing as `remove_excess_neurons()`, but for batched input
(i.e. assumes column `batch` in `df`).
Args:
df (pd.DataFrame): a filtered dataframe with similar structure as the dataframe
returned by `find_path_iteratively()`. Must contain a column `batch`.
keep (list, set, pd.Series, numpy.ndarray, str, optional): A list of neuron
indices that should be kept in the paths, even if they don't connect between
input and target in the last layer. Defaults to None.
target_indices (list, set, pd.Series, numpy.ndarray, str, optional): A list of
target neuron indices that should be kept in the last layer. Defaults to
None, in which case all neurons in the last layer in `df` would be kept.
keep_targets_in_middle (bool, optional): If True, the target_indices are kept in
the middle layers as well, even if they don't connect between input and
target in the last layer. Defaults to False.
quiet (bool, optional): If True, suppresses print statements. Defaults to False.
Returns:
pd.DataFrame:
The filtered DataFrame containing the path data, including the layer number,
pre-synaptic index, post-synaptic index, and weight
"""
# first check if 'batch' is in the columns
if "batch" not in df.columns:
raise ValueError("The column `batch` is not in the DataFrame.")
dfs = []
for b in tqdm(df.batch.unique()):
df_batch = df[df.batch == b]
df_batch = remove_excess_neurons(
df_batch, keep, target_indices, keep_targets_in_middle, quiet=quiet
)
dfs.append(df_batch)
return pd.concat(dfs)
[docs]
def filter_paths(
df: pd.DataFrame,
threshold: float = 0,
necessary_intermediate: Dict[int, arrayable] | None = None,
quiet: bool = False,
) -> pd.DataFrame:
"""Filters the paths based on the weight threshold and the necessary
intermediate neurons. The weight threshold refers to the direct
connectivity between connected neurons in the path. It is recommended to
not put too may neurons in necessary_intermediate, as it may be too
stringent and remove all paths.
Args:
df (pd.DataFrame): The DataFrame containing the path data, including the layer
number, pre-synaptic index, post-synaptic index, and weight. If df is empty,
and `quiet` is False, raises a ValueError. If `quiet` is True, returns None.
threshold (float, optional): The threshold for the weight of the direct
connection between pre and post. Defaults to 0.
necessary_intermediate (dict, optional): A dictionary of necessary intermediate
neurons, where the keys are the layer numbers (starting neurons: 1; directly
downstream: 2) and the values are the neuron indices (can be int, float,
list, set, numpy.ndarray, or pandas.Series). Defaults to None.
Returns:
pd.DataFrame:
The filtered DataFrame containing the path data, including the layer number,
pre-synaptic index, post-synaptic index, and weight.
"""
if df.shape[0] == 0:
if quiet:
return
else: # raise error
raise ValueError("The input DataFrame for filter_paths() is empty! ")
unique_layers = set(df.layer)
if threshold > 0:
df = df[df.weight > threshold]
if df.shape[0] == 0:
if not quiet:
print("No edges left after thresholding. Try lowering the threshold.")
return
elif set(df["layer"]) != unique_layers:
if not quiet:
print(
"Some layers have no edges left after thresholding. Try lowering the threshold."
)
return
df = remove_excess_neurons(df, quiet=quiet)
if necessary_intermediate is not None:
for layer, indices in necessary_intermediate.items():
if type(indices) != list:
indices = list(to_nparray(indices))
if layer < 1:
# error: layer has to be an integer >=1
raise ValueError("Layer has to be an integer >=1")
if layer < (df["layer"].max() + 1):
df = df[df["pre"].isin(indices) | (df["layer"] != layer)]
elif layer == (df["layer"].max() + 1):
# filter for targets
df = df[df["post"].isin(indices) | ((df["layer"] + 1) != layer)]
else:
# error: layer number too big
raise ValueError("Layer number too big")
if set(df["layer"]) != unique_layers:
if not quiet:
print(
"Some layers have no edges left after applying necessary_intermediate."
)
return
df = remove_excess_neurons(df, quiet=quiet)
return df
[docs]
def group_paths(
paths: pd.DataFrame,
pre_group: Optional[dict] = None,
post_group: Optional[dict] = None,
intermediate_group: Optional[dict] = None,
avg_within_connected: bool = False,
outprop: bool = False,
combining_method: str = "mean",
) -> pd.DataFrame:
"""
Group the paths by user-specified variable (e.g. cell type, cell class etc.).
If outprop=False, weights are summed across presynaptic neurons of the same group
and combined across all postsynaptic neurons of the same group using combining_method
(even if some postsynaptic neurons are not in `paths`).
If outprop=True, weights are summed across postsynaptic neurons of the same group
and combined across all presynaptic neurons of the same group using combining_method
(even if some presynaptic neurons are not in `paths`).
Args:
paths (pd.DataFrame): The DataFrame containing the path data, looking
like the output from `find_path_iteratively()`.
pre_group (dict): A dictionary that maps pre-synaptic neuron indices
to their respective group.
post_group (dict): A dictionary that maps post-synaptic neuron indices
to their respective group.
intermediate_group (dict, optional): A dictionary that maps
intermediate neuron indices to their respective group. Defaults to
None. If None, it will be set to pre_group.
avg_within_connected (bool, optional): If True, the average weight is
calculated within the connected neurons of the same group. If
False, the average weight is calculated across all neurons of the
same group. Defaults to False.
outprop (bool, optional): If True, get the summed output proportion (across
recipient single cells in the same cell type) for each average sender. If
False (default), get the summed input proportion across all senders for
each average recipient.
combining_method (str, optional): Method to combine inputs (outprop=False)
or outputs (outprop=True). Can be 'sum', 'mean', or 'median'. Defaults to
'mean'.
Returns:
pd.DataFrame:
The grouped DataFrame containing the path data, including the layer number,
pre-synaptic index, post-synaptic index, and weight.
"""
# if path is empty, return it directly
if paths is None or paths.shape[0] == 0:
return paths
assert combining_method in ["mean", "sum", "median"], (
"The combining_method should be either 'mean', 'sum' or 'median'. "
f"Currently it is {combining_method}."
)
# (new) auto-fill missing keys so every node has a group
all_nodes = set(paths["pre"]).union(set(paths["post"]))
if pre_group is None:
pre_group = {n: n for n in all_nodes}
else:
pre_group = {**{n: n for n in all_nodes}, **pre_group} # user's values win
if post_group is None:
post_group = {n: n for n in all_nodes}
else:
post_group = {**{n: n for n in all_nodes}, **post_group}
if intermediate_group is None:
intermediate_group = dict(pre_group)
else:
intermediate_group = {**{n: n for n in all_nodes}, **intermediate_group}
# make values in pre_group, post_group, and intermediate_group strings
pre_group = {k: str(v) for k, v in pre_group.items()}
post_group = {k: str(v) for k, v in post_group.items()}
intermediate_group = {k: str(v) for k, v in intermediate_group.items()}
# add cell type information
if "layer" in paths.columns:
# first use intermediate_group, then modify specifically for pre at the
# first layer, and post at the last layer
paths["pre_type"] = paths["pre"].map(intermediate_group).astype(str)
paths["post_type"] = paths["post"].map(intermediate_group).astype(str)
# group source by pre_group
paths.loc[paths["layer"] == paths["layer"].min(), "pre_type"] = (
paths.loc[paths["layer"] == paths["layer"].min(), "pre"]
.map(pre_group)
.astype(str)
)
# group target by post_group
paths.loc[paths["layer"] == paths["layer"].max(), "post_type"] = (
paths.loc[paths["layer"] == paths["layer"].max(), "post"]
.map(post_group)
.astype(str)
)
group_columns = ["layer", "pre_type", "post_type"]
else:
paths["pre_type"] = paths["pre"].map(pre_group).astype(str)
paths["post_type"] = paths["post"].map(post_group).astype(str)
group_columns = ["pre_type", "post_type"]
has_activation = "pre_activation" in paths.columns
if has_activation:
paths_act = paths.copy()
if not outprop:
# in this case, calculating the summed input proportion across all senders for
# each average recipient
# regardless of combining method, need to sum within pre_type anyway
paths = paths.groupby(group_columns + ["post"])["weight"].sum().reset_index()
if not avg_within_connected:
# sometimes only one neuron in a type is connected to another type, so
# only this connection is in paths
# but to calculate the average weight between two types, we should
# take into account all the neurons of the post-synaptic type
post_group_df = pd.DataFrame(
{
"post": list(post_group.keys()),
"post_type": list(post_group.values()),
}
)
if (intermediate_group != post_group) and ("layer" in paths.columns):
intermediate_group_df = pd.DataFrame(
{
"post": list(intermediate_group.keys()),
"post_type": list(intermediate_group.values()),
}
)
not_last = paths[paths["layer"] != paths["layer"].max()]
not_last_prepost = not_last[group_columns].drop_duplicates()
not_last_prepost = not_last_prepost.merge(
intermediate_group_df, on=["post_type"], how="left"
)
not_last = not_last_prepost.merge(
not_last, on=group_columns + ["post"], how="left"
).fillna(0)
last = paths[paths["layer"] == paths["layer"].max()]
last_prepost = last[group_columns].drop_duplicates()
last_prepost = last_prepost.merge(
post_group_df, on=["post_type"], how="left"
)
last = last_prepost.merge(
last, on=group_columns + ["post"], how="left"
).fillna(0)
paths = pd.concat([not_last, last], ignore_index=True)
else:
prepost = paths[group_columns].drop_duplicates()
prepost = prepost.merge(post_group_df, on=["post_type"], how="left")
paths = prepost.merge(
paths, on=group_columns + ["post"], how="left"
).fillna(0)
paths = (
paths.groupby(group_columns)["weight"].agg(combining_method).reset_index()
)
else: # outprop
# in this case, calculating the summed output proportion (across recipient
# single cells in the same cell type) for each average sender
# regardless of combining method, need to sum within post_type anyway
paths = paths.groupby(group_columns + ["pre"])["weight"].sum().reset_index()
if not avg_within_connected:
# sometimes only one neuron in a type is connected to another type, so
# only this connection is in paths
# but to calculate the average weight between two types, we should
# take into account all the neurons of the pre-synaptic type
pre_group_df = pd.DataFrame(
{
"pre": list(pre_group.keys()),
"pre_type": list(pre_group.values()),
}
)
if (intermediate_group != pre_group) and ("layer" in paths.columns):
intermediate_group_df = pd.DataFrame(
{
"pre": list(intermediate_group.keys()),
"pre_type": list(intermediate_group.values()),
}
)
not_first = paths[paths["layer"] != paths["layer"].min()]
not_first_prepost = not_first[group_columns].drop_duplicates()
not_first_prepost = not_first_prepost.merge(
intermediate_group_df, on=["pre_type"], how="left"
)
not_first = not_first_prepost.merge(
not_first, on=group_columns + ["pre"], how="left"
).fillna(0)
first = paths[paths["layer"] == paths["layer"].min()]
first_prepost = first[group_columns].drop_duplicates()
first_prepost = first_prepost.merge(
pre_group_df, on=["pre_type"], how="left"
)
first = first_prepost.merge(
first, on=group_columns + ["pre"], how="left"
).fillna(0)
paths = pd.concat([not_first, first], ignore_index=True)
else:
prepost = paths[group_columns].drop_duplicates()
prepost = prepost.merge(pre_group_df, on=["pre_type"], how="left")
paths = prepost.merge(
paths, on=group_columns + ["pre"], how="left"
).fillna(0)
paths = (
paths.groupby(group_columns)["weight"].agg(combining_method).reset_index()
)
if has_activation:
paths_act = (
paths_act.groupby(group_columns)
.agg(
pre_activation=("pre_activation", "mean"),
post_activation=("post_activation", "mean"),
)
.reset_index()
)
# then merge the weights
paths = pd.merge(paths_act, paths, on=group_columns)
paths.rename(columns={"pre_type": "pre", "post_type": "post"}, inplace=True)
return paths
[docs]
def compare_layered_paths(
paths: List[pd.DataFrame],
priority_indices=None,
neuron_to_sign: dict | None = None,
sign_color_map: dict = {1: "red", -1: "blue"},
el_colours: List[str] = ["rosybrown", "burlywood"],
legend_labels: List[str] = ["Path 1", "Path 2"],
weight_decimals: int = 2,
figsize: tuple = (10, 8),
label_pos: List[float] = [0.7, 0.7],
):
"""
Compare two layered paths by overlaying them and annotating the weights. The paths
should be in the format of the output from `find_path_iteratively()`. The width of
the edges is based on the weight in the first path, when the connection is present
in both paths.
Args:
paths (List[pd.DataFrame]): A list of two DataFrames containing the
path data, including columns 'layer', 'pre', 'post', and 'weight'.
priority_indices (list, set, pd.Series, numpy.ndarray, optional): A
list of neuron indices that should be plotted on top of each layer.
Defaults to None.
neuron_to_sign (dict, optional): A dictionary that maps neuron indices
to their signs. Defaults to None.
sign_color_map (dict, optional): A dictionary that maps neuron signs
to their respective colors. Defaults to {1: 'red', -1: 'blue'}.
el_colours (List[str], optional): A list of two colors for the edge
labels of the two paths. Defaults to ['rosybrown', 'burlywood'].
legend_labels (List[str], optional): A list of two labels for the
legend. Defaults to ['Path 1', 'Path 2'].
weight_decimals (int, optional): The number of decimal places to round
the edge weights to. Defaults to 2.
figsize (tuple, optional): The size of the figure. Defaults to (10, 8).
Returns:
None: The function plots the graph.
"""
# add layer columns if necessary
new_paths = []
for path in paths:
# Create a 'post_layer' column to use as unique identifiers
if "post_layer" not in path.columns:
path["post_layer"] = (
path["post"].astype(str) + "_" + path["layer"].astype(str)
)
if "pre_layer" not in path.columns:
path["pre_layer"] = (
path["pre"].astype(str) + "_" + (path.layer - 1).astype(str)
)
new_paths.append(path)
composite_paths = pd.concat(new_paths)
# get unique edges
composite_paths.drop_duplicates(["pre_layer", "post_layer"], inplace=True)
composite_G = nx.from_pandas_edgelist(
composite_paths,
"pre_layer",
"post_layer",
["weight"],
create_using=nx.DiGraph(),
)
# Labels for nodes
labels = dict(zip(composite_paths.post_layer, composite_paths.post))
labels.update(dict(zip(composite_paths.pre_layer, composite_paths.pre)))
# Determine the width of the edges
weights = [composite_G[u][v]["weight"] for u, v in composite_G.edges()]
weight_min = min(weights)
weight_max = max(weights)
if weight_min == weight_max:
widths = [1.0] * len(weights)
else:
widths = [1 + 9 * (w - weight_min) / (weight_max - weight_min) for w in weights]
# Generate positions
positions = create_layered_positions(composite_paths, priority_indices)
# Edge colors based on neuron signs
default_color = "lightgrey" # Default edge color
# specify edge colour based on pre-neuron sign, if available
edge_colors = []
for u, _ in composite_G.edges():
pre_neuron = labels[u]
if neuron_to_sign and pre_neuron in neuron_to_sign:
edge_colors.append(
sign_color_map.get(neuron_to_sign[pre_neuron], default_color)
)
else:
edge_colors.append(default_color)
# Plot the graph
_, ax = plt.subplots(figsize=figsize)
nx.draw(
composite_G,
pos=positions,
labels=labels,
with_labels=True,
node_size=100,
node_color="lightblue",
arrows=True,
arrowstyle="-|>",
arrowsize=10,
font_size=8,
width=widths,
ax=ax,
edge_color=edge_colors,
)
G1 = nx.from_pandas_edgelist(
paths[0],
"pre_layer",
"post_layer",
["weight"],
create_using=nx.DiGraph(),
)
G2 = nx.from_pandas_edgelist(
paths[1],
"pre_layer",
"post_layer",
["weight"],
create_using=nx.DiGraph(),
)
el1 = {
(u, v): f'{data["weight"]:.{weight_decimals}f}'
for u, v, data in G1.edges(data=True)
}
el2 = {
(u, v): f'{data["weight"]:.{weight_decimals}f}'
for u, v, data in G2.edges(data=True)
}
nx.draw_networkx_edge_labels(
G1,
pos=positions,
edge_labels=el1,
horizontalalignment="left",
verticalalignment="top",
font_color=el_colours[0],
ax=ax,
label_pos=label_pos[0],
)
nx.draw_networkx_edge_labels(
G2,
pos=positions,
edge_labels=el2,
horizontalalignment="right",
verticalalignment="bottom",
font_color=el_colours[1],
ax=ax,
label_pos=label_pos[1],
)
ax.set_ylim(0, 1)
# add lengend using the el_colours
ax.legend(
handles=[
mpatches.Patch(color=el_colours[0], label=legend_labels[0]),
mpatches.Patch(color=el_colours[1], label=legend_labels[1]),
],
loc="upper right",
)
plt.show()
[docs]
@dataclass
class XORCircuit:
# Input nodes
input1: list
input2: list
# Middle layer nodes
exciter1: str # Takes input from input1 only
exciter2: str # Takes input from input2 only
inhibitor: str # Takes input from both inputs
# Output node
output: list
[docs]
def find_xor(paths: pd.DataFrame) -> List[XORCircuit]:
"""
Find XOR-like circuits in a 3-layer network, based on [Wang et al. 2024]
(https://www.biorxiv.org/content/10.1101/2024.09.24.614724v2). Note: this
function currently ignores middle excitatory neruons that receive both
inputs.
Args:
paths: DataFrame with columns ['pre', 'post', 'sign', 'layer']
pre: source node
post: target node
sign: 1 (excitatory) or -1 (inhibitory)
layer: 1 (input->middle) or 2 (middle->output)
Returns:
List of XORCircuit objects, each representing a found XOR motif
"""
# checking input ----
if set(paths["layer"]) != {1, 2}:
raise ValueError(
"The input DataFrame should have exactly 2 unique layers, 1 and 2"
)
# check column names of paths
if not {"pre", "post", "sign", "layer"}.issubset(paths.columns):
raise ValueError(
"The input DataFrame should have columns ['pre', 'post', 'sign', "
"'layer']"
)
# check values of signs: if it's a subset of {1, -1}
if not set(paths["sign"]) <= {1, -1}:
raise ValueError(
f"The input DataFrame should have values 1 (excitatory) or -1 (inhibitory) in the 'sign' column, but got {set(paths['sign'])}"
)
# define variables ----
circuits: list[XORCircuit] = []
exciters = paths["pre"][(paths["layer"] == 2) & (paths["sign"] == 1)].unique()
inhibitors = paths["pre"][(paths["layer"] == 2) & (paths["sign"] == -1)].unique()
l1 = paths[paths["layer"] == 1]
l2 = paths[paths["layer"] == 2]
exciter_us_dict: dict[int | str, set[int | str]] = {
exc: set(l1["pre"][l1["post"] == exc]) for exc in exciters
}
inhibitor_us_dict = {inh: set(l1["pre"][l1["post"] == inh]) for inh in inhibitors}
exciter_ds_dict = {exc: set(l2["post"][l2["pre"] == exc]) for exc in exciters}
inhibitor_ds_dict = {inh: set(l2["post"][l2["pre"] == inh]) for inh in inhibitors}
# main algorithm ----
for e1, e2 in itertools.combinations(exciters, 2):
common = exciter_us_dict[e1] & exciter_us_dict[e2]
onlye1 = exciter_us_dict[e1] - common
onlye2 = exciter_us_dict[e2] - common
for i in inhibitors:
e1_i_intersect = onlye1 & inhibitor_us_dict[i]
e2_i_intersect = onlye2 & inhibitor_us_dict[i]
if (len(e1_i_intersect) == 0) or (len(e2_i_intersect) == 0):
continue
targets = exciter_ds_dict[e1] & exciter_ds_dict[e2] & inhibitor_ds_dict[i]
if not targets:
continue
circuits.append(
XORCircuit(
input1=list(e1_i_intersect),
input2=list(e2_i_intersect),
exciter1=e1,
exciter2=e2,
inhibitor=i,
output=list(targets),
)
)
return circuits
[docs]
def path_for_ngl(path):
"""
Convert a path DataFrame to one that can be used to visualize the path in
neuroglancer with `get_ngl_link(df_format = 'long')`. Neurons are coloured
by their (indirect) connectivity (calculated using
`effective_conn_from_paths()`) to an average neuron in the last layer. `pre`
and `post` columns must contain neuron ids. This function can be used for
visualizing signal propagation in a pathway.
Args:
path (pd.DataFrame): The DataFrame containing the path data, including
the layer number, pre-synaptic index, post-synaptic index, and
weight.
Returns:
pd.DataFrame:
A DataFrame with columns 'neuron_id', 'layer', and 'activation' (which is
(indirect) connectivity in this case), suitable for Neuroglancer visualization.
"""
out = []
from .compress_paths import effective_conn_from_paths
for alayer in path.layer.unique():
path_sub = path[path.layer >= alayer]
effconn = effective_conn_from_paths(path_sub)
# get mean across columns, for each row
effconn = effconn.mean(axis=1).to_frame()
effconn.loc[:, ["layer"]] = alayer
out.append(effconn)
out = pd.concat(out, axis=0).reset_index()
out.columns = ["neuron_id", "activation", "layer"]
out = pd.concat(
[
out,
(
pd.DataFrame(
{
"neuron_id": path_sub.post,
"activation": 1,
"layer": alayer + 1,
}
)
),
]
)
return out
[docs]
def connected_components(
paths: pd.DataFrame,
threshold: float = 0,
quiet: bool = False,
) -> list:
"""
Find connected components in a directed graph represented by a DataFrame of paths.
The DataFrame should contain columns 'pre', 'post', 'layer', and 'weight'. The
function filters the paths based on a weight threshold and then constructs a
directed graph using NetworkX. It identifies weakly connected components in the
graph and returns a list of DataFrames of paths, each representing a connected
component.
Args:
paths (pd.DataFrame): The DataFrame containing the path data, including
the layer number, pre-synaptic index, post-synaptic index, and
weight.
threshold (float, optional): The threshold for the weight of the
direct connection between pre and post. Defaults to 0.
Returns:
list:
A list of DataFrames, each representing a connected component
in the directed graph.
"""
paths = filter_paths(paths, threshold)
paths_unique_edges = paths.drop_duplicates(subset=["pre", "post"])
# Create a graph from the DataFrame
G = nx.from_pandas_edgelist(
paths_unique_edges,
source="pre",
target="post",
edge_attr="weight",
create_using=nx.DiGraph(),
)
# Find connected components
weak_components = list(nx.weakly_connected_components(G))
components = []
for i, component in enumerate(weak_components):
path = paths[paths.pre.isin(component) & paths.post.isin(component)]
path.loc[:, ["component_idx"]] = i
path = remove_excess_neurons(path, quiet=quiet)
components.append(path)
return components
[docs]
def el_within_n_steps(
inprop: spmatrix,
inidx: arrayable,
outidx: arrayable,
n: int,
threshold: float = 0,
pre_group: dict = None,
post_group: dict = None,
return_raw_el: bool = False,
combining_method: str = "mean",
avg_within_connected: bool = False,
all_connections_between_groups: bool = False,
quiet: bool = False,
):
"""
Find paths within a specified number of steps in a directed graph, starting from
input indices and ending at output indices. The unique edges are returned. Filtering
by `threshold` happens after grouping if `pre_group` and `post_group` are provided.
`avg_within_connected` calculates the weight based on the connected neurons, instead
of all neurons in any group involved. This might be useful when obtaining paths from
one single neuron in a cell type, while there are many other neurons in the same
type. This might be especially useful for optic lobe connectivity analysis that's
spatially local.
Two neuron groups might be connected to different extents in different layers (when
a pair of cell types are connected with different individual neurons in different
layers.). In that case the highest weight is returned by default, but see
`all_connections_between_groups` argument.
Args:
inprop (spmatrix): The connectivity matrix, with presynaptic in the rows.
inidx (arrayable): The input neuron indices to start the paths from.
outidx (arrayable): The output neuron indices to end the paths at.
n (int): The maximum number of hops. n=1 for direct connections.
threshold (float, optional): The threshold for the weight of the direct
connection between pre and post. If `pre_group` or `post_group` are
provided, filtering happens after grouping. Defaults to 0.
pre_group (dict, optional): A dictionary mapping pre neuron indices to
their respective groups. Defaults to None.
post_group (dict, optional): A dictionary mapping post neuron indices to
their respective groups. Defaults to None.
return_raw_el (bool, optional): If True, returns the raw edges before
grouping. Defaults to False.
combining_method (str, optional): Method to combine inputs (outprop=False)
or outputs (outprop=True). Can be 'sum', 'mean', or 'median'. Defaults to
'mean'.
avg_within_connected (bool, optional): If True, the weight is calculated within
the *connected* neurons of the same group. If False, the weight is
calculated across *all* neurons of the same group. Defaults to False.
all_connections_between_groups (bool, optional): If True, use all connections
between groups inidx and outidx are in, even if inidx doesn't cover all
neurons in that group. For example, if inidx is *one* L1 neuron, and
pre_group maps indices to cell type, outidx is *one* Tm3 neuron, then the
function will return L1->Tm3 connections for *all* L1 and Tm3 neurons.
Defaults to False.
quiet (bool, optional): If True, suppresses output messages. Defaults to False.
Returns:
pd.DataFrame or tuple:
A DataFrame containing the edges of the paths found, including columns 'pre',
'post', and 'weight'. If `return_raw_el` is True, returns a tuple of two
DataFrames: the first is the grouped edges, and the second is the raw edges
before grouping. If no paths are found, returns None.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
all_paths = []
raw_el = []
for i in tqdm(range(n), disable=quiet):
paths = find_paths_of_length(inprop, inidx, outidx, i + 1)
if paths is None or paths.shape[0] == 0:
continue
if return_raw_el:
raw_el.append(paths)
if pre_group is not None and post_group is not None:
paths = group_paths(
paths,
pre_group,
post_group,
avg_within_connected=avg_within_connected,
combining_method=combining_method,
)
paths = filter_paths(paths, threshold, quiet=quiet)
if paths is not None:
all_paths.append(paths)
if len(all_paths) == 0:
return None
all_paths = pd.concat(all_paths, axis=0)
el = all_paths.groupby(["pre", "post"])["weight"].max().reset_index()
if all_connections_between_groups:
from .compress_paths import result_summary
new_inidx = [idx for idx, grp in pre_group.items() if grp in el.pre]
new_outidx = [idx for idx, grp in post_group.items() if grp in el.post]
mat = result_summary(
inprop,
new_inidx,
new_outidx,
pre_group,
post_group,
display_threshold=0,
display_output=False,
)
# make longer
mat_long = mat.melt(ignore_index=False).reset_index()
mat_long.columns = ["pre", "post", "weight"]
el = el[["pre", "post"]].merge(mat_long, on=["pre", "post"], how="left")
if return_raw_el:
raw_el = pd.concat(raw_el, axis=0)
# pre, post are single neurons, so the rows should be real duplicates
raw_el = raw_el.drop_duplicates(subset=["pre", "post", "weight"])
return el, raw_el
else:
return el
[docs]
def find_shortest_paths(
paths: pd.DataFrame, start_nodes: list[str], end_nodes: list[str]
) -> list[list[str]]:
"""
Find the shortest paths between groups in start_nodes and end_nodes
in a paths dataframe (paths is the output of find_path_iteratively).
Args:
paths (pd.DataFrame): DataFrame containing the path data, including
columns 'weight', 'pre', and 'post'.
start_nodes (list): List of 'pre' groups.
end_nodes (list): List of 'post' groups.
Returns:
list:
A list of shortest paths, where each path is a list of groups that connect
the start and end nodes (ordered from start to end).
"""
if paths.shape[0] == 0:
return []
paths_unique = paths[["weight", "pre", "post"]].drop_duplicates()
nodes_unique = np.unique(
np.concatenate([paths_unique.pre.unique(), paths_unique.post.unique()])
)
pre = np.searchsorted(nodes_unique, paths_unique.pre.values)
post = np.searchsorted(nodes_unique, paths_unique.post.values)
graph_matrix = csr_matrix(
(1 / paths_unique["weight"].values, (pre, post)),
shape=(len(nodes_unique), len(nodes_unique)),
)
def reconstruct_single_path(start_idx, end_idx):
"""Compute shortest path for a single start-end pair."""
# Compute shortest path only from this start node
_, pred = shortest_path(
csgraph=graph_matrix,
directed=True,
indices=start_idx,
return_predecessors=True,
)
if pred[end_idx] == -9999: # not reached
return None
path = [nodes_unique[end_idx]]
i = end_idx
while i != start_idx:
i = pred[i]
if i == -9999:
return None
path.append(nodes_unique[i])
return path[::-1]
shortest_paths = []
for start_node in start_nodes:
idx_start = np.where(nodes_unique == start_node)[0]
if len(idx_start) == 0:
continue
start_idx = idx_start[0]
for end_node in end_nodes:
if start_node != end_node:
idx_end = np.where(nodes_unique == end_node)[0]
if len(idx_end) == 0:
continue
path = reconstruct_single_path(start_idx, idx_end[0])
if path is not None:
shortest_paths.append(path)
return shortest_paths
[docs]
def count_paths(
edgelist: pd.DataFrame,
start_layer: int = 1,
end_layer: Optional[int] = None,
loop_mode: str = "allow",
) -> Union[int, Tuple[int, int]]:
"""
Counts all paths without materializing them in memory. Uses memoization to avoid
redundant computation. This function was written mostly by Claude Sonnet 4.5 under
instructions.
Args:
edgelist (pd.DataFrame): Must contain "layer", "pre", "post" columns.
start_layer (int): Starting layer.
end_layer (int): Ending layer. If None, uses max layer.
loop_mode (str, optional): How to handle loops in paths. Options:
- "allow" (default): Count all paths including those with loops.
- "exclude": Count only loop-free paths.
- "both": Return tuple (count_with_loops, count_without_loops) computed
efficiently in a single DFS pass.
Note: a node appearing at both the start and end of a path is allowed
(e.g., A-B-C-D-A is not considered a loop), but a node appearing again
in the middle is a loop (e.g., A-B-C-A-A has a loop).
Returns:
int or tuple:
If loop_mode is "allow" or "exclude", returns an int with the total number of
valid paths. If loop_mode is "both", returns a tuple (count_with_loops,
count_without_loops).
"""
if end_layer is None:
end_layer = edgelist["layer"].max()
if start_layer > end_layer:
raise ValueError("start_layer must be less than or equal to end_layer.")
if loop_mode not in ["allow", "exclude", "both"]:
raise ValueError(
f"loop_mode must be 'allow', 'exclude', or 'both', got '{loop_mode}'"
)
# Build adjacency
adj: Dict[int, Dict[Union[str, int], List]] = defaultdict(lambda: defaultdict(list))
for _, row in edgelist.iterrows():
adj[row["layer"]][row["pre"]].append(row["post"])
if loop_mode == "both":
# Compute both counts in a single DFS pass
# Use memoization for the with-loops count
memo_with_loops = {}
def dfs_both(
node: Union[str, int], layer: int, visited: set, start_node: Union[str, int]
) -> Tuple[int, int]:
"""
Returns (count_with_loops, count_without_loops) for paths starting from node.
"""
# Check if this is a loop for the no-loops count
is_loop = node in visited
# For with-loops count, use memoization
if (node, layer) in memo_with_loops:
count_with = memo_with_loops[(node, layer)]
else:
# add 1 when reaches the end
if layer == end_layer:
count_with = 1
else:
count_with = 0
# iterate over all postsynaptic nodes
for post in adj.get(layer + 1, {}).get(node, []):
# For with-loops, we don't care about visited history
# So we can recurse with empty visited set for memoization
count_with += dfs_both(post, layer + 1, set(), start_node)[0]
memo_with_loops[(node, layer)] = count_with
# For without-loops count, cannot use memoization
if is_loop:
# If we're at the end layer and this node is the start node, that's OK
if layer == end_layer and node == start_node:
count_without = 1
else:
# Otherwise, this is a loop
count_without = 0
elif layer == end_layer:
count_without = 1
else:
# Add current node to visited set for this path
visited.add(node)
count_without = 0
# iterate over all postsynaptic nodes
for post in adj.get(layer + 1, {}).get(node, []):
# recursively count paths from post
count_without += dfs_both(post, layer + 1, visited, start_node)[1]
# Remove node from visited when backtracking
visited.remove(node)
return (count_with, count_without)
path_count_with = 0
path_count_without = 0
for pre in adj.get(start_layer, {}):
for post in adj.get(start_layer, {}).get(pre, []):
with_loops, without_loops = dfs_both(post, start_layer, {pre}, pre)
path_count_with += with_loops
path_count_without += without_loops
return (path_count_with, path_count_without)
elif loop_mode == "allow":
# Original memoized approach when loops are allowed
memo = {}
def dfs(node: Union[str, int], layer: int) -> int:
# Memoization - crucial for graphs with convergent paths
if (node, layer) in memo:
return memo[(node, layer)]
# add 1 when reaches the end
if layer == end_layer:
return 1
count = 0
# iterate over all postsynaptic nodes
for post in adj.get(layer + 1, {}).get(node, []):
# iterate over their postsynaptic nodes
count += dfs(post, layer + 1)
memo[(node, layer)] = count
return count
path_count = 0
for pre in adj.get(start_layer, {}):
for post in adj.get(start_layer, {}).get(pre, []):
path_count += dfs(post, start_layer)
return path_count
else: # loop_mode == "exclude"
# When loops are not allowed, track visited nodes per path
# Cannot use memoization because path validity depends on history
def dfs_no_loops(
node: Union[str, int], layer: int, visited: set, start_node: Union[str, int]
) -> int:
# Check if we've seen this node before in the current path
# Allow the node to appear at the end even if it was the start node
if node in visited:
# If we're at the end layer and this node is the start node, that's OK
if layer == end_layer and node == start_node:
return 1
# Otherwise, this is a loop
return 0
# add 1 when reaches the end
if layer == end_layer:
return 1
# Add current node to visited set for this path
visited.add(node)
count = 0
# iterate over all postsynaptic nodes
for post in adj.get(layer + 1, {}).get(node, []):
# recursively count paths from post
count += dfs_no_loops(post, layer + 1, visited, start_node)
# Remove node from visited when backtracking
visited.remove(node)
return count
path_count = 0
for pre in adj.get(start_layer, {}):
for post in adj.get(start_layer, {}).get(pre, []):
# Start with pre node in visited set, and track which node started the path
path_count += dfs_no_loops(post, start_layer, {pre}, pre)
return path_count
[docs]
def filter_all_paths_to_cumsum(
all_paths: pd.DataFrame | list[pd.DataFrame],
thre_cumsum: float = 0.5,
thre_step_min: float = 0.0,
necessary_intermediate: Dict[int, arrayable] | None = None,
):
"""
Filters paths such that intermediate neurons are specified in necessary_intermediate and the
filtered cumulative effective weight > thre_cumsum of the total effective weight;
the minimum edge weight across the selected paths is either the minimum along those filtered
paths or the minimum threshold thre_step_min, whichever is larger.
As a rough guide on how to set thre_cumsum, it can be set larger for fewer paths in all_paths,
and smaller for more paths.
Args:
all_paths (pd.DataFrame | list[pd.DataFrame]): The DataFrame or list of DataFrames containing the path data,
where each DataFrame is like the output from `find_paths_of_length()`, i.e., contains paths of a
specific length.
thre_cumsum (float): The cumulative effective weight threshold to reach for filtered paths.
Should be a number between 0 and 1. Defaults to 0.5.
thre_step_min (float, optional): The minimum threshold for the weight of the
direct connection between pre and post. Defaults to 0.0.
necessary_intermediate (dict, optional): A dictionary of necessary
intermediate neurons, where the keys are the layer numbers
(starting neurons: 1; directly downstream: 2) and the values are
the neuron indices (can be int, float, list, set, numpy.ndarray,
or pandas.Series). Defaults to None.
Returns:
tuple:
paths: list of pd.DataFrame
Filtered paths.
w_filter: float
Total effective weight of the filtered paths.
w_all: float
Total effective weight of all paths before filtering.
thre_step : float
Minimum edge weight threshold used to filter paths, bounded below by thre_step_min.
"""
from .external_paths import effective_conn_per_path_from_paths
df_bool = False
if isinstance(all_paths, pd.DataFrame):
df_bool = True
all_paths = [all_paths]
# compute total effective weight across all paths
w_all = 0.0
w_prod = []
w_min = []
for i in range(len(all_paths)):
w_all_i, w_prod_i, w_min_i = effective_conn_per_path_from_paths(all_paths[i])
w_all += w_all_i
w_prod.append(w_prod_i)
w_min.append(w_min_i)
w_prod = np.concatenate(w_prod, axis=0)
w_min = np.concatenate(w_min, axis=0)
# find minimum edge weight across strongest paths that make up thre_cumsum of total weight
idx_sort = np.argsort(-w_prod)
idx_thre = np.where(np.cumsum(w_prod[idx_sort] / w_all) > thre_cumsum)[0][0]
# since in filter_paths, threshold > thre_step, take 99.9%, with a minimum of thre_step_min
thre_step = max(0.999 * np.min(w_min[idx_sort[: idx_thre + 1]]), thre_step_min)
# filter paths and compute total effective weight across those paths
w_filter = 0.0
for i in range(len(all_paths)):
all_paths[i] = filter_paths(all_paths[i], thre_step, necessary_intermediate)
w_filter_i, _, _ = effective_conn_per_path_from_paths(all_paths[i])
w_filter += w_filter_i
if df_bool:
all_paths = all_paths[0]
return all_paths, w_filter, w_all, thre_step