Source code for genecover.geneCover

import gurobipy as grb
import numpy as np
from scipy.stats import spearmanr
from pyscipopt import Model, quicksum

[docs] def gene_gene_correlation(X, method = 'spearman'): """ Compute the gene-gene correlation matrix from the gene expression matrix X. Args: X (np.ndarray or List[np.ndarray]): A matrix of shape (N, d), where N is the number of cells/spots and d is the number of genes. Alternatively, a list of such matrices (from different batches/samples) with consistent gene dimensions. method (str): Method used to compute correlation. Must be either 'spearman' or 'pearson'. Returns: np.ndarray: A gene-gene correlation matrix of shape (d, d). """ if type(X) ==list: corr_mat_list = [] for x in X: if method == 'spearman': corr_mat, _ = spearmanr(x) if method == 'pearson': corr_mat = np.corrcoef(x.T) corr_mat_list.append(corr_mat) corr_mat = np.vstack(corr_mat_list) else : if method == 'spearman': corr_mat, _ = spearmanr(X) if method == 'pearson': corr_mat = np.corrcoef(X.T) return corr_mat
[docs] def covering(Z, minSize=1, alpha=0.05, weights = 1., output=None, callBack = None, poolSolutions=None, poolSearchMode=None, poolGap = None, timeLimit=None, LogToConsole= 1,restart=None): """ Solves the minimal weight set covering problem using the Gurobi solver. Args: Z (np.ndarray): A binary matrix of shape (N, d), where N is the number of samples and d is the number of genes. minSize (int): The minimum number of genes to select. alpha (float): The minimum fraction of samples that must be covered. weights (np.ndarray): A 1D array of weights for each gene. Higher weights indicate higher cost for selection. output (int): Enables or disables solver output. Set to 1 to print optimization details, 0 to suppress. callBack (Callable): A callback function to be invoked during optimization. poolSolutions (int): Number of solutions to store in the solution pool. See: https://www.gurobi.com/documentation/current/refman/poolsolutions.html poolSearchMode (int): Mode for exploring the MIP search tree. See: https://www.gurobi.com/documentation/current/refman/poolsearchmode.html poolGap (float): Relative MIP optimality gap for accepting solutions into the pool. See:https://www.gurobi.com/documentation/current/refman/poolgap.html timeLimit (float): Time limit (in seconds) for the optimization run. LogToConsole (int): Whether to print the optimization log. Set to 1 to enable. restart (gurobipy.Model): A Gurobi model instance to restart the optimization from. Returns: List[int]: Indices of the selected genes. """ if restart is not None: cov = restart if output is not None: cov.Params.OutputFlag = output if poolSolutions is not None: cov.Params.PoolSolutions = poolSolutions if poolSearchMode is not None: cov.Params.PoolSearchMode = poolSearchMode if poolGap is not None: cov.Params.PoolGap = poolGap if timeLimit is not None: cov.Params.TimeLimit = timeLimit if LogToConsole is not None: cov.Params.LogToConsole = LogToConsole if callBack is None: cov.optimize() else: cov.optimize(callBack) return cov if np.isscalar(minSize): minSize = [minSize] if np.isscalar(alpha): alpha = [alpha]*len(minSize) N = Z.shape[0] d = Z.shape[1] if type(weights) == str and weights=='prob': w = 1 - 0.01 * np.mean(Z, axis=0) elif np.isscalar(weights): w = weights * np.ones(d) else: w = weights cov = grb.Model() if output is not None: cov.Params.OutputFlag=output if poolSolutions is not None: cov.Params.PoolSolutions = poolSolutions if poolSearchMode is not None: cov.Params.PoolSearchMode = poolSearchMode if poolGap is not None: cov.Params.PoolGap = poolGap if timeLimit is not None: cov.Params.TimeLimit = timeLimit if LogToConsole is not None: cov.Params.LogToConsole = LogToConsole nlevels = len(minSize) x = [] y = [] for l in range(nlevels): x.append(cov.addMVar(d, vtype=grb.GRB.BINARY)) for l in range(nlevels): y.append(cov.addMVar(N, vtype=grb.GRB.BINARY)) for l in range(nlevels): expr = y[l].sum() cov.addConstr(expr >= N*(1-alpha[l]), 'Coverage_'+str(l)) for l in range(nlevels): expr = Z @ x[l] - minSize[l]*y[l] cov.addConstr(expr >= 0, 'covered_' + str(l)) # if B is not None: # exprB = B @ x[l] - MinMarkerPerClass # cov.addConstr(exprB >= 0, 'MinMarkerPerClass_' + str(l)) for l in range(nlevels-1): for j in range(d): cov.addConstr(x[l+1].tolist()[j] - x[l].tolist()[j] >= 0, name= 'Nesting'+str(j)+'_'+str(l)) expr = grb.LinExpr() for l in range(nlevels): expr += (w * x[l]).sum() cov.setObjective(expr, grb.GRB.MINIMIZE) if callBack is None: cov.optimize() else: cov.optimize(callBack) return cov
[docs] def covering_scip(Z, minSize=1, weights=1.0, timeLimit=None, output=1): """ Solves the minimal weight set covering problem using the SCIP solver. Args: Z (np.ndarray): A binary matrix of shape (N, d), where Z[i, j] == 1 indicates that set j covers element i. minSize (int, optional): Minimum number of sets required to cover each element. Defaults to 1. weights (str|float|Sequence[float], optional): Weights for each set. - If 'prob', uses 1 - 0.01 * mean coverage per column. - If scalar, uses the same weight for all sets. - Otherwise, an array of length d giving each set’s weight. Defaults to 1.0. timeLimit (float, optional): Time limit in seconds for the SCIP solver. Defaults to None (no limit). output (int, optional): 1 to enable solver output, 0 to suppress it. Defaults to 1. Returns: List[int]: List of selected set indices (column indices of Z). """ N, d = Z.shape # Prepare weight array if isinstance(weights, str) and weights == 'prob': w = 1 - 0.01 * np.mean(Z, axis=0) elif np.isscalar(weights): w = weights * np.ones(d, dtype=float) else: w = np.asarray(weights, dtype=float) # Precompute non‐zero columns per row and check coverage feasibility cover_indices = [np.flatnonzero(Z[i]) for i in range(N)] for i, cols in enumerate(cover_indices): if cols.size == 0: raise ValueError(f"Element {i} has no covering sets → problem infeasible") # Build SCIP model model = Model("SetCover_simple") if timeLimit is not None: model.setParam("limits/time", timeLimit) if output == 0: model.hideOutput() # Variables: x[j]=1 if set j is selected x = [model.addVar(vtype="B", name=f"x_{j}") for j in range(d)] # Objective: minimize total weight model.setObjective(quicksum(w[j] * x[j] for j in range(d)), "minimize") # Cover each element i with at least minSize selected sets for i, cols in enumerate(cover_indices): model.addCons( quicksum(x[j] for j in cols) >= minSize, name=f"cover_i{i}" ) # Solve model.optimize() # Extract solution selected = [j for j in range(d) if model.getVal(x[j]) > 0.5] return selected
[docs] def greedy_weighted_set_cover(Z, w) : """ Greedy heuristic for the weighted set cover problem. Args: Z (np.ndarray): A binary matrix of shape (n_elements, m_sets), where `Z[i, j] == 1` indicates that set `j` covers element `i`. w (np.ndarray): A 1D array of length `m_sets` representing the weight of each set. Returns: List[int]: Indices of the selected sets (column indices of `Z`) that form a cover. """ n, m = Z.shape # which elements are still uncovered uncovered = np.ones(n, dtype=bool) # which sets are still available available = np.ones(m, dtype=bool) selected = [] while uncovered.any(): # For each set j: how many of the still-uncovered elements it would cover? # Z[uncovered] is an array of shape (#uncovered_elements, m) cover_counts = Z[uncovered].sum(axis=0) # shape (m,) # zero out the ones we've already taken cover_counts = np.where(available, cover_counts, 0) # fast-path: if the best we can do is cover exactly one element per set, # grab them all at once and be done if cover_counts.max() == Z.shape[0] // Z.shape[1]: singletons = np.where((available) & (cover_counts == 1))[0] selected.extend(singletons.tolist()) break # otherwise pick the set with max (covered_new_elems / weight) nonzero = cover_counts > 0 if not nonzero.any(): # nothing left can cover any new element break ratios = np.zeros(m, dtype=float) ratios[nonzero] = cover_counts[nonzero] / w[nonzero] best = int(ratios.argmax()) selected.append(best) # mark its covered elements as now covered # Z[:, best] is the column for set "best" uncovered &= ~Z[:, best].astype(bool) # and remove that set from future consideration available[best] = False return selected
[docs] def GeneCover(num_marker, corr_mat, w, m = 3,interval = 0, lambdaMax = .3, lambdaMin = 0.05, timeLimit = 600, output = 0, solver = "Gurobi") : """ Selects marker genes based on gene-gene correlation using combinatorial optimization or a greedy heuristic. Args: num_marker (int): Desired number of markers to select. corr_mat (np.ndarray): Gene-gene correlation matrix. interval (int): Allowed deviation from `num_marker`. The final number of markers may vary within this range. w (np.ndarray): An array of weights for the genes. Higher weights indicate higher cost for selection. lambdaMax (float): Maximum threshold for acceptable gene-gene correlation. lambdaMin (float): Minimum threshold for acceptable gene-gene correlation. timeLimit (float): Time limit (in seconds) for the optimization. ouput (int): Whether to print the optimization process. Set to 1 to enable. solver (str): The solver to use for the optimization. Options are "Gurobi", "SCIP", and "Greedy". greedy (bool): Whether to use a greedy algorithm for set cover instead of the Gurobi solver. Default: False. Returns: List[int]: Indices of the selected marker genes. """ epsilon = (lambdaMax + lambdaMin)/2 best_marker_length_gap = 1e6 selection = np.arange(corr_mat.shape[1]) G_v3 = corr_mat > epsilon if solver == "Gurobi": cov_sol = covering(G_v3, minSize=1, alpha=0.0, weights=w, timeLimit=timeLimit, output = output) cov_sol = selection[np.array(cov_sol.x)[:len(selection)] > 0.5] elif solver == "Greedy": cov_sol = greedy_weighted_set_cover(G_v3, w) elif solver == "SCIP": cov_sol = covering_scip(G_v3, minSize=1, weights=w, timeLimit=timeLimit, output=output) else: raise ValueError("Invalid solver specified. Choose from 'Gurobi', 'SCIP', or 'Greedy'.") markers = [] num_batches = G_v3.shape[0] // G_v3.shape[1] num_genes = G_v3.shape[1] for i in cov_sol: if num_batches > 1: if G_v3[[i + j * num_genes for j in range(num_batches)]].sum(axis = 1).min() >= m: markers.append(i) else: if G_v3[i].sum() >= m: markers.append(i) n_markers = len(markers) current_gap = abs(n_markers - num_marker) best_marker_length_gap = current_gap best_epsilon = epsilon while (lambdaMax - lambdaMin) > 1e-6 and (n_markers < num_marker or n_markers > num_marker + interval): if n_markers< num_marker: lambdaMax = epsilon else: lambdaMin = epsilon epsilon = (lambdaMin+lambdaMax)/2 G_v3 = corr_mat > epsilon if solver == "Gurobi": cov_sol = covering(G_v3, minSize=1, alpha=0.0, weights=w, timeLimit=timeLimit,output = output) cov_sol = selection[np.array(cov_sol.x)[:len(selection)] > 0.5] elif solver == "Greedy": cov_sol = greedy_weighted_set_cover(G_v3, w) elif solver == "SCIP": cov_sol = covering_scip(G_v3, minSize=1, weights=w, timeLimit=timeLimit, output=output) else: raise ValueError("Invalid solver specified. Choose from 'Gurobi', 'SCIP', or 'Greedy'.") markers = [] for i in cov_sol: if num_batches > 1: if G_v3[[i + j * num_genes for j in range(num_batches)]].sum(axis = 1).min() >= m: markers.append(i) else: if G_v3[i].sum() >= m: markers.append(i) n_markers = len(markers) current_gap = abs(n_markers - num_marker) if current_gap < best_marker_length_gap: best_marker_length_gap = current_gap best_epsilon = epsilon best_lambdaMin = lambdaMin best_lambdaMax = lambdaMax best_direction = n_markers < num_marker print("Best Gap: ", best_marker_length_gap) print("Best Epsilon: ", best_epsilon) return markers
[docs] def Iterative_GeneCover(incremental_sizes,corr_mat, w,m = 3, lambdaMin = .05,lambdaMax = .3, timeLimit = 600, output = 0, solver = "Gurobi"): """ Performs iterative marker gene selection using the GeneCover algorithm. Args: corr_mat (np.ndarray): Gene-gene correlation matrix of shape (d, d). incremental_sizes (List[int]): A list indicating the number of markers to select at each iteration. w (np.ndarray): An array of weights for each gene. Higher weights indicate higher cost for selection. lambdaMax (float): Maximum threshold for gene-gene correlation. lambdaMin (float): Minimum threshold for gene-gene correlation. timeLimit (float): Time limit (in seconds) for the optimization. output (int): Whether to print the optimization process. Set to 1 to enable. solver (str): The solver to use for the optimization. Options are "Gurobi", "SCIP", and "Greedy". Returns: List[List[int]]: A list where each element is a list of indices of the selected marker genes at the corresponding iteration. """ num_batches = corr_mat.shape[0] // corr_mat.shape[1] num_genes = corr_mat.shape[1] MARKERS = [] print("Iteration 1") markers = GeneCover(incremental_sizes[0],corr_mat,w = w,m =m,lambdaMax=lambdaMax,lambdaMin=lambdaMin,timeLimit=timeLimit,output=output,solver=solver) selection = np.arange(corr_mat.shape[1]) MARKERS.append(markers) remaining_genes_idx_abs = np.setdiff1d(selection, markers) for t, size in enumerate(incremental_sizes[1:]): print("Iteration ", t+2) remaining_genes_idx_abs_batches = np.array([remaining_genes_idx_abs + j * num_genes for j in range(num_batches)]).flatten() corr_mat_remain=corr_mat[remaining_genes_idx_abs_batches][:,remaining_genes_idx_abs] markers=GeneCover(size,corr_mat_remain,w=w[remaining_genes_idx_abs],m=m,lambdaMin=lambdaMin,lambdaMax=lambdaMax,timeLimit=timeLimit,output=output,solver=solver) MARKERS.append(remaining_genes_idx_abs[markers]) remaining_genes_idx_abs = np.setdiff1d(remaining_genes_idx_abs, [j for i in MARKERS for j in i]) return MARKERS