# Standard library imports
import itertools
from dataclasses import dataclass
from typing import Dict, List
# Third-party package imports
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
from scipy.sparse import issparse, spmatrix
from tqdm import tqdm
from .utils import (
arrayable,
check_consecutive_layers,
count_keys_per_value,
to_nparray,
)
from .compress_paths import effective_conn_from_paths
[docs]
def find_path_once(
inprop,
steps_cpu,
inidx: arrayable,
outidx: arrayable,
target_layer_number,
top_n=-1,
threshold=0,
):
"""
Finds the path once between input and output, of distance
target_layer_number, returning indices of neurons in the previous layer
that connect the input with the output. This works by taking the top_n
direct upstream partners of the outidx neurons, and intersect those with
neurons 'effectively' connected (through steps_cpu) to the inidx neurons.
Args:
inprop (scipy.sparse.csc_matrix): The connectivity matrix in
Compressed Sparse Column format.
steps_cpu (list): A list of compressed connectivity matrices: one matrix
for each compressed path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
input neuron index/indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
output neuron index/indices.
target_layer_number (int): The target layer number to examine. Must be
>= 1. When target_layer_number = 1, we are looking at the direct
synaptic connectivity.
top_n (int, optional): The number of top connections to consider based
on direct connectivity from inprop_csc. If top_n = -1, all connections
are considered.
threshold (float, optional): The threshold of the direct connectivity
from inidx to an average outidx.
Returns:
np.ndarray: An array of neuron indices in the previous layer that have
significant connectivity, connecting between the `inidx` and `outidx`.
"""
inprop_csc = inprop.copy()
inprop_csc.data = np.abs(inprop.data)
# make sure they are integers
inidx = [int(i) for i in inidx]
outidx = [int(i) for i in outidx]
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
if target_layer_number == 1:
# if the target layer is 1, we are looking at the direct synaptic
# connectivity
# so we just need to find the indices of the non-zero values in the
# inprop_csc matrix that correspond to the outidx, and intersect
# those with the inidx we are interested in.
colidx = inidx
else:
# first get the neurons that effectively connect to inidx, at layer
# number target_layer_number - 1.
# for example, in the ORN->PN->KC case, target_layer_number = 2 for KCs.
# top_n_row_indices are indices of the PNs that connect to the KCs. So
# in this case we should use direct connectivity from PNs to get the
# ORNs.
# so when target_layer_number == 2, we should use steps_cpu[0] (direct
# connectivity), that is, target_layer_number-2:
# subtract 1 for getting top_n_row_indices below which is going one
# step upstream; and another for 0-based indexing in steps_cpu.
# the next line gets the targets that receive non-zero compressed
# input from inidx
# when target_layer_number >= 2.
# .nonzero() returns row, column of the nonzero values.
colidx = steps_cpu[target_layer_number - 2][inidx, :].nonzero()[1]
# then go back one step from the outidx, based on the direct connectivity
# matrix (inprop)
us = inprop_csc[:, outidx].nonzero()
# intersect the non-zero upstream neurons with those effectively connected
# across layers (colidx)
# these are still row indices of inprop
intersect = np.intersect1d(us[0], colidx)
# direct connectivity between target_layer_number-1 and target_layer_number
submatrix = inprop_csc[intersect, :][:, outidx]
# in case outidx is more than one index, calculate the average of each row
# in the submatrix
# same length as len(intersect)
row_averages = np.array(submatrix.mean(axis=1)).flatten()
# thresholding
# these are indices of row_averages, which can also be used to index
# row_averages
thresholded_indices = np.where(row_averages >= threshold)[0]
thresholded_row_averages = row_averages[thresholded_indices]
# thresholded_intersect are a subset of of intersect, which are indices of
# inprop_csc
# intersect has the same length as row_averages
thresholded_intersect = intersect[thresholded_indices]
# Find the indices (of thresholded_row_averages) of the top n averages
if top_n == -1:
top_n_indices = np.argsort(thresholded_row_averages)
else:
top_n_indices = np.argsort(thresholded_row_averages)[-top_n:]
# Get the original row indices corresponding to these top n averages
top_n_row_indices = thresholded_intersect[top_n_indices]
# make the edgelist
submatrix = inprop_csc[top_n_row_indices, :][:, outidx]
# Convert submatrix to COO format to efficiently find non-zero elements
coo_submatrix = submatrix.tocoo()
# Create DataFrame directly from COO format data
df = pd.DataFrame(
{
# Map back to original indices
"pre": top_n_row_indices[coo_submatrix.row],
# Map back to original output indices
"post": outidx[coo_submatrix.col],
"weight": coo_submatrix.data,
}
)
return df
[docs]
def find_path_iteratively(
inprop_csc: spmatrix,
steps_cpu: list,
inidx: arrayable,
outidx: arrayable,
target_layer_number: int,
top_n: int = -1,
threshold: float = 0,
):
"""
Iteratively finds the path from the specified output (outidx) back to the
input (inidx) across multiple layers, using the `find_path_once` function
to traverse each layer.
Args:
inprop_csc (scipy.sparse matrix): The direct connectivity matrix in
Compressed Sparse Column format.
steps_cpu (list): A list of compressed connectivity matrices: one
matrix for each compressed path length.
inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
input neuron indices.
outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The
output neuron indices to start the reverse path finding.
target_layer_number (int): The number of layers to traverse backwards
from the outidx. If target_layer_number = 1, we are looking at the
direct synaptic connectivity.
top_n (int, optional): The number of top connections to consider at each
layer based on direct connectivity from inprop_csc. If top_n = -1, all
connections are considered.
threshold (float, optional): The threshold for the average of the direct
connectivity from inidx to outidx.
Returns:
pd.DataFrame: A DataFrame containing the path data, including the
pre-synaptic and post-synaptic neuron indices, the layer (direct
connections from inidx: layer = 1), and the weight (input
proportion of the postsynaptic neuron) of the direct connection
between pre and post.
"""
inprop_csc.data = np.abs(inprop_csc.data)
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
if len(inidx) == 0 or len(outidx) == 0:
# raise error
raise ValueError("The input or output indices are empty!")
if issparse(inprop_csc):
# if stepsn is coo, turn into csc
if inprop_csc.format == "coo":
inprop_csc = inprop_csc.tocsc()
# path_indices = [] # This will store the path data as a list of arrays
dfs = [] # This will store the path data as a list of DataFrames
current_outidx = outidx
# from target_layer_number to 1, go back one step at a time
for layer in range(target_layer_number, 0, -1):
# Find the indices in the current layer that connect to the next layer
df = find_path_once(
inprop_csc,
steps_cpu,
inidx,
current_outidx,
layer,
top_n,
threshold,
)
# If no indices are found, break the loop as no path can be formed
if len(df) == 0:
print(
"Cannot trace back to the input :(. Try providing a bigger "
"top_n value, or a lower threshold?"
)
return
df["layer"] = layer
dfs.append(df)
# Update the outidx for the next iteration to move backwards through
# layers
current_outidx = set(df["pre"])
return pd.concat(dfs)
[docs]
def create_layered_positions(
df: pd.DataFrame, priority_indices=None, sort_dict: dict | None = None
) -> dict:
"""
Creates a dictionary of positions for each neuron in the paths, so that
the paths can be visualized in a layered manner. It assumes that `df`
contains the columns 'layer', 'pre_layer', 'post_layer' (or 'layer', 'pre',
'post'). If a neuron exists in multiple layers, it is plotted multiple
times.
Args:
df (pd.DataFrame): The DataFrame containing the path data, including the layer
number, pre-synaptic index, and post-synaptic index.
priority_indices (list, set, pd.Series, numpy.ndarray optional): A list of
neuron indices that should be plotted on top of each layer. Defaults to None.
sort_dict (dict, optional): A dictionary of neuron indices as keys and their
sorting order as values (bigger value is higher in the plot). Defaults to
None.
Returns:
dict: A dictionary of positions for each neuron in the paths, with the
keys as the neuron indices and the values as the (x, y) coordinates.
"""
# check if layer numbers in 'layer' column are consecutive. If anyone is
# absent, raise an error
if not check_consecutive_layers(df):
raise ValueError("The layer numbers in 'layer' column are not consecutive. ")
# if post_layer and pre_layer are not present, create them
if "post_layer" not in df.columns:
df["post_layer"] = df["post"].astype(str) + "_" + df["layer"].astype(str)
if "pre_layer" not in df.columns:
df["pre_layer"] = df["pre"].astype(str) + "_" + (df["layer"] - 1).astype(str)
if priority_indices is not None:
priority_indices = set(priority_indices)
number_of_layers = df["layer"].nunique()
layer_width = 1.0 / (number_of_layers + 1)
positions = {}
name_to_idx = dict(zip(df.pre_layer, df["pre"]))
name_to_idx.update(dict(zip(df.post_layer, df["post"])))
global_to_local_layer_number = {
l: i for i, l in enumerate(sorted(df["layer"].unique()))
}
df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number)
for layer in range(0, number_of_layers + 1):
if layer != number_of_layers:
layer_name = list(
set(df.pre_layer[df.local_layer == layer]).union(
set(df.post_layer[df.local_layer == (layer - 1)])
)
)
else:
layer_name = list(set(df.post_layer[df.local_layer == layer - 1]))
if sort_dict is not None:
layer_name = sorted(
layer_name, key=lambda x: sort_dict.get(x, float("-inf"))
)
if priority_indices is not None:
layer_name_priority = [
item for item in layer_name if name_to_idx[item] in priority_indices
]
layer_name_not = [
item for item in layer_name if name_to_idx[item] not in priority_indices
]
layer_name = layer_name_not + layer_name_priority
for index, neuron in enumerate(layer_name, start=1):
positions[neuron] = (
layer * layer_width,
# the later in the list, the higher
index * 1.0 / (len(layer_name) + 1),
)
return positions
[docs]
def remove_excess_neurons(
df: pd.DataFrame,
keep=None,
target_indices=None,
keep_targets_in_middle: bool = False,
) -> pd.DataFrame:
"""After filtering, some neurons are no longer on the paths between the
input and output neurons. This function removes those neurons from the
paths.
Args:
df (pd.Dataframe): a filtered dataframe with similar structure as the
dataframe returned by `find_path_iteratively()`.
keep (list, set, pd.Series, numpy.ndarray, str, optional): A list of
neuron indices that should be kept in the paths, even if they don't
connect between input and target in the last layer. Defaults to
None.
target_indices (list, set, pd.Series, numpy.ndarray, str, optional): A
list of target neuron indices that should be kept in the last
layer. Defaults to None, in which case all neurons in the last
layer in `df` would be kept.
keep_targets_in_middle (bool, optional): If True, the target_indices
are kept in the middle layers as well, even if they don't connect
between input and target in the last layer. Defaults to False.
Returns:
pd.Dataframe: a dataframe with similar structure as the result of
`find_path_iteratively()`, with the excess neurons removed.
"""
if df.shape[0] == 0:
# raise error
raise ValueError(
"There are no connections (`df` is empty)! Threshold too high?"
)
max_layer_num = df["layer"].max()
if max_layer_num == 1:
return df
# if target_indices are provided, first use this to filter the last layer ----
if target_indices is not None:
# if it's a string or int, convert to list
if isinstance(target_indices, str) or (type(target_indices) == int):
target_indices = [target_indices]
target_indices = set(target_indices)
# check if datatype is the same, between target_indices and post
if not all([type(i) == type(df["post"].iloc[0]) for i in target_indices]):
raise ValueError(
f"The datatype in `target_indices` should be the same as the elements in `post` column of the DataFrame. Elements in `post` is {type(df.post.iloc[0])}."
)
if not target_indices.issubset(df[df["layer"] == df["layer"].max()]["post"]):
raise ValueError(
"The target indices are not in the post-synaptic neurons of the last layer. Here are the indices of the last layer: ",
", ".join(df[df["layer"] == df["layer"].max()]["post"].unique()),
". Your target_indices should be a subset.",
)
df = df[(df["layer"] != df["layer"].max()) | df["post"].isin(target_indices)]
# check if all layer numbers are consecutive ----
if not check_consecutive_layers(df):
print(
"Warning: The layer numbers are not consecutive. Will only use the"
" consecutive layers from the last one."
)
selected = []
# get the consecutive layers from the last one
for l in range(df["layer"].max(), 0, -1):
if l in df["layer"].unique():
selected.append(l)
else:
break
df = df[df["layer"].isin(selected)]
global_to_local_layer_number = {
l: i for i, l in enumerate(sorted(df["layer"].unique()))
}
df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number)
elif df["layer"].min() != 1:
# if the layer numbers do not start from 1
print(
"Warning: The layer numbers do not start from 1. Will only use the "
"consecutive layers from the last one."
)
global_to_local_layer_number = {
l: i for i, l in enumerate(sorted(df["layer"].unique()))
}
df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number)
else:
df.loc[:, ["local_layer"]] = df["layer"]
# while there is any layer whose post is not the same as the next layer's pre ----
while any(
[
set(df[df.local_layer == i]["post"])
!= set(df[df.local_layer == (i + 1)]["pre"])
for i in range(1, df.local_layer.max())
]
):
if keep is not None:
if isinstance(keep, str):
keep = [keep]
keep = set(keep)
if all(
[
set(df[df.local_layer == i]["post"]).union(keep)
== set(df[df.local_layer == (i + 1)]["pre"]).union(keep)
for i in range(1, df.local_layer.max())
]
):
break
else:
keep = set()
if keep_targets_in_middle:
if target_indices is not None:
keep = keep.union(target_indices)
# start adding each layer to df_layers_update ---
df_layers_update = []
# if there are only two layers
if df.local_layer.max() == 2:
df_layer = df[df.local_layer == 2]
df_prev_layer = df[df.local_layer == 1]
# filter by pre in the second layer
df_layers_update.append(
df_layer[df_layer["pre"].isin(set(df_prev_layer["post"]).union(keep))]
)
# filter by post in the first layer
df_layers_update.append(
df_prev_layer[
df_prev_layer["post"].isin(set(df_layer["pre"]).union(keep))
]
)
df = pd.concat(df_layers_update)
else:
for i in range(2, df.local_layer.max()):
df_layer = df[df.local_layer == i]
df_next_layer = df[df.local_layer == i + 1]
df_prev_layer = df[df.local_layer == i - 1]
# pre that should be in the current layer: an intersection of
# previous layer's post; and current layer's pre
df_pre = set.intersection(
set(df_prev_layer["post"]) & set(df_layer["pre"])
)
# post that should be in the current layer: an intersection of
# current layer's post; and next layer's pre
df_post = set.intersection(
set(df_layer["post"]), set(df_next_layer["pre"])
)
if i == 2:
# add edges in the first layer
df_prev_layer = df_prev_layer[
df_prev_layer["post"].isin(df_pre.union(keep))
]
df_layers_update.append(df_prev_layer)
df_layer = df_layer[
df_layer["pre"].isin(df_pre.union(keep))
& df_layer["post"].isin(df_post.union(keep))
]
if df_layer.shape[0] == 0:
raise ValueError(
"No path found. Try lowering the threshold for the edges to be included in the path."
)
df_layers_update.append(df_layer)
if i == (df.local_layer.max() - 1):
# add edges in the last layer
if target_indices is None:
df_next_layer = df_next_layer[
df_next_layer["pre"].isin(df_post.union(keep))
]
else:
df_next_layer = df_next_layer[
df_next_layer["pre"].isin(
df_post.union(keep).union(target_indices)
)
]
df_layers_update.append(df_next_layer)
df = pd.concat(df_layers_update)
# at this point, if no edges left: raise error
if df.shape[0] == 0:
raise ValueError(
"No path found. Try relaxing the criteria for edge inclusion."
)
df.drop(columns="local_layer", inplace=True)
# in case we removed all the connections in the last layer
if df["layer"].max() != max_layer_num:
raise ValueError(
"No path found. Try lowering the threshold for the edges to be "
"included in the path. Currently no connections are found in the "
"last layer."
)
return df
[docs]
def remove_excess_neurons_batched(
df: pd.DataFrame,
keep=None,
target_indices=None,
keep_targets_in_middle: bool = False,
) -> pd.DataFrame:
"""Does the same thing as `remove_excess_neurons()`, but for batched input
(i.e. assumes column `batch` in `df`).
Args:
df (pd.DataFrame): a filtered dataframe with similar structure as the
dataframe returned by `find_path_iteratively()`. Must contain a
column `batch`.
keep (list, set, pd.Series, numpy.ndarray, str, optional): A list of
neuron indices that should be kept in the paths, even if they don't
connect between input and target in the last layer. Defaults to
None.
target_indices (list, set, pd.Series, numpy.ndarray, str, optional): A
list of target neuron indices that should be kept in the last
layer. Defaults to None, in which case all neurons in the last
layer in `df` would be kept.
keep_targets_in_middle (bool, optional): If True, the target_indices
are kept in the middle layers as well, even if they don't connect
between input and target in the last layer. Defaults to False.
Returns:
pd.DataFrame: The filtered DataFrame containing the path data,
including the layer number, pre-synaptic index, post-synaptic
index, and weight
"""
# first check if 'batch' is in the columns
if "batch" not in df.columns:
raise ValueError("The column `batch` is not in the DataFrame.")
dfs = []
for b in tqdm(df.batch.unique()):
df_batch = df[df.batch == b]
df_batch = remove_excess_neurons(
df_batch, keep, target_indices, keep_targets_in_middle
)
dfs.append(df_batch)
return pd.concat(dfs)
[docs]
def filter_paths(
df: pd.DataFrame,
threshold: float = 0,
necessary_intermediate: Dict[int, arrayable] | None = None,
) -> pd.DataFrame:
"""Filters the paths based on the weight threshold and the necessary
intermediate neurons. The weight threshold refers to the direct
connectivity between connected neurons in the path. It is recommended to
not put too may neurons in necessary_intermediate, as it may be too
stringent and remove all paths.
Args:
df (pd.DataFrame): The DataFrame containing the path data, including
the layer number, pre-synaptic index, post-synaptic index, and
weight.
threshold (float, optional): The threshold for the weight of the
direct connection between pre and post. Defaults to 0.
necessary_intermediate (dict, optional): A dictionary of necessary
intermediate neurons, where the keys are the layer numbers
(starting neurons: 1; directly downstream: 2) and the values are
the neuron indices (can be int, float, list, set, numpy.ndarray,
or pandas.Series). Defaults to None.
Returns:
pd.DataFrame: The filtered DataFrame containing the path data,
including the layer number, pre-synaptic index, post-synaptic
index, and weight.
"""
if df.shape[0] == 0:
# raise error
raise ValueError("The input DataFrame for filter_paths() is empty! ")
if threshold > 0:
df = df[df.weight > threshold]
if df.shape[0] == 0:
print("No edges left after thresholding. Try lowering the threshold.")
return
df = remove_excess_neurons(df)
if necessary_intermediate is not None:
for layer, indices in necessary_intermediate.items():
if type(indices) != list:
indices = list(to_nparray(indices))
if layer < 1:
# error: layer has to be an integer >=1
raise ValueError("Layer has to be an integer >=1")
if layer < (df["layer"].max() + 1):
df = df[df["pre"].isin(indices) | (df["layer"] != layer)]
elif layer == (df["layer"].max() + 1):
# filter for targets
df = df[df["post"].isin(indices) | ((df["layer"] + 1) != layer)]
else:
# error: layer number too big
raise ValueError("Layer number too big")
df = remove_excess_neurons(df)
return df
[docs]
def group_paths(
paths: pd.DataFrame,
pre_group: dict,
post_group: dict,
intermediate_group: dict | None = None,
avg_within_connected: bool = False,
outprop: bool = False,
combining_method: str = "mean",
) -> pd.DataFrame:
"""
Group the paths by user-specified variable (e.g. cell type, cell class
etc.). Weights are summed across presynaptic neurons of the same group and
averaged across all postsynaptic neurons of the same group (even if some
postsynaptic neurons are not in `paths`).
Args:
paths (pd.DataFrame): The DataFrame containing the path data, looking
like the output from `find_path_iteratively()`.
pre_group (dict): A dictionary that maps pre-synaptic neuron indices
to their respective group.
post_group (dict): A dictionary that maps post-synaptic neuron indices
to their respective group.
intermediate_group (dict, optional): A dictionary that maps
intermediate neuron indices to their respective group. Defaults to
None. If None, it will be set to pre_group.
avg_within_connected (bool, optional): If True, the average weight is
calculated within the connected neurons of the same group. If
False, the average weight is calculated across all neurons of the
same group. Defaults to False.
outprop (bool, optional): If True, get the summed output proportion (across
recipient single cells in the same cell type) for each average sender. If
False (default), get the summed input proportion across all senders for
each average recipient.
combining_method (str, optional): Method to combine inputs (outprop=False)
or outputs (outprop=True). Can be 'sum', 'mean', or 'median'. Defaults to
'mean'.
Returns:
pd.DataFrame: The grouped DataFrame containing the path data,
including the layer number, pre-synaptic index, post-synaptic
index, and weight.
"""
assert combining_method in ["mean", "sum", "median"], (
"The combining_method should be either 'mean', 'sum' or 'median'. "
f"Currently it is {combining_method}."
)
if intermediate_group is None:
intermediate_group = pre_group
# make values in pre_group, post_group, and intermediate_group strings
pre_group = {k: str(v) for k, v in pre_group.items()}
post_group = {k: str(v) for k, v in post_group.items()}
intermediate_group = {k: str(v) for k, v in intermediate_group.items()}
# add cell type information
if "layer" in paths.columns:
# first use intermediate_group, then modify specifically for pre at the
# first layer, and post at the last layer
paths["pre_type"] = paths["pre"].map(intermediate_group).astype(str)
paths["post_type"] = paths["post"].map(intermediate_group).astype(str)
paths.loc[paths["layer"] == paths["layer"].min(), "pre_type"] = (
paths.loc[paths["layer"] == paths["layer"].min(), "pre"]
.map(pre_group)
.astype(str)
)
paths.loc[paths["layer"] == paths["layer"].max(), "post_type"] = (
paths.loc[paths["layer"] == paths["layer"].max(), "post"]
.map(post_group)
.astype(str)
)
group_columns = ["layer", "pre_type", "post_type"]
else:
paths["pre_type"] = paths["pre"].map(pre_group).astype(str)
paths["post_type"] = paths["post"].map(post_group).astype(str)
group_columns = ["pre_type", "post_type"]
if not outprop:
# in this case, calculating the summed input proportion across all senders for
# each average recipient
if not avg_within_connected:
# sometimes only one neuron in a type is connected to another type, so
# only this connection is in paths
# but to calculate the average weight between two types, we should
# take into account all the neurons of the post-synaptic type
# so let's count the number of neurons in each post-synaptic type
nneuron_per_type = count_keys_per_value(intermediate_group)
nneuron_per_type.update(count_keys_per_value(post_group))
else:
# count number of unique neurons in each type
nneuron_per_type = paths.groupby("post_type")["post"].nunique().to_dict()
if combining_method == "median":
paths_weights = (
paths.groupby(group_columns)["weight"].median().reset_index()
)
elif combining_method == "sum":
# sum across presynaptic neurons of the same type
paths_weights = paths.groupby(group_columns).weight.sum().reset_index()
elif combining_method == "mean":
# sum across presynaptic neurons of the same type
paths_weights = paths.groupby(group_columns).weight.sum().reset_index()
# divide by number of postsynaptic neurons of the same type
paths_weights["nneuron_post"] = paths_weights.post_type.map(
nneuron_per_type
)
paths_weights["weight"] = paths_weights.weight / paths_weights.nneuron_post
paths_weights.drop(columns="nneuron_post", inplace=True)
if "pre_activation" in paths.columns:
paths = (
paths.groupby(group_columns)
.agg(
pre_activation=("pre_activation", "mean"),
post_activation=("post_activation", "mean"),
)
.reset_index()
)
# then merge the weights
paths = pd.merge(paths, paths_weights, on=group_columns)
else:
paths = paths_weights.copy()
paths.rename(columns={"pre_type": "pre", "post_type": "post"}, inplace=True)
return paths
else: # outprop
# in this case, calculating the summed output proportion (across recipient
# single cells in the same cell type) for each average sender
if not avg_within_connected:
# sometimes only one neuron in a type is connected to another type, so
# only this connection is in paths
# but to calculate the average weight between two types, we should
# take into account all the neurons of the post-synaptic type
# so let's count the number of neurons in each post-synaptic type
nneuron_per_type = count_keys_per_value(intermediate_group)
nneuron_per_type.update(count_keys_per_value(pre_group))
else:
# count number of unique neurons in each type
nneuron_per_type = paths.groupby("pre_type")["pre"].nunique().to_dict()
# weights
if combining_method == "median":
paths_weights = (
paths.groupby(group_columns)["weight"].median().reset_index()
)
elif combining_method == "sum":
# sum across presynaptic neurons of the same type
paths_weights = paths.groupby(group_columns).weight.sum().reset_index()
elif combining_method == "mean":
# sum across presynaptic neurons of the same type
paths_weights = paths.groupby(group_columns).weight.sum().reset_index()
# divide by number of presynaptic neurons of the same type
paths_weights["nneuron_pre"] = paths_weights.pre_type.map(nneuron_per_type)
paths_weights["weight"] = paths_weights.weight / paths_weights.nneuron_pre
paths_weights.drop(columns="nneuron_pre", inplace=True)
if "pre_activation" in paths.columns:
paths = (
paths.groupby(group_columns)
.agg(
pre_activation=("pre_activation", "mean"),
post_activation=("post_activation", "mean"),
)
.reset_index()
)
# then merge the weights
paths = pd.merge(paths, paths_weights, on=group_columns)
else:
paths = paths_weights.copy()
paths.rename(columns={"pre_type": "pre", "post_type": "post"}, inplace=True)
return paths
[docs]
def compare_layered_paths(
paths: List[pd.DataFrame],
priority_indices=None,
neuron_to_sign: dict | None = None,
sign_color_map: dict = {1: "red", -1: "blue"},
el_colours: List[str] = ["rosybrown", "burlywood"],
legend_labels: List[str] = ["Path 1", "Path 2"],
weight_decimals: int = 2,
figsize: tuple = (10, 8),
label_pos: List[float] = [0.7, 0.7],
):
"""
Compare two layered paths by overlaying them and annotating the weights. The paths
should be in the format of the output from `find_path_iteratively()`. The width of
the edges is based on the weight in the first path, when the connection is present
in both paths.
Args:
paths (List[pd.DataFrame]): A list of two DataFrames containing the
path data, including columns 'layer', 'pre', 'post', and 'weight'.
priority_indices (list, set, pd.Series, numpy.ndarray, optional): A
list of neuron indices that should be plotted on top of each layer.
Defaults to None.
neuron_to_sign (dict, optional): A dictionary that maps neuron indices
to their signs. Defaults to None.
sign_color_map (dict, optional): A dictionary that maps neuron signs
to their respective colors. Defaults to {1: 'red', -1: 'blue'}.
el_colours (List[str], optional): A list of two colors for the edge
labels of the two paths. Defaults to ['rosybrown', 'burlywood'].
legend_labels (List[str], optional): A list of two labels for the
legend. Defaults to ['Path 1', 'Path 2'].
weight_decimals (int, optional): The number of decimal places to round
the edge weights to. Defaults to 2.
figsize (tuple, optional): The size of the figure. Defaults to (10, 8).
Returns:
None: The function plots the graph.
"""
# add layer columns if necessary
new_paths = []
for path in paths:
# Create a 'post_layer' column to use as unique identifiers
if "post_layer" not in path.columns:
path["post_layer"] = (
path["post"].astype(str) + "_" + path["layer"].astype(str)
)
if "pre_layer" not in path.columns:
path["pre_layer"] = (
path["pre"].astype(str) + "_" + (path.layer - 1).astype(str)
)
new_paths.append(path)
composite_paths = pd.concat(new_paths)
# get unique edges
composite_paths.drop_duplicates(["pre_layer", "post_layer"], inplace=True)
composite_G = nx.from_pandas_edgelist(
composite_paths,
"pre_layer",
"post_layer",
["weight"],
create_using=nx.DiGraph(),
)
# Labels for nodes
labels = dict(zip(composite_paths.post_layer, composite_paths.post))
labels.update(dict(zip(composite_paths.pre_layer, composite_paths.pre)))
# Determine the width of the edges
weights = [composite_G[u][v]["weight"] for u, v in composite_G.edges()]
weight_min = min(weights)
weight_max = max(weights)
if weight_min == weight_max:
widths = [1.0] * len(weights)
else:
widths = [1 + 9 * (w - weight_min) / (weight_max - weight_min) for w in weights]
# Generate positions
positions = create_layered_positions(composite_paths, priority_indices)
# Edge colors based on neuron signs
default_color = "lightgrey" # Default edge color
# specify edge colour based on pre-neuron sign, if available
edge_colors = []
for u, _ in composite_G.edges():
pre_neuron = labels[u]
if neuron_to_sign and pre_neuron in neuron_to_sign:
edge_colors.append(
sign_color_map.get(neuron_to_sign[pre_neuron], default_color)
)
else:
edge_colors.append(default_color)
# Plot the graph
_, ax = plt.subplots(figsize=figsize)
nx.draw(
composite_G,
pos=positions,
labels=labels,
with_labels=True,
node_size=100,
node_color="lightblue",
arrows=True,
arrowstyle="-|>",
arrowsize=10,
font_size=8,
width=widths,
ax=ax,
edge_color=edge_colors,
)
G1 = nx.from_pandas_edgelist(
paths[0],
"pre_layer",
"post_layer",
["weight"],
create_using=nx.DiGraph(),
)
G2 = nx.from_pandas_edgelist(
paths[1],
"pre_layer",
"post_layer",
["weight"],
create_using=nx.DiGraph(),
)
el1 = {
(u, v): f'{data["weight"]:.{weight_decimals}f}'
for u, v, data in G1.edges(data=True)
}
el2 = {
(u, v): f'{data["weight"]:.{weight_decimals}f}'
for u, v, data in G2.edges(data=True)
}
nx.draw_networkx_edge_labels(
G1,
pos=positions,
edge_labels=el1,
horizontalalignment="left",
verticalalignment="top",
font_color=el_colours[0],
ax=ax,
label_pos=label_pos[0],
)
nx.draw_networkx_edge_labels(
G2,
pos=positions,
edge_labels=el2,
horizontalalignment="right",
verticalalignment="bottom",
font_color=el_colours[1],
ax=ax,
label_pos=label_pos[1],
)
ax.set_ylim(0, 1)
# add lengend using the el_colours
ax.legend(
handles=[
mpatches.Patch(color=el_colours[0], label=legend_labels[0]),
mpatches.Patch(color=el_colours[1], label=legend_labels[1]),
],
loc="upper right",
)
plt.show()
[docs]
@dataclass
class XORCircuit:
# Input nodes
input1: list
input2: list
# Middle layer nodes
exciter1: str # Takes input from input1 only
exciter2: str # Takes input from input2 only
inhibitor: str # Takes input from both inputs
# Output node
output: list
[docs]
def find_xor(paths: pd.DataFrame) -> List[XORCircuit]:
"""
Find XOR-like circuits in a 3-layer network, based on [Wang et al. 2024]
(https://www.biorxiv.org/content/10.1101/2024.09.24.614724v2). Note: this
function currently ignores middle excitatory neruons that receive both
inputs.
Args:
paths: DataFrame with columns ['pre', 'post', 'sign', 'layer']
pre: source node
post: target node
sign: 1 (excitatory) or -1 (inhibitory)
layer: 1 (input->middle) or 2 (middle->output)
Returns:
List of XORCircuit objects, each representing a found XOR motif
"""
# checking input ----
if set(paths["layer"]) != {1, 2}:
raise ValueError(
"The input DataFrame should have exactly 2 unique layers, 1 and 2"
)
# check column names of paths
if not {"pre", "post", "sign", "layer"}.issubset(paths.columns):
raise ValueError(
"The input DataFrame should have columns ['pre', 'post', 'sign', "
"'layer']"
)
# check values of signs: if it's a subset of {1, -1}
if not set(paths["sign"]) <= {1, -1}:
raise ValueError(
f"The input DataFrame should have values 1 (excitatory) or -1 (inhibitory) in the 'sign' column, but got {set(paths['sign'])}"
)
# define variables ----
circuits: list[XORCircuit] = []
exciters = paths["pre"][(paths["layer"] == 2) & (paths["sign"] == 1)].unique()
inhibitors = paths["pre"][(paths["layer"] == 2) & (paths["sign"] == -1)].unique()
l1 = paths[paths["layer"] == 1]
l2 = paths[paths["layer"] == 2]
exciter_us_dict: dict[int | str, set[int | str]] = {
exc: set(l1["pre"][l1["post"] == exc]) for exc in exciters
}
inhibitor_us_dict = {inh: set(l1["pre"][l1["post"] == inh]) for inh in inhibitors}
exciter_ds_dict = {exc: set(l2["post"][l2["pre"] == exc]) for exc in exciters}
inhibitor_ds_dict = {inh: set(l2["post"][l2["pre"] == inh]) for inh in inhibitors}
# main algorithm ----
for e1, e2 in itertools.combinations(exciters, 2):
common = exciter_us_dict[e1] & exciter_us_dict[e2]
onlye1 = exciter_us_dict[e1] - common
onlye2 = exciter_us_dict[e2] - common
for i in inhibitors:
e1_i_intersect = onlye1 & inhibitor_us_dict[i]
e2_i_intersect = onlye2 & inhibitor_us_dict[i]
if (len(e1_i_intersect) == 0) or (len(e2_i_intersect) == 0):
continue
targets = exciter_ds_dict[e1] & exciter_ds_dict[e2] & inhibitor_ds_dict[i]
if not targets:
continue
circuits.append(
XORCircuit(
input1=list(e1_i_intersect),
input2=list(e2_i_intersect),
exciter1=e1,
exciter2=e2,
inhibitor=i,
output=list(targets),
)
)
return circuits
[docs]
def path_for_ngl(path):
"""
Convert a path DataFrame to one that can be used to visualize the path in
neuroglancer with `get_ngl_link(df_format = 'long')`. Neurons are coloured
by their (indirect) connectivity (calculated using
`effective_conn_from_paths()`) to an average neuron in the last layer. `pre`
and `post` columns must contain neuron ids. This function can be used for
visualizing signal propagation in a pathway.
Args:
path (pd.DataFrame): The DataFrame containing the path data, including
the layer number, pre-synaptic index, post-synaptic index, and
weight.
Returns:
pd.DataFrame: A DataFrame with columns 'neuron_id', 'layer', and
'activation' (which is (indirect) connectivity in this case),
suitable for Neuroglancer visualization.
"""
out = []
for alayer in path.layer.unique():
path_sub = path[path.layer >= alayer]
effconn = effective_conn_from_paths(path_sub)
# get mean across columns, for each row
effconn = effconn.mean(axis=1).to_frame()
effconn.loc[:, ["layer"]] = alayer
out.append(effconn)
out = pd.concat(out, axis=0).reset_index()
out.columns = ["neuron_id", "activation", "layer"]
out = pd.concat(
[
out,
(
pd.DataFrame(
{
"neuron_id": path_sub.post,
"activation": 1,
"layer": alayer + 1,
}
)
),
]
)
return out
[docs]
def connected_components(
paths: pd.DataFrame,
threshold: float = 0,
) -> list:
"""
Find connected components in a directed graph represented by a DataFrame
of paths. The DataFrame should contain columns 'pre', 'post', and 'weight'.
The function filters the paths based on a weight threshold and then
constructs a directed graph using NetworkX. It identifies weakly connected
components in the graph and returns a list of DataFrames of paths, each
representing a connected component.
Args:
paths (pd.DataFrame): The DataFrame containing the path data, including
the layer number, pre-synaptic index, post-synaptic index, and
weight.
threshold (float, optional): The threshold for the weight of the
direct connection between pre and post. Defaults to 0.
Returns:
list: A list of DataFrames, each representing a connected component
in the directed graph.
"""
paths = filter_paths(paths, threshold)
paths_unique_edges = paths.drop_duplicates(subset=["pre", "post"])
# Create a graph from the DataFrame
G = nx.from_pandas_edgelist(
paths_unique_edges,
source="pre",
target="post",
edge_attr="weight",
create_using=nx.DiGraph(),
)
# Find connected components
weak_components = list(nx.weakly_connected_components(G))
components = []
for i, component in enumerate(weak_components):
path = paths[paths.pre.isin(component) & paths.post.isin(component)]
path.loc[:, ["component_idx"]] = i
path = remove_excess_neurons(path)
components.append(path)
return components
[docs]
def el_within_n_steps(
inprop: spmatrix,
steps: list,
inidx: arrayable,
outidx: arrayable,
n: int,
threshold: float = 0,
pre_group: dict = None,
post_group: dict = None,
return_raw_el: bool = False,
):
"""
Find paths within a specified number of steps in a directed graph, starting from
input indices and ending at output indices. The unique edges are returned. Filtering
by `threshold` happens after grouping if `idx_to_group` is provided.
Args:
inprop (spmatrix): The connectivity matrix, with presynaptic in the rows.
steps (list): A list of connectivity matrices, e.g. the result from
`compress_paths()`.
inidx (arrayable): The input neuron indices to start the paths from.
outidx (arrayable): The output neuron indices to end the paths at.
n (int): The maximum number of hops. n=1 for direct connections.
threshold (float, optional): The threshold for the weight of the direct
connection between pre and post. Defaults to 0.
idx_to_group (dict, optional): A dictionary mapping neuron indices to
their respective groups. Defaults to None.
Returns:
pd.DataFrame: A DataFrame containing the edges of the paths found,
including columns 'pre', 'post', and 'weight'.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
all_paths = []
raw_el = []
for i in tqdm(range(n)):
paths = find_path_iteratively(inprop, steps, inidx, outidx, i + 1, threshold=0)
if paths is None:
continue
if return_raw_el:
raw_el.append(paths)
if pre_group is not None and post_group is not None:
paths = group_paths(paths, pre_group, post_group)
paths = filter_paths(paths, threshold)
all_paths.append(paths)
all_paths = pd.concat(all_paths, axis=0)
el = all_paths.drop_duplicates(subset=["pre", "post", "weight"])
if return_raw_el:
raw_el = pd.concat(raw_el, axis=0)
raw_el = raw_el.drop_duplicates(subset=["pre", "post", "weight"])
return el[["pre", "post", "weight"]], raw_el
else:
return el[["pre", "post", "weight"]]