Source code for connectome_interpreter.path_finding

# Standard library imports
import itertools
from dataclasses import dataclass
from typing import Dict, List, Union, Tuple, Optional, Generator
from collections import defaultdict

# Third-party package imports
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
from scipy.sparse import issparse, spmatrix, csr_matrix
from tqdm import tqdm
from scipy.sparse.csgraph import shortest_path

from .utils import (
    arrayable,
    check_consecutive_layers,
    count_keys_per_value,
    to_nparray,
)


[docs] def find_path_once( inprop, steps_cpu, inidx: arrayable, outidx: arrayable, target_layer_number, top_n=-1, threshold=0, ): """ Finds the path once between input and output, of distance target_layer_number, returning indices of neurons in the previous layer that connect the input with the output. This works by taking the top_n direct upstream partners of the outidx neurons, and intersect those with neurons 'effectively' connected (through steps_cpu) to the inidx neurons. Args: inprop (scipy.sparse.csc_matrix): The connectivity matrix in Compressed Sparse Column format. steps_cpu (list): A list of compressed connectivity matrices: one matrix for each compressed path length. inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The input neuron index/indices. outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The output neuron index/indices. target_layer_number (int): The target layer number to examine. Must be >= 1. When target_layer_number = 1, we are looking at the direct synaptic connectivity. top_n (int, optional): The number of top connections to consider based on direct connectivity from inprop_csc. If top_n = -1, all connections are considered. threshold (float, optional): The threshold of the direct connectivity from inidx to an average outidx. Returns: np.ndarray: An array of neuron indices in the previous layer that have significant connectivity, connecting between the `inidx` and `outidx`. """ inprop_csc = inprop.copy() inprop_csc.data = np.abs(inprop.data) # make sure they are integers inidx = [int(i) for i in inidx] outidx = [int(i) for i in outidx] inidx = to_nparray(inidx) outidx = to_nparray(outidx) if target_layer_number == 1: # if the target layer is 1, we are looking at the direct synaptic # connectivity # so we just need to find the indices of the non-zero values in the # inprop_csc matrix that correspond to the outidx, and intersect # those with the inidx we are interested in. colidx = inidx else: # first get the neurons that effectively connect to inidx, at layer # number target_layer_number - 1. # for example, in the ORN->PN->KC case, target_layer_number = 2 for KCs. # top_n_row_indices are indices of the PNs that connect to the KCs. So # in this case we should use direct connectivity from PNs to get the # ORNs. # so when target_layer_number == 2, we should use steps_cpu[0] (direct # connectivity), that is, target_layer_number-2: # subtract 1 for getting top_n_row_indices below which is going one # step upstream; and another for 0-based indexing in steps_cpu. # the next line gets the targets that receive non-zero compressed # input from inidx # when target_layer_number >= 2. # .nonzero() returns row, column of the nonzero values. colidx = steps_cpu[target_layer_number - 2][inidx, :].nonzero()[1] # then go back one step from the outidx, based on the direct connectivity # matrix (inprop) us = inprop_csc[:, outidx].nonzero() # intersect the non-zero upstream neurons with those effectively connected # across layers (colidx) # these are still row indices of inprop intersect = np.intersect1d(us[0], colidx) # direct connectivity between target_layer_number-1 and target_layer_number submatrix = inprop_csc[intersect, :][:, outidx] # in case outidx is more than one index, calculate the average of each row # in the submatrix # same length as len(intersect) row_averages = np.array(submatrix.mean(axis=1)).flatten() # thresholding # these are indices of row_averages, which can also be used to index # row_averages thresholded_indices = np.where(row_averages >= threshold)[0] thresholded_row_averages = row_averages[thresholded_indices] # thresholded_intersect are a subset of of intersect, which are indices of # inprop_csc # intersect has the same length as row_averages thresholded_intersect = intersect[thresholded_indices] # Find the indices (of thresholded_row_averages) of the top n averages if top_n == -1: top_n_indices = np.argsort(thresholded_row_averages) else: top_n_indices = np.argsort(thresholded_row_averages)[-top_n:] # Get the original row indices corresponding to these top n averages top_n_row_indices = thresholded_intersect[top_n_indices] # make the edgelist submatrix = inprop_csc[top_n_row_indices, :][:, outidx] # Convert submatrix to COO format to efficiently find non-zero elements coo_submatrix = submatrix.tocoo() # Create DataFrame directly from COO format data df = pd.DataFrame( { # Map back to original indices "pre": top_n_row_indices[coo_submatrix.row], # Map back to original output indices "post": outidx[coo_submatrix.col], "weight": coo_submatrix.data, } ) return df
[docs] def find_paths_of_length( edgelist: Union[spmatrix, pd.DataFrame], inidx: arrayable, outidx: arrayable, target_layer_number: int, quiet: bool = False, ): """ Finds the path of length target_layer_number between inidx and outidx, returning the edgelist in a DataFrame, including the pre and post indices, the layer (direct connections from inidx: layer = 1), and the weight of the direct connection between pre and post. Args: edgelist (Union[spmatrix, pd.DataFrame]): The edgelist of the entire graph. If a DataFrame, it must contain columns "pre", "post", and "weight". If a sparse matrix, the pre needs to be in the rows. inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The source indices. outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The target indices. target_layer_number (int): The target layer number to examine. Must be >= 1. When target_layer_number = 1, we are looking at the direct synaptic connectivity. Returns: pd.DataFrame: A DataFrame containing the path data, including the pre-synaptic and post-synaptic neuron indices, the layer (direct connections from inidx: layer = 1), and the weight (input proportion of the postsynaptic neuron) of the direct connection between pre and post. If no path is found, returns None. """ inidx = to_nparray(inidx) outidx = to_nparray(outidx) assert len(inidx) > 0, "inidx must not be empty" assert len(outidx) > 0, "outidx must not be empty" if isinstance(edgelist, pd.DataFrame): # if edgelist is a DataFrame, convert it to a sparse matrix if not all(col in edgelist.columns for col in ["pre", "post", "weight"]): raise ValueError( "edgelist DataFrame must contain 'pre', 'post', and 'weight' columns." ) elif isinstance(edgelist, spmatrix): edgelist = edgelist.tocoo() edgelist = pd.DataFrame( data={ "pre": edgelist.row, "post": edgelist.col, "weight": edgelist.data, } ) else: raise TypeError("edgelist must be a pandas DataFrame or a scipy sparse matrix.") assert target_layer_number >= 1, "target_layer_number must be >= 1" if target_layer_number == 1: # if target_layer_number is 1, we just return the direct connections el = edgelist[ (edgelist["pre"].isin(inidx)) & (edgelist["post"].isin(outidx)) ].copy() if el.empty: return None el.loc[:, ["layer"]] = 1 return el # first find the middle layer: if target_layer_number % 2 == 0: # when target_layer_number == 2, the middle layer is 1 middle_layer = target_layer_number // 2 else: # when target_layer_number == 3, the middle layer is 2 middle_layer = (target_layer_number + 1) // 2 # list of empty lists, of length target_layer_number+1 layer_indices = [[] for _ in range(target_layer_number + 1)] # first one is inidx layer_indices[0] = inidx # last one is outidx layer_indices[-1] = outidx # first go from inidx to middle layer for layer in range(1, middle_layer + 1): layer_indices[layer] = list( edgelist[edgelist["pre"].isin(layer_indices[layer - 1])]["post"].unique() ) if len(layer_indices[layer]) == 0: if not quiet: print(f"No neurons found in layer {layer}. Returning None.") return None # then go from outidx to middle layer # stopping at the layer after middle_layer # when target_layer_number is 2 or 3, this is skipped # when target_layer_number == 4: for layer in range(target_layer_number - 1, middle_layer, -1): # layer is 3 layer_indices[layer] = list( edgelist[edgelist["post"].isin(layer_indices[layer + 1])]["pre"].unique() ) if len(layer_indices[layer]) == 0: if not quiet: print(f"No neurons found in layer {layer}. Returning None.") return None # link middle layer with the one after el = edgelist[ (edgelist["pre"].isin(layer_indices[middle_layer])) & (edgelist["post"].isin(layer_indices[middle_layer + 1])) ] layer_indices[middle_layer] = el["pre"].unique() layer_indices[middle_layer + 1] = el["post"].unique() if ( len(layer_indices[middle_layer]) == 0 or len(layer_indices[middle_layer + 1]) == 0 ): if not quiet: print( f"No neurons found in layer {middle_layer} or {middle_layer + 1}. Returning None." ) return None # now go from middle layer to the first layer for layer in range(middle_layer - 1, -1, -1): el = edgelist[ edgelist["pre"].isin(layer_indices[layer]) & edgelist["post"].isin(layer_indices[layer + 1]) ] layer_indices[layer] = el["pre"].unique() if len(layer_indices[layer]) == 0: if not quiet: print(f"No neurons found in layer {layer}. Returning None.") return None # now go from middle layer to the last layer for layer in range(middle_layer + 1, target_layer_number): el = edgelist[ edgelist["pre"].isin(layer_indices[layer - 1]) & edgelist["post"].isin(layer_indices[layer]) ] layer_indices[layer] = el["post"].unique() if len(layer_indices[layer]) == 0: if not quiet: print(f"No neurons found in layer {layer}. Returning None.") return None # add the weights path = [] for layer in range(target_layer_number): el = edgelist[ (edgelist["pre"].isin(layer_indices[layer])) & (edgelist["post"].isin(layer_indices[layer + 1])) ] el.loc[:, ["layer"]] = layer + 1 path.append(el) return pd.concat(path, ignore_index=True)
[docs] def enumerate_paths( edgelist: pd.DataFrame, start_layer: int = 1, end_layer: Optional[int] = None, return_generator: bool = False, loop_mode: str = "allow", ) -> Union[ List[List[Tuple[Union[str, int], Union[str, int], float]]], Generator[List[Tuple[Union[str, int], Union[str, int], float]], None, None], ]: """ Finds all paths that begin with an edge in start_layer and end with an edge in end_layer, assuming valid paths proceed layer-by-layer without skipping. This function was written by Claude. Args: edgelist (pd.DataFrame): The edgelist of the entire graph. Must contain columns: "layer", "pre", "post", and "weight". Each row is a directed, weighted edge from "pre" to "post" at a given layer. start_layer (int): The layer from which all paths must begin. Must be <= end_layer. end_layer (int): The layer at which all paths must terminate. If None, defaults to the maximum layer in the edgelist. return_generator (bool, optional): If True, returns a generator instead of a list. Useful for large graphs to avoid memory issues. Defaults to False. loop_mode (str, optional): How to handle loops in paths. Options: - "allow" (default): Return all paths, including those that revisit a vertex. - "exclude": Return only loop-free (simple) paths. A node appearing at both the start and end of a path is allowed (e.g. A-B-C-A is kept, capturing the s-t cycle use case), but a node reappearing in the middle is a loop and the path is dropped (e.g. A-B-A-C). This matches the convention in :func:`count_paths`. Returns: Union[List[List[Tuple]], Generator]: If return_generator is False, returns a list of valid paths. If True, returns a generator. Each path is a list of (pre, post, weight) tuples, ordered from start to end. """ if end_layer is None: end_layer = edgelist["layer"].max() if start_layer > end_layer: raise ValueError("start_layer must be less than or equal to end_layer.") if loop_mode not in ("allow", "exclude"): raise ValueError(f"loop_mode must be 'allow' or 'exclude', got '{loop_mode}'") exclude_loops = loop_mode == "exclude" # Build adjacency: adj[layer][pre] → list of (post, weight) # pre and post can be either strings or integers adj: Dict[int, Dict[Union[str, int], List[Tuple[Union[str, int], float]]]] = ( defaultdict(lambda: defaultdict(list)) ) for _, row in edgelist.iterrows(): adj[row["layer"]][row["pre"]].append((row["post"], row["weight"])) if return_generator: # Generator version - yields paths as they are found def generate_paths(): def dfs( node: Union[str, int], layer: int, path: List[Tuple[Union[str, int], Union[str, int], float]], visited: set, start_node: Union[str, int], ): # Loop check (only active when loop_mode == "exclude"). A revisited # node is rejected, except the start node reappearing exactly at the # final position, which closes an s-t cycle and is kept. if exclude_loops and node in visited: if layer == end_layer and node == start_node: yield path.copy() return if layer == end_layer: yield path.copy() # yield a copy to avoid mutation issues return if exclude_loops: visited.add(node) for post, wt in adj.get(layer + 1, {}).get(node, []): path.append((node, post, wt)) yield from dfs(post, layer + 1, path, visited, start_node) path.pop() if exclude_loops: visited.remove(node) for pre, edges in adj.get(start_layer, {}).items(): for post, wt in edges: yield from dfs(post, start_layer, [(pre, post, wt)], {pre}, pre) return generate_paths() else: # List version - uses mutable path for efficiency (append/pop instead of copying) paths: List[List[Tuple[Union[str, int], Union[str, int], float]]] = [] def dfs( node: Union[str, int], layer: int, path: List[Tuple[Union[str, int], Union[str, int], float]], visited: set, start_node: Union[str, int], ): # Loop check (only active when loop_mode == "exclude"). A revisited node # is rejected, except the start node reappearing exactly at the final # position, which closes an s-t cycle and is kept. if exclude_loops and node in visited: if layer == end_layer and node == start_node: paths.append(path.copy()) return if layer == end_layer: paths.append(path.copy()) # append a copy to avoid mutation return if exclude_loops: visited.add(node) for post, wt in adj.get(layer + 1, {}).get(node, []): path.append((node, post, wt)) dfs(post, layer + 1, path, visited, start_node) path.pop() if exclude_loops: visited.remove(node) for pre, edges in adj.get(start_layer, {}).items(): for post, wt in edges: dfs(post, start_layer, [(pre, post, wt)], {pre}, pre) return paths
[docs] def find_path_iteratively( inprop_csc: spmatrix, steps_cpu: list, inidx: arrayable, outidx: arrayable, target_layer_number: int, top_n: int = -1, threshold: float = 0, quiet: bool = False, ): """ Iteratively finds the path from the specified output (outidx) back to the input (inidx) across multiple layers, using the `find_path_once` function to traverse each layer. Args: inprop_csc (scipy.sparse matrix): The direct connectivity matrix in Compressed Sparse Column format. steps_cpu (list): A list of compressed connectivity matrices: one matrix for each compressed path length. inidx (int, float, list, set, numpy.ndarray, or pandas.Series): The input neuron indices. outidx (int, float, list, set, numpy.ndarray, or pandas.Series): The output neuron indices to start the reverse path finding. target_layer_number (int): The number of layers to traverse backwards from the outidx. If target_layer_number = 1, we are looking at the direct synaptic connectivity. top_n (int, optional): The number of top connections to consider at each layer based on direct connectivity from inprop_csc. If top_n = -1, all connections are considered. threshold (float, optional): The threshold for the average of the direct connectivity from inidx to outidx. quiet (bool, optional): If True, suppresses print statements. Defaults to False. Returns: pd.DataFrame: A DataFrame containing the path data, including the pre-synaptic and post-synaptic neuron indices, the layer (direct connections from inidx: layer = 1), and the weight (input proportion of the postsynaptic neuron) of the direct connection between pre and post. """ if not quiet: print( 'Have you tried "find_paths_of_length()" instead? About the same speed, no need to pre-load `steps`, and more accurate!' ) inprop_csc.data = np.abs(inprop_csc.data) inidx = to_nparray(inidx) outidx = to_nparray(outidx) if len(inidx) == 0 or len(outidx) == 0: # raise error raise ValueError("The input or output indices are empty!") if issparse(inprop_csc): # if stepsn is coo, turn into csc if inprop_csc.format == "coo": inprop_csc = inprop_csc.tocsc() # path_indices = [] # This will store the path data as a list of arrays dfs = [] # This will store the path data as a list of DataFrames current_outidx = outidx # from target_layer_number to 1, go back one step at a time for layer in range(target_layer_number, 0, -1): # Find the indices in the current layer that connect to the next layer df = find_path_once( inprop_csc, steps_cpu, inidx, current_outidx, layer, top_n, threshold, ) # If no indices are found, break the loop as no path can be formed if len(df) == 0: if not quiet: print(f"Cannot trace back to the input in {target_layer_number} steps.") if (top_n > -1) | (threshold > 0): print("Try lowering the threshold or increasing top_n.") return df["layer"] = layer dfs.append(df) # Update the outidx for the next iteration to move backwards through # layers current_outidx = set(df["pre"]) return pd.concat(dfs)
[docs] def create_layered_positions( df: pd.DataFrame, priority_indices=None, sort_dict: dict | None = None ) -> dict: """ Creates a dictionary of positions for each neuron in the paths, so that the paths can be visualized in a layered manner. It assumes that `df` contains the columns 'layer', 'pre_layer', 'post_layer' (or 'layer', 'pre', 'post'). If a neuron exists in multiple layers, it is plotted multiple times. Args: df (pd.DataFrame): The DataFrame containing the path data, including the layer number, pre-synaptic index, and post-synaptic index. priority_indices (list, set, pd.Series, numpy.ndarray optional): A list of neuron indices that should be plotted on top of each layer. Defaults to None. sort_dict (dict, optional): A dictionary of neuron indices as keys and their sorting order as values (bigger value is higher in the plot). Defaults to None. Returns: dict: A dictionary of positions for each neuron in the paths, with the keys as the neuron indices and the values as the (x, y) coordinates. """ # check if layer numbers in 'layer' column are consecutive. If anyone is # absent, raise an error if not check_consecutive_layers(df): raise ValueError("The layer numbers in 'layer' column are not consecutive. ") # if post_layer and pre_layer are not present, create them if "post_layer" not in df.columns: df["post_layer"] = df["post"].astype(str) + "_" + df["layer"].astype(str) if "pre_layer" not in df.columns: df["pre_layer"] = df["pre"].astype(str) + "_" + (df["layer"] - 1).astype(str) if priority_indices is not None: priority_indices = set(priority_indices) number_of_layers = df["layer"].nunique() layer_width = 1.0 / (number_of_layers + 1) positions = {} name_to_idx = dict(zip(df.pre_layer, df["pre"])) name_to_idx.update(dict(zip(df.post_layer, df["post"]))) global_to_local_layer_number = { l: i for i, l in enumerate(sorted(df["layer"].unique())) } df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number) for layer in range(0, number_of_layers + 1): if layer != number_of_layers: layer_name = list( set(df.pre_layer[df.local_layer == layer]).union( set(df.post_layer[df.local_layer == (layer - 1)]) ) ) else: layer_name = list(set(df.post_layer[df.local_layer == layer - 1])) if sort_dict is not None: layer_name = sorted( layer_name, key=lambda x: sort_dict.get(x, float("-inf")) ) if priority_indices is not None: layer_name_priority = [ item for item in layer_name if name_to_idx[item] in priority_indices ] layer_name_not = [ item for item in layer_name if name_to_idx[item] not in priority_indices ] layer_name = layer_name_not + layer_name_priority for index, neuron in enumerate(layer_name, start=1): positions[neuron] = ( layer * layer_width, # the later in the list, the higher index * 1.0 / (len(layer_name) + 1), ) return positions
[docs] def remove_excess_neurons( df: pd.DataFrame, keep=None, target_indices=None, keep_targets_in_middle: bool = False, quiet: bool = False, ) -> pd.DataFrame: """After filtering, some neurons are no longer on the paths between the input and output neurons. This function removes those neurons from the paths. Args: df (pd.Dataframe): a filtered dataframe with similar structure as the dataframe returned by `find_paths_of_length()`. If df is empty, and `quiet` is False, raises a ValueError. If `quiet` is True, returns None. keep (list, set, pd.Series, numpy.ndarray, str, optional): A list of neuron indices that should be kept in the paths, even if they don't connect between input and target in the last layer. Defaults to None. target_indices (list, set, pd.Series, numpy.ndarray, str, optional): A list of target neuron indices that should be kept in the last layer. Defaults to None, in which case all neurons in the last layer in `df` would be kept. keep_targets_in_middle (bool, optional): If True, the target_indices are kept in the middle layers as well, even if they don't connect between input and target in the last layer. Defaults to False. Returns: pd.Dataframe: A dataframe with similar structure as the result of `find_paths_of_length()`, with the excess neurons removed. If no path is found, returns None. """ if df.shape[0] == 0: if quiet: return else: # raise error raise ValueError( "No connections found in the input of `remove_excess_neurons()`. " ) max_layer_num = df["layer"].max() if max_layer_num == 1: return df # if target_indices are provided, first use this to filter the last layer ---- if target_indices is not None: # if it's a string or int, convert to list if isinstance(target_indices, str) or (type(target_indices) == int): target_indices = [target_indices] target_indices = set(target_indices) # check if datatype is the same, between target_indices and post if not all([type(i) == type(df["post"].iloc[0]) for i in target_indices]): raise ValueError( f"The datatype in `target_indices` should be the same as the elements in `post` column of the DataFrame. Elements in `post` is {type(df.post.iloc[0])}." ) if not target_indices.issubset(df[df["layer"] == df["layer"].max()]["post"]): raise ValueError( "The target indices are not in the post-synaptic neurons of the last layer. Here are the indices of the last layer: ", ", ".join(df[df["layer"] == df["layer"].max()]["post"].unique()), ". Your target_indices should be a subset.", ) df = df[(df["layer"] != df["layer"].max()) | df["post"].isin(target_indices)] # check if all layer numbers are consecutive ---- if not check_consecutive_layers(df): if not quiet: print( "Warning: The layer numbers are not consecutive. Will only use the" " consecutive layers from the last one." ) selected = [] # get the consecutive layers from the last one for l in range(df["layer"].max(), 0, -1): if l in df["layer"].unique(): selected.append(l) else: break df = df[df["layer"].isin(selected)] global_to_local_layer_number = { l: i for i, l in enumerate(sorted(df["layer"].unique())) } df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number) elif df["layer"].min() != 1: # if the layer numbers do not start from 1 if not quiet: print( "Warning: The layer numbers do not start from 1. Will only use the " "consecutive layers from the last one." ) global_to_local_layer_number = { l: i for i, l in enumerate(sorted(df["layer"].unique())) } df.loc[:, ["local_layer"]] = df["layer"].map(global_to_local_layer_number) else: df.loc[:, ["local_layer"]] = df["layer"] # while there is any layer whose post is not the same as the next layer's pre ---- while any( [ set(df[df.local_layer == i]["post"]) != set(df[df.local_layer == (i + 1)]["pre"]) for i in range(1, df.local_layer.max()) ] ): if keep is not None: if isinstance(keep, str): keep = [keep] keep = set(keep) if all( [ set(df[df.local_layer == i]["post"]).union(keep) == set(df[df.local_layer == (i + 1)]["pre"]).union(keep) for i in range(1, df.local_layer.max()) ] ): break else: keep = set() if keep_targets_in_middle: if target_indices is not None: keep = keep.union(target_indices) # start adding each layer to df_layers_update --- df_layers_update = [] # if there are only two layers if df.local_layer.max() == 2: df_layer = df[df.local_layer == 2] df_prev_layer = df[df.local_layer == 1] # filter by pre in the second layer df_layers_update.append( df_layer[df_layer["pre"].isin(set(df_prev_layer["post"]).union(keep))] ) # filter by post in the first layer df_layers_update.append( df_prev_layer[ df_prev_layer["post"].isin(set(df_layer["pre"]).union(keep)) ] ) df = pd.concat(df_layers_update) else: for i in range(2, df.local_layer.max()): df_layer = df[df.local_layer == i] df_next_layer = df[df.local_layer == i + 1] df_prev_layer = df[df.local_layer == i - 1] # pre that should be in the current layer: an intersection of # previous layer's post; and current layer's pre df_pre = set.intersection( set(df_prev_layer["post"]) & set(df_layer["pre"]) ) # post that should be in the current layer: an intersection of # current layer's post; and next layer's pre df_post = set.intersection( set(df_layer["post"]), set(df_next_layer["pre"]) ) if i == 2: # add edges in the first layer df_prev_layer = df_prev_layer[ df_prev_layer["post"].isin(df_pre.union(keep)) ] df_layers_update.append(df_prev_layer) df_layer = df_layer[ df_layer["pre"].isin(df_pre.union(keep)) & df_layer["post"].isin(df_post.union(keep)) ] if df_layer.shape[0] == 0: if not quiet: print( "No path found. Try lowering the threshold for the edges to be included in the path." ) return df_layers_update.append(df_layer) if i == (df.local_layer.max() - 1): # add edges in the last layer if target_indices is None: df_next_layer = df_next_layer[ df_next_layer["pre"].isin(df_post.union(keep)) ] else: df_next_layer = df_next_layer[ df_next_layer["pre"].isin( df_post.union(keep).union(target_indices) ) ] df_layers_update.append(df_next_layer) df = pd.concat(df_layers_update) # at this point, if no edges left: return None if df.shape[0] == 0: if not quiet: print("No path found. Try relaxing the criteria for edge inclusion.") return df = df.loc[:, df.columns != "local_layer"] # in case we removed all the connections in the last layer if df["layer"].max() != max_layer_num: if not quiet: print( "No path found. Try lowering the threshold for the edges to be included in the path." ) return return df
[docs] def remove_excess_neurons_batched( df: pd.DataFrame, keep=None, target_indices=None, keep_targets_in_middle: bool = False, quiet: bool = False, ) -> pd.DataFrame: """Does the same thing as `remove_excess_neurons()`, but for batched input (i.e. assumes column `batch` in `df`). Args: df (pd.DataFrame): a filtered dataframe with similar structure as the dataframe returned by `find_path_iteratively()`. Must contain a column `batch`. keep (list, set, pd.Series, numpy.ndarray, str, optional): A list of neuron indices that should be kept in the paths, even if they don't connect between input and target in the last layer. Defaults to None. target_indices (list, set, pd.Series, numpy.ndarray, str, optional): A list of target neuron indices that should be kept in the last layer. Defaults to None, in which case all neurons in the last layer in `df` would be kept. keep_targets_in_middle (bool, optional): If True, the target_indices are kept in the middle layers as well, even if they don't connect between input and target in the last layer. Defaults to False. quiet (bool, optional): If True, suppresses print statements. Defaults to False. Returns: pd.DataFrame: The filtered DataFrame containing the path data, including the layer number, pre-synaptic index, post-synaptic index, and weight """ # first check if 'batch' is in the columns if "batch" not in df.columns: raise ValueError("The column `batch` is not in the DataFrame.") dfs = [] for b in tqdm(df.batch.unique()): df_batch = df[df.batch == b] df_batch = remove_excess_neurons( df_batch, keep, target_indices, keep_targets_in_middle, quiet=quiet ) dfs.append(df_batch) return pd.concat(dfs)
[docs] def filter_paths( df: pd.DataFrame, threshold: float = 0, necessary_intermediate: Dict[int, arrayable] | None = None, quiet: bool = False, ) -> pd.DataFrame: """Filters the paths based on the weight threshold and the necessary intermediate neurons. The weight threshold refers to the direct connectivity between connected neurons in the path. It is recommended to not put too may neurons in necessary_intermediate, as it may be too stringent and remove all paths. Args: df (pd.DataFrame): The DataFrame containing the path data, including the layer number, pre-synaptic index, post-synaptic index, and weight. If df is empty, and `quiet` is False, raises a ValueError. If `quiet` is True, returns None. threshold (float, optional): The threshold for the weight of the direct connection between pre and post. Defaults to 0. necessary_intermediate (dict, optional): A dictionary of necessary intermediate neurons, where the keys are the layer numbers (starting neurons: 1; directly downstream: 2) and the values are the neuron indices (can be int, float, list, set, numpy.ndarray, or pandas.Series). Defaults to None. Returns: pd.DataFrame: The filtered DataFrame containing the path data, including the layer number, pre-synaptic index, post-synaptic index, and weight. """ if df.shape[0] == 0: if quiet: return else: # raise error raise ValueError("The input DataFrame for filter_paths() is empty! ") unique_layers = set(df.layer) if threshold > 0: df = df[df.weight > threshold] if df.shape[0] == 0: if not quiet: print("No edges left after thresholding. Try lowering the threshold.") return elif set(df["layer"]) != unique_layers: if not quiet: print( "Some layers have no edges left after thresholding. Try lowering the threshold." ) return df = remove_excess_neurons(df, quiet=quiet) if necessary_intermediate is not None: for layer, indices in necessary_intermediate.items(): if type(indices) != list: indices = list(to_nparray(indices)) if layer < 1: # error: layer has to be an integer >=1 raise ValueError("Layer has to be an integer >=1") if layer < (df["layer"].max() + 1): df = df[df["pre"].isin(indices) | (df["layer"] != layer)] elif layer == (df["layer"].max() + 1): # filter for targets df = df[df["post"].isin(indices) | ((df["layer"] + 1) != layer)] else: # error: layer number too big raise ValueError("Layer number too big") if set(df["layer"]) != unique_layers: if not quiet: print( "Some layers have no edges left after applying necessary_intermediate." ) return df = remove_excess_neurons(df, quiet=quiet) return df
[docs] def group_paths( paths: pd.DataFrame, pre_group: Optional[dict] = None, post_group: Optional[dict] = None, intermediate_group: Optional[dict] = None, avg_within_connected: bool = False, outprop: bool = False, combining_method: str = "mean", ) -> pd.DataFrame: """ Group the paths by user-specified variable (e.g. cell type, cell class etc.). If outprop=False, weights are summed across presynaptic neurons of the same group and combined across all postsynaptic neurons of the same group using combining_method (even if some postsynaptic neurons are not in `paths`). If outprop=True, weights are summed across postsynaptic neurons of the same group and combined across all presynaptic neurons of the same group using combining_method (even if some presynaptic neurons are not in `paths`). Args: paths (pd.DataFrame): The DataFrame containing the path data, looking like the output from `find_path_iteratively()`. pre_group (dict): A dictionary that maps pre-synaptic neuron indices to their respective group. post_group (dict): A dictionary that maps post-synaptic neuron indices to their respective group. intermediate_group (dict, optional): A dictionary that maps intermediate neuron indices to their respective group. Defaults to None. If None, it will be set to pre_group. avg_within_connected (bool, optional): If True, the average weight is calculated within the connected neurons of the same group. If False, the average weight is calculated across all neurons of the same group. Defaults to False. outprop (bool, optional): If True, get the summed output proportion (across recipient single cells in the same cell type) for each average sender. If False (default), get the summed input proportion across all senders for each average recipient. combining_method (str, optional): Method to combine inputs (outprop=False) or outputs (outprop=True). Can be 'sum', 'mean', or 'median'. Defaults to 'mean'. Returns: pd.DataFrame: The grouped DataFrame containing the path data, including the layer number, pre-synaptic index, post-synaptic index, and weight. """ # if path is empty, return it directly if paths is None or paths.shape[0] == 0: return paths assert combining_method in ["mean", "sum", "median"], ( "The combining_method should be either 'mean', 'sum' or 'median'. " f"Currently it is {combining_method}." ) # (new) auto-fill missing keys so every node has a group all_nodes = set(paths["pre"]).union(set(paths["post"])) if pre_group is None: pre_group = {n: n for n in all_nodes} else: pre_group = {**{n: n for n in all_nodes}, **pre_group} # user's values win if post_group is None: post_group = {n: n for n in all_nodes} else: post_group = {**{n: n for n in all_nodes}, **post_group} if intermediate_group is None: intermediate_group = dict(pre_group) else: intermediate_group = {**{n: n for n in all_nodes}, **intermediate_group} # make values in pre_group, post_group, and intermediate_group strings pre_group = {k: str(v) for k, v in pre_group.items()} post_group = {k: str(v) for k, v in post_group.items()} intermediate_group = {k: str(v) for k, v in intermediate_group.items()} # add cell type information if "layer" in paths.columns: # first use intermediate_group, then modify specifically for pre at the # first layer, and post at the last layer paths["pre_type"] = paths["pre"].map(intermediate_group).astype(str) paths["post_type"] = paths["post"].map(intermediate_group).astype(str) # group source by pre_group paths.loc[paths["layer"] == paths["layer"].min(), "pre_type"] = ( paths.loc[paths["layer"] == paths["layer"].min(), "pre"] .map(pre_group) .astype(str) ) # group target by post_group paths.loc[paths["layer"] == paths["layer"].max(), "post_type"] = ( paths.loc[paths["layer"] == paths["layer"].max(), "post"] .map(post_group) .astype(str) ) group_columns = ["layer", "pre_type", "post_type"] else: paths["pre_type"] = paths["pre"].map(pre_group).astype(str) paths["post_type"] = paths["post"].map(post_group).astype(str) group_columns = ["pre_type", "post_type"] has_activation = "pre_activation" in paths.columns if has_activation: paths_act = paths.copy() if not outprop: # in this case, calculating the summed input proportion across all senders for # each average recipient # regardless of combining method, need to sum within pre_type anyway paths = paths.groupby(group_columns + ["post"])["weight"].sum().reset_index() if not avg_within_connected: # sometimes only one neuron in a type is connected to another type, so # only this connection is in paths # but to calculate the average weight between two types, we should # take into account all the neurons of the post-synaptic type post_group_df = pd.DataFrame( { "post": list(post_group.keys()), "post_type": list(post_group.values()), } ) if (intermediate_group != post_group) and ("layer" in paths.columns): intermediate_group_df = pd.DataFrame( { "post": list(intermediate_group.keys()), "post_type": list(intermediate_group.values()), } ) not_last = paths[paths["layer"] != paths["layer"].max()] not_last_prepost = not_last[group_columns].drop_duplicates() not_last_prepost = not_last_prepost.merge( intermediate_group_df, on=["post_type"], how="left" ) not_last = not_last_prepost.merge( not_last, on=group_columns + ["post"], how="left" ).fillna(0) last = paths[paths["layer"] == paths["layer"].max()] last_prepost = last[group_columns].drop_duplicates() last_prepost = last_prepost.merge( post_group_df, on=["post_type"], how="left" ) last = last_prepost.merge( last, on=group_columns + ["post"], how="left" ).fillna(0) paths = pd.concat([not_last, last], ignore_index=True) else: prepost = paths[group_columns].drop_duplicates() prepost = prepost.merge(post_group_df, on=["post_type"], how="left") paths = prepost.merge( paths, on=group_columns + ["post"], how="left" ).fillna(0) paths = ( paths.groupby(group_columns)["weight"].agg(combining_method).reset_index() ) else: # outprop # in this case, calculating the summed output proportion (across recipient # single cells in the same cell type) for each average sender # regardless of combining method, need to sum within post_type anyway paths = paths.groupby(group_columns + ["pre"])["weight"].sum().reset_index() if not avg_within_connected: # sometimes only one neuron in a type is connected to another type, so # only this connection is in paths # but to calculate the average weight between two types, we should # take into account all the neurons of the pre-synaptic type pre_group_df = pd.DataFrame( { "pre": list(pre_group.keys()), "pre_type": list(pre_group.values()), } ) if (intermediate_group != pre_group) and ("layer" in paths.columns): intermediate_group_df = pd.DataFrame( { "pre": list(intermediate_group.keys()), "pre_type": list(intermediate_group.values()), } ) not_first = paths[paths["layer"] != paths["layer"].min()] not_first_prepost = not_first[group_columns].drop_duplicates() not_first_prepost = not_first_prepost.merge( intermediate_group_df, on=["pre_type"], how="left" ) not_first = not_first_prepost.merge( not_first, on=group_columns + ["pre"], how="left" ).fillna(0) first = paths[paths["layer"] == paths["layer"].min()] first_prepost = first[group_columns].drop_duplicates() first_prepost = first_prepost.merge( pre_group_df, on=["pre_type"], how="left" ) first = first_prepost.merge( first, on=group_columns + ["pre"], how="left" ).fillna(0) paths = pd.concat([not_first, first], ignore_index=True) else: prepost = paths[group_columns].drop_duplicates() prepost = prepost.merge(pre_group_df, on=["pre_type"], how="left") paths = prepost.merge( paths, on=group_columns + ["pre"], how="left" ).fillna(0) paths = ( paths.groupby(group_columns)["weight"].agg(combining_method).reset_index() ) if has_activation: paths_act = ( paths_act.groupby(group_columns) .agg( pre_activation=("pre_activation", "mean"), post_activation=("post_activation", "mean"), ) .reset_index() ) # then merge the weights paths = pd.merge(paths_act, paths, on=group_columns) paths.rename(columns={"pre_type": "pre", "post_type": "post"}, inplace=True) return paths
[docs] def compare_layered_paths( paths: List[pd.DataFrame], priority_indices=None, neuron_to_sign: dict | None = None, sign_color_map: dict = {1: "red", -1: "blue"}, el_colours: List[str] = ["rosybrown", "burlywood"], legend_labels: List[str] = ["Path 1", "Path 2"], weight_decimals: int = 2, figsize: tuple = (10, 8), label_pos: List[float] = [0.7, 0.7], ): """ Compare two layered paths by overlaying them and annotating the weights. The paths should be in the format of the output from `find_path_iteratively()`. The width of the edges is based on the weight in the first path, when the connection is present in both paths. Args: paths (List[pd.DataFrame]): A list of two DataFrames containing the path data, including columns 'layer', 'pre', 'post', and 'weight'. priority_indices (list, set, pd.Series, numpy.ndarray, optional): A list of neuron indices that should be plotted on top of each layer. Defaults to None. neuron_to_sign (dict, optional): A dictionary that maps neuron indices to their signs. Defaults to None. sign_color_map (dict, optional): A dictionary that maps neuron signs to their respective colors. Defaults to {1: 'red', -1: 'blue'}. el_colours (List[str], optional): A list of two colors for the edge labels of the two paths. Defaults to ['rosybrown', 'burlywood']. legend_labels (List[str], optional): A list of two labels for the legend. Defaults to ['Path 1', 'Path 2']. weight_decimals (int, optional): The number of decimal places to round the edge weights to. Defaults to 2. figsize (tuple, optional): The size of the figure. Defaults to (10, 8). Returns: None: The function plots the graph. """ # add layer columns if necessary new_paths = [] for path in paths: # Create a 'post_layer' column to use as unique identifiers if "post_layer" not in path.columns: path["post_layer"] = ( path["post"].astype(str) + "_" + path["layer"].astype(str) ) if "pre_layer" not in path.columns: path["pre_layer"] = ( path["pre"].astype(str) + "_" + (path.layer - 1).astype(str) ) new_paths.append(path) composite_paths = pd.concat(new_paths) # get unique edges composite_paths.drop_duplicates(["pre_layer", "post_layer"], inplace=True) composite_G = nx.from_pandas_edgelist( composite_paths, "pre_layer", "post_layer", ["weight"], create_using=nx.DiGraph(), ) # Labels for nodes labels = dict(zip(composite_paths.post_layer, composite_paths.post)) labels.update(dict(zip(composite_paths.pre_layer, composite_paths.pre))) # Determine the width of the edges weights = [composite_G[u][v]["weight"] for u, v in composite_G.edges()] weight_min = min(weights) weight_max = max(weights) if weight_min == weight_max: widths = [1.0] * len(weights) else: widths = [1 + 9 * (w - weight_min) / (weight_max - weight_min) for w in weights] # Generate positions positions = create_layered_positions(composite_paths, priority_indices) # Edge colors based on neuron signs default_color = "lightgrey" # Default edge color # specify edge colour based on pre-neuron sign, if available edge_colors = [] for u, _ in composite_G.edges(): pre_neuron = labels[u] if neuron_to_sign and pre_neuron in neuron_to_sign: edge_colors.append( sign_color_map.get(neuron_to_sign[pre_neuron], default_color) ) else: edge_colors.append(default_color) # Plot the graph _, ax = plt.subplots(figsize=figsize) nx.draw( composite_G, pos=positions, labels=labels, with_labels=True, node_size=100, node_color="lightblue", arrows=True, arrowstyle="-|>", arrowsize=10, font_size=8, width=widths, ax=ax, edge_color=edge_colors, ) G1 = nx.from_pandas_edgelist( paths[0], "pre_layer", "post_layer", ["weight"], create_using=nx.DiGraph(), ) G2 = nx.from_pandas_edgelist( paths[1], "pre_layer", "post_layer", ["weight"], create_using=nx.DiGraph(), ) el1 = { (u, v): f'{data["weight"]:.{weight_decimals}f}' for u, v, data in G1.edges(data=True) } el2 = { (u, v): f'{data["weight"]:.{weight_decimals}f}' for u, v, data in G2.edges(data=True) } nx.draw_networkx_edge_labels( G1, pos=positions, edge_labels=el1, horizontalalignment="left", verticalalignment="top", font_color=el_colours[0], ax=ax, label_pos=label_pos[0], ) nx.draw_networkx_edge_labels( G2, pos=positions, edge_labels=el2, horizontalalignment="right", verticalalignment="bottom", font_color=el_colours[1], ax=ax, label_pos=label_pos[1], ) ax.set_ylim(0, 1) # add lengend using the el_colours ax.legend( handles=[ mpatches.Patch(color=el_colours[0], label=legend_labels[0]), mpatches.Patch(color=el_colours[1], label=legend_labels[1]), ], loc="upper right", ) plt.show()
[docs] @dataclass class XORCircuit: # Input nodes input1: list input2: list # Middle layer nodes exciter1: str # Takes input from input1 only exciter2: str # Takes input from input2 only inhibitor: str # Takes input from both inputs # Output node output: list
[docs] def find_xor(paths: pd.DataFrame) -> List[XORCircuit]: """ Find XOR-like circuits in a 3-layer network, based on [Wang et al. 2024] (https://www.biorxiv.org/content/10.1101/2024.09.24.614724v2). Note: this function currently ignores middle excitatory neruons that receive both inputs. Args: paths: DataFrame with columns ['pre', 'post', 'sign', 'layer'] pre: source node post: target node sign: 1 (excitatory) or -1 (inhibitory) layer: 1 (input->middle) or 2 (middle->output) Returns: List of XORCircuit objects, each representing a found XOR motif """ # checking input ---- if set(paths["layer"]) != {1, 2}: raise ValueError( "The input DataFrame should have exactly 2 unique layers, 1 and 2" ) # check column names of paths if not {"pre", "post", "sign", "layer"}.issubset(paths.columns): raise ValueError( "The input DataFrame should have columns ['pre', 'post', 'sign', " "'layer']" ) # check values of signs: if it's a subset of {1, -1} if not set(paths["sign"]) <= {1, -1}: raise ValueError( f"The input DataFrame should have values 1 (excitatory) or -1 (inhibitory) in the 'sign' column, but got {set(paths['sign'])}" ) # define variables ---- circuits: list[XORCircuit] = [] exciters = paths["pre"][(paths["layer"] == 2) & (paths["sign"] == 1)].unique() inhibitors = paths["pre"][(paths["layer"] == 2) & (paths["sign"] == -1)].unique() l1 = paths[paths["layer"] == 1] l2 = paths[paths["layer"] == 2] exciter_us_dict: dict[int | str, set[int | str]] = { exc: set(l1["pre"][l1["post"] == exc]) for exc in exciters } inhibitor_us_dict = {inh: set(l1["pre"][l1["post"] == inh]) for inh in inhibitors} exciter_ds_dict = {exc: set(l2["post"][l2["pre"] == exc]) for exc in exciters} inhibitor_ds_dict = {inh: set(l2["post"][l2["pre"] == inh]) for inh in inhibitors} # main algorithm ---- for e1, e2 in itertools.combinations(exciters, 2): common = exciter_us_dict[e1] & exciter_us_dict[e2] onlye1 = exciter_us_dict[e1] - common onlye2 = exciter_us_dict[e2] - common for i in inhibitors: e1_i_intersect = onlye1 & inhibitor_us_dict[i] e2_i_intersect = onlye2 & inhibitor_us_dict[i] if (len(e1_i_intersect) == 0) or (len(e2_i_intersect) == 0): continue targets = exciter_ds_dict[e1] & exciter_ds_dict[e2] & inhibitor_ds_dict[i] if not targets: continue circuits.append( XORCircuit( input1=list(e1_i_intersect), input2=list(e2_i_intersect), exciter1=e1, exciter2=e2, inhibitor=i, output=list(targets), ) ) return circuits
[docs] def path_for_ngl(path): """ Convert a path DataFrame to one that can be used to visualize the path in neuroglancer with `get_ngl_link(df_format = 'long')`. Neurons are coloured by their (indirect) connectivity (calculated using `effective_conn_from_paths()`) to an average neuron in the last layer. `pre` and `post` columns must contain neuron ids. This function can be used for visualizing signal propagation in a pathway. Args: path (pd.DataFrame): The DataFrame containing the path data, including the layer number, pre-synaptic index, post-synaptic index, and weight. Returns: pd.DataFrame: A DataFrame with columns 'neuron_id', 'layer', and 'activation' (which is (indirect) connectivity in this case), suitable for Neuroglancer visualization. """ out = [] from .compress_paths import effective_conn_from_paths for alayer in path.layer.unique(): path_sub = path[path.layer >= alayer] effconn = effective_conn_from_paths(path_sub) # get mean across columns, for each row effconn = effconn.mean(axis=1).to_frame() effconn.loc[:, ["layer"]] = alayer out.append(effconn) out = pd.concat(out, axis=0).reset_index() out.columns = ["neuron_id", "activation", "layer"] out = pd.concat( [ out, ( pd.DataFrame( { "neuron_id": path_sub.post, "activation": 1, "layer": alayer + 1, } ) ), ] ) return out
[docs] def connected_components( paths: pd.DataFrame, threshold: float = 0, quiet: bool = False, ) -> list: """ Find connected components in a directed graph represented by a DataFrame of paths. The DataFrame should contain columns 'pre', 'post', 'layer', and 'weight'. The function filters the paths based on a weight threshold and then constructs a directed graph using NetworkX. It identifies weakly connected components in the graph and returns a list of DataFrames of paths, each representing a connected component. Args: paths (pd.DataFrame): The DataFrame containing the path data, including the layer number, pre-synaptic index, post-synaptic index, and weight. threshold (float, optional): The threshold for the weight of the direct connection between pre and post. Defaults to 0. Returns: list: A list of DataFrames, each representing a connected component in the directed graph. """ paths = filter_paths(paths, threshold) paths_unique_edges = paths.drop_duplicates(subset=["pre", "post"]) # Create a graph from the DataFrame G = nx.from_pandas_edgelist( paths_unique_edges, source="pre", target="post", edge_attr="weight", create_using=nx.DiGraph(), ) # Find connected components weak_components = list(nx.weakly_connected_components(G)) components = [] for i, component in enumerate(weak_components): path = paths[paths.pre.isin(component) & paths.post.isin(component)] path.loc[:, ["component_idx"]] = i path = remove_excess_neurons(path, quiet=quiet) components.append(path) return components
[docs] def el_within_n_steps( inprop: spmatrix, inidx: arrayable, outidx: arrayable, n: int, threshold: float = 0, pre_group: dict = None, post_group: dict = None, return_raw_el: bool = False, combining_method: str = "mean", avg_within_connected: bool = False, all_connections_between_groups: bool = False, quiet: bool = False, ): """ Find paths within a specified number of steps in a directed graph, starting from input indices and ending at output indices. The unique edges are returned. Filtering by `threshold` happens after grouping if `pre_group` and `post_group` are provided. `avg_within_connected` calculates the weight based on the connected neurons, instead of all neurons in any group involved. This might be useful when obtaining paths from one single neuron in a cell type, while there are many other neurons in the same type. This might be especially useful for optic lobe connectivity analysis that's spatially local. Two neuron groups might be connected to different extents in different layers (when a pair of cell types are connected with different individual neurons in different layers.). In that case the highest weight is returned by default, but see `all_connections_between_groups` argument. Args: inprop (spmatrix): The connectivity matrix, with presynaptic in the rows. inidx (arrayable): The input neuron indices to start the paths from. outidx (arrayable): The output neuron indices to end the paths at. n (int): The maximum number of hops. n=1 for direct connections. threshold (float, optional): The threshold for the weight of the direct connection between pre and post. If `pre_group` or `post_group` are provided, filtering happens after grouping. Defaults to 0. pre_group (dict, optional): A dictionary mapping pre neuron indices to their respective groups. Defaults to None. post_group (dict, optional): A dictionary mapping post neuron indices to their respective groups. Defaults to None. return_raw_el (bool, optional): If True, returns the raw edges before grouping. Defaults to False. combining_method (str, optional): Method to combine inputs (outprop=False) or outputs (outprop=True). Can be 'sum', 'mean', or 'median'. Defaults to 'mean'. avg_within_connected (bool, optional): If True, the weight is calculated within the *connected* neurons of the same group. If False, the weight is calculated across *all* neurons of the same group. Defaults to False. all_connections_between_groups (bool, optional): If True, use all connections between groups inidx and outidx are in, even if inidx doesn't cover all neurons in that group. For example, if inidx is *one* L1 neuron, and pre_group maps indices to cell type, outidx is *one* Tm3 neuron, then the function will return L1->Tm3 connections for *all* L1 and Tm3 neurons. Defaults to False. quiet (bool, optional): If True, suppresses output messages. Defaults to False. Returns: pd.DataFrame or tuple: A DataFrame containing the edges of the paths found, including columns 'pre', 'post', and 'weight'. If `return_raw_el` is True, returns a tuple of two DataFrames: the first is the grouped edges, and the second is the raw edges before grouping. If no paths are found, returns None. """ inidx = to_nparray(inidx) outidx = to_nparray(outidx) all_paths = [] raw_el = [] for i in tqdm(range(n), disable=quiet): paths = find_paths_of_length(inprop, inidx, outidx, i + 1) if paths is None or paths.shape[0] == 0: continue if return_raw_el: raw_el.append(paths) if pre_group is not None and post_group is not None: paths = group_paths( paths, pre_group, post_group, avg_within_connected=avg_within_connected, combining_method=combining_method, ) paths = filter_paths(paths, threshold, quiet=quiet) if paths is not None: all_paths.append(paths) if len(all_paths) == 0: return None all_paths = pd.concat(all_paths, axis=0) el = all_paths.groupby(["pre", "post"])["weight"].max().reset_index() if all_connections_between_groups: from .compress_paths import result_summary new_inidx = [idx for idx, grp in pre_group.items() if grp in el.pre] new_outidx = [idx for idx, grp in post_group.items() if grp in el.post] mat = result_summary( inprop, new_inidx, new_outidx, pre_group, post_group, display_threshold=0, display_output=False, ) # make longer mat_long = mat.melt(ignore_index=False).reset_index() mat_long.columns = ["pre", "post", "weight"] el = el[["pre", "post"]].merge(mat_long, on=["pre", "post"], how="left") if return_raw_el: raw_el = pd.concat(raw_el, axis=0) # pre, post are single neurons, so the rows should be real duplicates raw_el = raw_el.drop_duplicates(subset=["pre", "post", "weight"]) return el, raw_el else: return el
[docs] def find_shortest_paths( paths: pd.DataFrame, start_nodes: list[str], end_nodes: list[str] ) -> list[list[str]]: """ Find the shortest paths between groups in start_nodes and end_nodes in a paths dataframe (paths is the output of find_path_iteratively). Args: paths (pd.DataFrame): DataFrame containing the path data, including columns 'weight', 'pre', and 'post'. start_nodes (list): List of 'pre' groups. end_nodes (list): List of 'post' groups. Returns: list: A list of shortest paths, where each path is a list of groups that connect the start and end nodes (ordered from start to end). """ if paths.shape[0] == 0: return [] paths_unique = paths[["weight", "pre", "post"]].drop_duplicates() nodes_unique = np.unique( np.concatenate([paths_unique.pre.unique(), paths_unique.post.unique()]) ) pre = np.searchsorted(nodes_unique, paths_unique.pre.values) post = np.searchsorted(nodes_unique, paths_unique.post.values) graph_matrix = csr_matrix( (1 / paths_unique["weight"].values, (pre, post)), shape=(len(nodes_unique), len(nodes_unique)), ) def reconstruct_single_path(start_idx, end_idx): """Compute shortest path for a single start-end pair.""" # Compute shortest path only from this start node _, pred = shortest_path( csgraph=graph_matrix, directed=True, indices=start_idx, return_predecessors=True, ) if pred[end_idx] == -9999: # not reached return None path = [nodes_unique[end_idx]] i = end_idx while i != start_idx: i = pred[i] if i == -9999: return None path.append(nodes_unique[i]) return path[::-1] shortest_paths = [] for start_node in start_nodes: idx_start = np.where(nodes_unique == start_node)[0] if len(idx_start) == 0: continue start_idx = idx_start[0] for end_node in end_nodes: if start_node != end_node: idx_end = np.where(nodes_unique == end_node)[0] if len(idx_end) == 0: continue path = reconstruct_single_path(start_idx, idx_end[0]) if path is not None: shortest_paths.append(path) return shortest_paths
[docs] def count_paths( edgelist: pd.DataFrame, start_layer: int = 1, end_layer: Optional[int] = None, loop_mode: str = "allow", ) -> Union[int, Tuple[int, int]]: """ Counts all paths without materializing them in memory. Uses memoization to avoid redundant computation. This function was written mostly by Claude Sonnet 4.5 under instructions. Args: edgelist (pd.DataFrame): Must contain "layer", "pre", "post" columns. start_layer (int): Starting layer. end_layer (int): Ending layer. If None, uses max layer. loop_mode (str, optional): How to handle loops in paths. Options: - "allow" (default): Count all paths including those with loops. - "exclude": Count only loop-free paths. - "both": Return tuple (count_with_loops, count_without_loops) computed efficiently in a single DFS pass. Note: a node appearing at both the start and end of a path is allowed (e.g., A-B-C-D-A is not considered a loop), but a node appearing again in the middle is a loop (e.g., A-B-C-A-A has a loop). Returns: int or tuple: If loop_mode is "allow" or "exclude", returns an int with the total number of valid paths. If loop_mode is "both", returns a tuple (count_with_loops, count_without_loops). """ if end_layer is None: end_layer = edgelist["layer"].max() if start_layer > end_layer: raise ValueError("start_layer must be less than or equal to end_layer.") if loop_mode not in ["allow", "exclude", "both"]: raise ValueError( f"loop_mode must be 'allow', 'exclude', or 'both', got '{loop_mode}'" ) # Build adjacency adj: Dict[int, Dict[Union[str, int], List]] = defaultdict(lambda: defaultdict(list)) for _, row in edgelist.iterrows(): adj[row["layer"]][row["pre"]].append(row["post"]) if loop_mode == "both": # Compute both counts in a single DFS pass # Use memoization for the with-loops count memo_with_loops = {} def dfs_both( node: Union[str, int], layer: int, visited: set, start_node: Union[str, int] ) -> Tuple[int, int]: """ Returns (count_with_loops, count_without_loops) for paths starting from node. """ # Check if this is a loop for the no-loops count is_loop = node in visited # For with-loops count, use memoization if (node, layer) in memo_with_loops: count_with = memo_with_loops[(node, layer)] else: # add 1 when reaches the end if layer == end_layer: count_with = 1 else: count_with = 0 # iterate over all postsynaptic nodes for post in adj.get(layer + 1, {}).get(node, []): # For with-loops, we don't care about visited history # So we can recurse with empty visited set for memoization count_with += dfs_both(post, layer + 1, set(), start_node)[0] memo_with_loops[(node, layer)] = count_with # For without-loops count, cannot use memoization if is_loop: # If we're at the end layer and this node is the start node, that's OK if layer == end_layer and node == start_node: count_without = 1 else: # Otherwise, this is a loop count_without = 0 elif layer == end_layer: count_without = 1 else: # Add current node to visited set for this path visited.add(node) count_without = 0 # iterate over all postsynaptic nodes for post in adj.get(layer + 1, {}).get(node, []): # recursively count paths from post count_without += dfs_both(post, layer + 1, visited, start_node)[1] # Remove node from visited when backtracking visited.remove(node) return (count_with, count_without) path_count_with = 0 path_count_without = 0 for pre in adj.get(start_layer, {}): for post in adj.get(start_layer, {}).get(pre, []): with_loops, without_loops = dfs_both(post, start_layer, {pre}, pre) path_count_with += with_loops path_count_without += without_loops return (path_count_with, path_count_without) elif loop_mode == "allow": # Original memoized approach when loops are allowed memo = {} def dfs(node: Union[str, int], layer: int) -> int: # Memoization - crucial for graphs with convergent paths if (node, layer) in memo: return memo[(node, layer)] # add 1 when reaches the end if layer == end_layer: return 1 count = 0 # iterate over all postsynaptic nodes for post in adj.get(layer + 1, {}).get(node, []): # iterate over their postsynaptic nodes count += dfs(post, layer + 1) memo[(node, layer)] = count return count path_count = 0 for pre in adj.get(start_layer, {}): for post in adj.get(start_layer, {}).get(pre, []): path_count += dfs(post, start_layer) return path_count else: # loop_mode == "exclude" # When loops are not allowed, track visited nodes per path # Cannot use memoization because path validity depends on history def dfs_no_loops( node: Union[str, int], layer: int, visited: set, start_node: Union[str, int] ) -> int: # Check if we've seen this node before in the current path # Allow the node to appear at the end even if it was the start node if node in visited: # If we're at the end layer and this node is the start node, that's OK if layer == end_layer and node == start_node: return 1 # Otherwise, this is a loop return 0 # add 1 when reaches the end if layer == end_layer: return 1 # Add current node to visited set for this path visited.add(node) count = 0 # iterate over all postsynaptic nodes for post in adj.get(layer + 1, {}).get(node, []): # recursively count paths from post count += dfs_no_loops(post, layer + 1, visited, start_node) # Remove node from visited when backtracking visited.remove(node) return count path_count = 0 for pre in adj.get(start_layer, {}): for post in adj.get(start_layer, {}).get(pre, []): # Start with pre node in visited set, and track which node started the path path_count += dfs_no_loops(post, start_layer, {pre}, pre) return path_count
[docs] def filter_all_paths_to_cumsum( all_paths: pd.DataFrame | list[pd.DataFrame], thre_cumsum: float = 0.5, thre_step_min: float = 0.0, necessary_intermediate: Dict[int, arrayable] | None = None, ): """ Filters paths such that intermediate neurons are specified in necessary_intermediate and the filtered cumulative effective weight > thre_cumsum of the total effective weight; the minimum edge weight across the selected paths is either the minimum along those filtered paths or the minimum threshold thre_step_min, whichever is larger. As a rough guide on how to set thre_cumsum, it can be set larger for fewer paths in all_paths, and smaller for more paths. Args: all_paths (pd.DataFrame | list[pd.DataFrame]): The DataFrame or list of DataFrames containing the path data, where each DataFrame is like the output from `find_paths_of_length()`, i.e., contains paths of a specific length. thre_cumsum (float): The cumulative effective weight threshold to reach for filtered paths. Should be a number between 0 and 1. Defaults to 0.5. thre_step_min (float, optional): The minimum threshold for the weight of the direct connection between pre and post. Defaults to 0.0. necessary_intermediate (dict, optional): A dictionary of necessary intermediate neurons, where the keys are the layer numbers (starting neurons: 1; directly downstream: 2) and the values are the neuron indices (can be int, float, list, set, numpy.ndarray, or pandas.Series). Defaults to None. Returns: tuple: paths: list of pd.DataFrame Filtered paths. w_filter: float Total effective weight of the filtered paths. w_all: float Total effective weight of all paths before filtering. thre_step : float Minimum edge weight threshold used to filter paths, bounded below by thre_step_min. """ from .external_paths import effective_conn_per_path_from_paths df_bool = False if isinstance(all_paths, pd.DataFrame): df_bool = True all_paths = [all_paths] # compute total effective weight across all paths w_all = 0.0 w_prod = [] w_min = [] for i in range(len(all_paths)): w_all_i, w_prod_i, w_min_i = effective_conn_per_path_from_paths(all_paths[i]) w_all += w_all_i w_prod.append(w_prod_i) w_min.append(w_min_i) w_prod = np.concatenate(w_prod, axis=0) w_min = np.concatenate(w_min, axis=0) # find minimum edge weight across strongest paths that make up thre_cumsum of total weight idx_sort = np.argsort(-w_prod) idx_thre = np.where(np.cumsum(w_prod[idx_sort] / w_all) > thre_cumsum)[0][0] # since in filter_paths, threshold > thre_step, take 99.9%, with a minimum of thre_step_min thre_step = max(0.999 * np.min(w_min[idx_sort[: idx_thre + 1]]), thre_step_min) # filter paths and compute total effective weight across those paths w_filter = 0.0 for i in range(len(all_paths)): all_paths[i] = filter_paths(all_paths[i], thre_step, necessary_intermediate) w_filter_i, _, _ = effective_conn_per_path_from_paths(all_paths[i]) w_filter += w_filter_i if df_bool: all_paths = all_paths[0] return all_paths, w_filter, w_all, thre_step