# Standard library imports
import numpy as np
import pandas as pd
import os
from matplotlib import pyplot as plt
from typing import Dict, Union, Optional
# Third-party package imports
from scipy.sparse import spmatrix
from tqdm import tqdm
from .utils import to_nparray, arrayable
from .path_finding import (
group_paths,
filter_paths,
find_paths_of_length,
filter_all_paths_to_cumsum,
)
from .compress_paths import effective_conn_from_paths
[docs]
def compute_flow_hitting_time(
conn_df: Union[pd.DataFrame, spmatrix],
flow_seed_idx: np.ndarray[int],
flow_steps: int,
flow_thre: float,
):
"""
Compute hitting time for all cells in conn_df.
Hitting time is the average number of hops required to reach a cell from a set of seed cells.
The main algorithm is implemented in the 'navis' library (https://github.com/navis-org/navis).
Args:
conn_df (pd.DataFrame): DataFrame containing the connections with columns 'pre', 'post', and 'weight'.
flow_seed_idx (np.ndarray): Array of seed cell indices.
flow_steps (int): Number of steps for flow calculation.
flow_thre (float): Threshold for activation in flow calculation.
Returns:
pd.DataFrame:
DataFrame with columns 'idx' and 'hitting_time', where 'idx' is the cell
index and 'hitting_time' is the computed hitting time.
"""
try:
from navis.models import BayesianTraversalModel, linear_activation_p
except ImportError as e:
raise ImportError(
"The 'navis' library is required for computing information flow."
) from e
# choose threshold for flow activation and define model
def my_act(x):
return linear_activation_p(x, min_w=0, max_w=flow_thre)
if isinstance(conn_df, spmatrix):
# convert sparse matrix to DataFrame
coo = conn_df.tocoo()
conn_df = pd.DataFrame({"pre": coo.row, "post": coo.col, "weight": coo.data})
elif isinstance(conn_df, pd.DataFrame):
# make sure the column names are correct
if not all(col in conn_df.columns for col in ["pre", "post", "weight"]):
raise ValueError(
"Input DataFrame must contain columns 'pre', 'post', and 'weight'."
)
else:
raise TypeError(
"Input must be a sparse matrix or a DataFrame with columns 'pre', 'post', and 'weight'."
)
print(
f"Computing hitting time for {len(set(conn_df.pre)|set(conn_df.post))} cells... May take a while."
)
edges = conn_df[["pre", "post", "weight"]]
edges.columns = ["source", "target", "weight"]
model = BayesianTraversalModel(
edges, flow_seed_idx, max_steps=flow_steps + 1, traversal_func=my_act
)
res = model.run()
# compute hitting times from cmf (cumulative mass function)
cmf_arr = np.stack(res["cmf"].values)
hitting_prob = np.zeros(cmf_arr.shape)
hitting_prob[:, 1:] = np.diff(cmf_arr, axis=1)
hitting_time = np.dot(hitting_prob, np.arange(0, flow_steps + 1))
# cmf of unreached and seed cell types is 0
# change hitting time of unreached cell types to flow_steps
idx_unreached = np.where(
(hitting_time < 0.1) & (~np.isin(res["node"], flow_seed_idx))
)[0]
if len(idx_unreached) > 0:
hitting_time[idx_unreached] = flow_steps
flow_df = pd.DataFrame({"idx": res["node"].values, "hitting_time": hitting_time})
return flow_df
[docs]
def find_instance_flow(
inprop: Union[spmatrix, pd.DataFrame],
idx_to_group: dict,
flow_seed_groups: list[str] = [
"L1",
"L2",
"L3",
"R7p",
"R8p",
"R7y",
"R8y",
"R7d",
"R8d",
"HBeyelet",
],
file_path: str = None,
save_flow: Optional[bool] = True,
save_prefix: Optional[str] = "flow_",
flow_steps: int = 20,
flow_thre: float = 0.1,
) -> pd.DataFrame:
"""
Get the hitting time for all cell groups. The hitting time is computed using the
information flow algorithm (navis:https://github.com/navis-org/navis) for each
neuron, and then taking the median across neurons of each cell group.
Args:
inprop (Union[spmatrix, pd.DataFrame]): Input sparse matrix or DataFrame
representing connections. If a DataFrame, it should have columns 'pre',
'post', and 'weight'.
idx_to_group (dict): Dictionary mapping cell indices to their respective cell
groups.
flow_seed_groups (list): List of cell groups to be used as seeds for flow
calculation.
file_path (str): Path to the directory containing the hitting time data.
save_flow (bool): Whether to save the computed hitting time to a CSV file.
Defaults to True.
save_prefix (str): Prefix for the saved file names.
flow_seed_groups (list): List of cell group to be used as seeds for flow
calculation.
flow_steps (int): Number of steps for flow calculation.
flow_thre (float): Threshold for flow calculation.
Returns:
pd.DataFrame:
DataFrame with columns 'cell_group' and 'hitting_time', where 'cell_group'
is the name of the cell group and 'hitting_time' is the median hitting time
for that group.
"""
if file_path is None:
# Check if running in Google Colab
if "COLAB_GPU" in os.environ:
# Running in Colab
file_path = "/content/"
else:
# Running locally
file_path = "./"
# load hitting time or compute
flow_file_name = os.path.join(
file_path, f"{save_prefix}{flow_steps}step_{flow_thre}thre_hit.csv"
)
if os.path.exists(flow_file_name):
flow_df = pd.read_csv(flow_file_name)
if "cell_group" not in flow_df.columns:
flow_df["cell_group"] = flow_df["idx"].map(idx_to_group)
else:
# if file_path doesn't exist, create it
if not os.path.exists(file_path):
os.makedirs(file_path)
# get indices whose value is in flow_seed_groups
flow_seed_idx = np.array(
[
idx
for idx, cell_type in idx_to_group.items()
if cell_type in flow_seed_groups
]
)
if len(flow_seed_idx) == 0:
raise ValueError(
f"No flow seed groups found in the provided idx_to_group mapping. "
f"Please check the flow_seed_groups: {flow_seed_groups}."
)
flow_df = compute_flow_hitting_time(
inprop, flow_seed_idx, flow_steps, flow_thre
)
# add cell group to flow_df
# note: this only includes indices in flow_seed_idx, which could in theory be a
# subset of cells in a cell type
flow_df["cell_group"] = flow_df["idx"].map(idx_to_group)
if save_flow:
print(f"Saving hitting time to {flow_file_name}")
flow_df.to_csv(flow_file_name, index=False)
flow_type_df = (
flow_df.groupby("cell_group")["hitting_time"]
.median()
.to_frame()
.reset_index()
.sort_values("hitting_time")
)
flow_type_file_name = os.path.join(
file_path, f"{save_prefix}{flow_steps}step_{flow_thre}thre_hit_per_group.csv"
)
if not os.path.exists(flow_type_file_name):
if save_flow:
print(f"Saving flow group hitting time to {flow_type_file_name}")
flow_type_df.to_csv(flow_type_file_name, index=False)
return flow_type_df
[docs]
def layered_el(
inprop: spmatrix,
inidx: arrayable,
outidx: arrayable,
n: int,
idx_to_group: dict,
thre_cumsum: float | None = None,
thre_step_min: float = 0.0,
combining_method: str = "mean",
flow_steps: int = 20,
flow_thre: float = 0.1,
flow: pd.DataFrame | None = None,
):
"""
Similar to `el_within_n_steps` but using filter_paths_to_cumsum and layers based
on the information flow hitting time (navis: https://github.com/navis-org/navis).
If thre_cumsum is None, then paths are filtered based on direct weight thre_step_min;
otherwise, paths are filtered such that the cumulative effective weight reaches
thre_cumsum.
Args:
inprop (spmatrix): Input sparse matrix representing connections.
inidx (np.ndarray): Array of input indices.
outidx (np.ndarray): Array of output indices.
n (int): The maximum number of hops. n=1 for direct connections.
idx_to_group (dict): Dictionary mapping cell indices to their respective cell
groups.
thre_cumsum (float): The cumulative effective weight threshold to reach for filtered paths.
Defaults to None, in which case paths are only filtered based on thre_step_min.
thre_step_min (float, optional): The minimum threshold for the weight of the
direct connection between pre and post. Defaults to 0.0.
combining_method (str, optional): Method to combine inputs (outprop=False) or
outputs (outprop=True). Can be 'sum', 'mean', or 'median'. Defaults to 'mean'.
flow_steps (int): Number of steps for flow calculation. Defaults to 20.
flow_thre (float): Threshold for flow calculation. Defaults to 0.1.
flow (pd.DataFrame, optional): DataFrame containing the flow hitting time.
If provided, it should have columns 'cell_group' and 'hitting_time'.
If None, the flow hitting time is computed from `inprop` and `idx_to_group`.
Returns:
tuple:
pd.DataFrame:
DataFrame containing the edgelist with flow layers.
float:
The total effective weight of the filtered paths.
float:
The total effective weight of all paths before filtering.
float:
The minimum edge weight threshold used to filter paths which is minimally thre_step_min.
"""
inidx = to_nparray(inidx)
outidx = to_nparray(outidx)
# get edge list, both grouped by idx_to_group, and with raw indices
all_paths = []
rawel = []
for i in tqdm(range(n)):
paths = find_paths_of_length(inprop, inidx, outidx, i + 1)
if paths is None or paths.shape[0] == 0:
continue
rawel.append(paths)
paths = group_paths(
paths,
idx_to_group,
idx_to_group,
combining_method=combining_method,
)
if paths is None or paths.shape[0] == 0:
continue
all_paths.append(paths)
if len(all_paths) == 0:
return None, 0, 0
if thre_cumsum is None:
w_all = 0
w_filter = 0
for i in range(len(all_paths)):
thre_step = thre_step_min
w_all_i = effective_conn_from_paths(all_paths[i])
w_all += w_all_i.sum().sum()
all_paths[i] = filter_paths(all_paths[i], thre_step_min)
if all_paths[i] is None or all_paths[i].shape[0] == 0:
continue
w_filter_i = effective_conn_from_paths(all_paths[i])
w_filter += w_filter_i.sum().sum()
else:
all_paths, w_filter, w_all, thre_step = filter_all_paths_to_cumsum(
all_paths, thre_cumsum, thre_step_min
)
all_paths = pd.concat(all_paths, axis=0)
grouped = all_paths.groupby(["pre", "post"])["weight"].max().reset_index()
rawel = pd.concat(rawel, axis=0)
# pre, post are single neurons, so the rows should be real duplicates
rawel = rawel.drop_duplicates(subset=["pre", "post", "weight"])
# information flow
if flow is None:
rawel["pre_type"] = rawel.pre.map(idx_to_group)
rawel["post_type"] = rawel.post.map(idx_to_group)
# only keep the neurons in the types that pass the threshold
rawel = rawel[
rawel.pre_type.isin(grouped.pre) & rawel.post_type.isin(grouped.post)
]
# compute flow hitting time
flow = find_instance_flow(
rawel,
idx_to_group,
set([grp for idx, grp in idx_to_group.items() if idx in inidx]),
save_flow=False,
flow_steps=flow_steps,
flow_thre=flow_thre,
)
else:
# use provided flow hitting time
if not isinstance(flow, pd.DataFrame):
raise TypeError("Flow must be a pandas DataFrame.")
if "cell_group" not in flow.columns or "hitting_time" not in flow.columns:
raise ValueError(
"Flow DataFrame must contain 'cell_group' and 'hitting_time' columns."
)
type_layer = dict(zip(flow.cell_group, flow.hitting_time))
grouped["pre_layer"] = grouped.pre.map(type_layer)
grouped["post_layer"] = grouped.post.map(type_layer)
# neurons that were not reached are assigned the layer flow_steps
grouped.fillna(flow_steps, inplace=True)
return grouped, w_filter, w_all, thre_step
[docs]
def effective_conn_per_path_from_paths(
paths_df: pd.DataFrame,
) -> tuple[float, np.ndarray, np.ndarray]:
"""
Compute the total effective weight of each path of fixed length.
Beware that this might be slow for long paths.
Args:
paths_df (pd.DataFrame): DataFrame containing the path data with columns
'layer', 'pre', 'post', and 'weight'.
Returns:
tuple:
total_effective_weight (float):
Total effective weight of all paths ending in target group.
all_path_weights (np.array):
Array of weights for each individual path.
min_path_weights (np.array):
Array of minimum edge weights for each individual path.
"""
if paths_df is None or paths_df.empty:
return 0.0, [], []
# Convert to set for fast membership tests
max_layer = int(paths_df["layer"].max())
target_set = set(paths_df[paths_df["layer"] == max_layer]["post"].unique())
# Ensure columns are the right dtype for merging speed
paths_df = paths_df[["layer", "pre", "post", "weight"]].copy()
paths_df["layer"] = paths_df["layer"].astype(int)
# Prepare a mapping from layer -> DataFrame of edges
layer_edges = {l: df for l, df in paths_df.groupby("layer")}
# Start from input layer (layer 1 presynaptic neurons)
current_paths = layer_edges[1][["pre", "post", "weight"]].copy()
current_paths.rename(
columns={"pre": "path_start", "post": "path_end"}, inplace=True
)
current_paths["path_weight"] = current_paths["weight"]
current_paths["min_edge_weight"] = current_paths["weight"]
# Iterate layer by layer (forward chaining)
for layer in range(2, max_layer + 1):
if layer not in layer_edges:
break
next_edges = layer_edges[layer][["pre", "post", "weight"]]
# Join current path ends to next layer presynaptic neurons
merged = current_paths.merge(
next_edges,
left_on="path_end",
right_on="pre",
how="inner",
suffixes=("", "_next"),
)
if merged.empty:
break
# Update path info
merged["path_end"] = merged["post"]
merged["path_weight"] = merged["path_weight"] * merged["weight_next"]
merged["min_edge_weight"] = merged[["min_edge_weight", "weight_next"]].min(
axis=1
)
current_paths = merged[
["path_start", "path_end", "path_weight", "min_edge_weight"]
].copy()
current_paths["weight"] = current_paths["path_weight"]
# Only keep paths that end in target neurons
final_paths = current_paths[current_paths["path_end"].isin(target_set)]
if final_paths.empty:
return 0.0, [], []
# Compute relevant effective weights
min_path_weights = final_paths["min_edge_weight"].to_numpy()
all_path_weights = final_paths["path_weight"].to_numpy()
total_effective_weight = all_path_weights.sum()
return total_effective_weight, all_path_weights, min_path_weights