Skip to content

API reference

Generated from the source docstrings. The pure modules (transform, accumulate, config, earlystop) import with the standard library only; the heavier modules load torch and numpy lazily.

Configuration

pydeepmapper.config

Configuration for a DeepMapper run, plain dataclasses, no heavy deps.

Importing this module is cheap (stdlib only); torch/timm are only touched when a backbone is actually built. The defaults encode two project decisions:

  • Minimal augmentation. Pseudo-images are simpler and far less variable than natural photos, and geometric augmentations (flips/rotations/crops) would scramble the fixed feature->pixel layout. So augmentation is OFF by default, see docs/design.md and the benchmark that verifies this.
  • Small backbone first. The default backbone is a small CNN; the hypothesis that small, locality-biased models are faster and better here than large data-hungry ones is something the benchmark is set up to verify.

AugmentConfig dataclass

Augmentation policy. Default = identity (no augmentation).

Source code in pydeepmapper/config.py
@dataclass
class AugmentConfig:
    """Augmentation policy. Default = identity (no augmentation)."""
    enabled: bool = False
    # Only signal-preserving options are offered; geometric transforms are
    # intentionally absent because they break the fixed feature->pixel layout.
    gaussian_noise_std: float = 0.0     # SmoothGrad-style input jitter (0 = off)

BackboneSpec dataclass

Which model sits behind the pseudo-image. CNN <-> ViT <-> autoencoder swap happens here and nowhere else (see backbones.build).

Source code in pydeepmapper/config.py
@dataclass
class BackboneSpec:
    """Which model sits behind the pseudo-image. CNN <-> ViT <-> autoencoder swap
    happens here and nowhere else (see backbones.build)."""
    kind: str = "cnn_small"             # cnn_small | resnet18 | vit_cct | timm:<name> | conv_vae
    img_size: int = 0                   # 0 = derive from feature count
    in_chans: int = 1
    pretrained: bool = False            # ImageNet weights don't transfer to pseudo-images
    # free-form extras passed to the builder (e.g. timm kwargs, latent_dim for AE)
    extra: dict = field(default_factory=dict)

DeepMapperConfig dataclass

A full iterate-N-accumulate run.

Source code in pydeepmapper/config.py
@dataclass
class DeepMapperConfig:
    """A full iterate-N-accumulate run."""
    # iterate-N-accumulate
    n_passes: int = 30                  # N arrangements to accumulate over
    min_accuracy: float = 0.0           # per-pass quality gate (held-out accuracy)
    max_tries: int = 0                  # 0 = max_tries == n_passes; else cap retries
    top_k: int = 20                     # top-k for selection-frequency stability stat
    # transform
    buffer: int = 0                     # extra pixels beyond feature count
    index_features: bool = False        # True = fixed (identity) arrangement, no shuffle
    # training
    epochs: int = 20                    # max epochs; early stop ends sooner at a plateau
    batch_size: int = 64
    lr: float = 1e-3
    test_size: float = 0.25
    # plateau early-stopping (Keras EarlyStopping semantics, restore_best_weights)
    early_stop: bool = True             # monitor held-out loss, stop when it plateaus
    patience: int = 20                  # epochs of no improvement before stopping
    min_epochs: int = 40                # floor: never stop in the undertrained zone
    min_delta: float = 1e-3             # smallest loss drop that counts as improvement
    val_size: float = 0.15              # fraction of the train split held out to monitor
    # attribution
    attribution: str = "integrated_gradients"   # integrated_gradients | saliency | occlusion | attention_rollout | chefer
    attribution_samples: int = 64               # cap test samples fed to attribution (memory)
    ig_steps: int = 32                          # Integrated Gradients path steps
    ig_internal_batch: int = 16                 # IG internal batch size (bounds memory)
    # composition
    backbone: BackboneSpec = field(default_factory=BackboneSpec)
    augment: AugmentConfig = field(default_factory=AugmentConfig)
    seed: int = 0                       # base seed; pass t uses seed + t

    def effective_max_tries(self) -> int:
        return self.max_tries if self.max_tries > 0 else self.n_passes

Runner

pydeepmapper.runner

The DeepMapper iterate-N-accumulate orchestrator.

Generalises the legacy deepmap(X, y, num_passes, min_accuracy, max_tries) loop: each pass draws a fresh feature->pixel arrangement (seeded permutation), builds pseudo-images, trains the swappable backbone, gates on held-out accuracy, then attributes and back-projects to per-feature findings. After N accepted passes the pure accumulators (accumulate.py) produce the robust, stability-scored result.

The numeric heavy-lifting needs torch; the accumulation is the pure, contracted core and is unit-tested without torch. Importing this module is cheap.

Findings dataclass

Accumulated per-feature result of a run.

Source code in pydeepmapper/runner.py
@dataclass
class Findings:
    """Accumulated per-feature result of a run."""
    n_features: int
    n_passes_run: int
    n_accepted: int
    accuracies: List[float] = field(default_factory=list)
    stop_epochs: List[int] = field(default_factory=list)
    median_importance: List[float] = field(default_factory=list)
    mean_rank: List[float] = field(default_factory=list)
    selection_frequency: List[float] = field(default_factory=list)
    feature_names: Optional[List[str]] = None

    def ranking(self, top: int = 20):
        """Top features by selection frequency (then median importance), as
        ``(name_or_index, selection_frequency, median_importance)`` tuples."""
        idx = sorted(range(self.n_features),
                     key=lambda i: (-self.selection_frequency[i], -self.median_importance[i]))
        out = []
        for i in idx[:top]:
            name = self.feature_names[i] if self.feature_names else i
            out.append((name, self.selection_frequency[i], self.median_importance[i]))
        return out

ranking

ranking(top: int = 20)

Top features by selection frequency (then median importance), as (name_or_index, selection_frequency, median_importance) tuples.

Source code in pydeepmapper/runner.py
def ranking(self, top: int = 20):
    """Top features by selection frequency (then median importance), as
    ``(name_or_index, selection_frequency, median_importance)`` tuples."""
    idx = sorted(range(self.n_features),
                 key=lambda i: (-self.selection_frequency[i], -self.median_importance[i]))
    out = []
    for i in idx[:top]:
        name = self.feature_names[i] if self.feature_names else i
        out.append((name, self.selection_frequency[i], self.median_importance[i]))
    return out

run

run(X, y, config: DeepMapperConfig, feature_names: Optional[List[str]] = None) -> Findings

Execute the full iterate-N-accumulate pipeline. Returns :class:Findings.

X: (n_samples, n_features) array-like. y: integer class labels.

Source code in pydeepmapper/runner.py
def run(X, y, config: DeepMapperConfig, feature_names: Optional[List[str]] = None) -> Findings:
    """Execute the full iterate-N-accumulate pipeline. Returns :class:`Findings`.

    ``X``: ``(n_samples, n_features)`` array-like. ``y``: integer class labels.
    """
    try:
        import numpy as np
        import torch
        from torch.utils.data import DataLoader, TensorDataset
    except ImportError as e:                       # pragma: no cover
        raise RuntimeError(
            "pydeepmapper.runner.run needs torch + numpy; the pure accumulation "
            "API (pydeepmapper.accumulate) works without them.") from e

    X = np.asarray(X, dtype=np.float32)
    y = np.asarray(y).astype(np.int64)
    n_samples, n_features = X.shape
    num_classes = int(y.max()) + 1
    if torch.cuda.is_available():
        device = "cuda"
    elif getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available():
        device = "mps"                              # Apple Silicon GPU
    else:
        device = "cpu"
    # Captum gradient attribution is most reliable off-MPS; attribute on CPU when
    # training on MPS (small capped batch, so the device hop is cheap).
    attr_device = "cpu" if device == "mps" else device
    aug = make_train_transform(config.augment)

    importance_lists: List[List[float]] = []
    rank_lists: List[List[float]] = []
    top_sets: List[List[int]] = []
    accuracies: List[float] = []
    stop_epochs: List[int] = []
    accepted = 0
    passes_run = 0

    for t in range(config.effective_max_tries()):
        if accepted >= config.n_passes:
            break
        passes_run += 1
        seed = config.seed + t
        perm = None if config.index_features else permutation(seed, n_features)

        images = to_images(X, buffer=config.buffer, perm=perm)         # (N, dim, dim, 1)
        images = np.transpose(images, (0, 3, 1, 2))                    # -> (N, C, H, W)
        Xt = torch.tensor(images, dtype=torch.float32)
        yt = torch.tensor(y, dtype=torch.int64)

        n_test = max(1, int(round(config.test_size * n_samples)))
        g = torch.Generator().manual_seed(seed)
        perm_idx = torch.randperm(n_samples, generator=g)
        te, tr = perm_idx[:n_test], perm_idx[n_test:]
        # carve a validation slice from train to monitor the plateau; te stays
        # untouched for the reported accuracy and for attribution.
        n_val = max(1, int(round(config.val_size * len(tr)))) if config.early_stop else 0
        val, trf = (tr[:n_val], tr[n_val:]) if n_val else (tr[:0], tr)

        model = backbones.build(config.backbone, num_classes, n_features).to(device)
        opt = torch.optim.Adam(model.parameters(), lr=config.lr)
        loss_fn = torch.nn.CrossEntropyLoss()
        loader = DataLoader(TensorDataset(Xt[trf], yt[trf]),
                            batch_size=config.batch_size, shuffle=True)

        # train to the held-out-loss plateau (Keras EarlyStopping), keep best weights
        stopper = EarlyStopper(config.patience, config.min_delta, config.min_epochs, mode="min")
        best_state = None
        stop_epoch = config.epochs
        model.train()
        for epoch in range(config.epochs):
            for xb, yb in loader:
                xb = aug(xb).to(device)
                yb = yb.to(device)
                opt.zero_grad()
                loss_fn(model(xb), yb).backward()
                opt.step()
            if not config.early_stop or n_val == 0:
                continue
            model.eval()
            with torch.no_grad():
                vloss = float(loss_fn(model(Xt[val].to(device)), yt[val].to(device)))
            model.train()
            stop = stopper.step(epoch, vloss)
            if stopper.improved:
                best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            if stop:
                stop_epoch = epoch + 1
                break
        if config.early_stop and best_state is not None:
            model.load_state_dict(best_state)
            stop_epoch = stopper.best_epoch + 1
        stop_epochs.append(stop_epoch)

        model.eval()
        with torch.no_grad():
            pred = model(Xt[te].to(device)).argmax(1).cpu()
        acc_t = float((pred == yt[te]).float().mean())
        accuracies.append(acc_t)

        if not acc.accept_pass(acc_t, config.min_accuracy):
            continue                                                  # pass rejected
        accepted += 1

        # cap the number of test samples fed to attribution (bounds memory/time)
        attr_idx = te[:config.attribution_samples]
        model.to(attr_device)
        imp = attribution.feature_importances(
            model, Xt[attr_idx].to(attr_device), yt[attr_idx].to(attr_device),
            config.attribution, perm, n_features,
            ig_steps=config.ig_steps, ig_internal_batch=config.ig_internal_batch)
        imp = [float(v) for v in imp]
        importance_lists.append(imp)
        rank_lists.append([float(r) for r in acc.ranks_from_importances(imp)])
        top_sets.append(acc.top_k_set(imp, config.top_k))

    if accepted == 0:
        return Findings(n_features=n_features, n_passes_run=passes_run, n_accepted=0,
                        accuracies=accuracies, stop_epochs=stop_epochs,
                        feature_names=feature_names)

    return Findings(
        n_features=n_features, n_passes_run=passes_run, n_accepted=accepted,
        accuracies=accuracies, stop_epochs=stop_epochs,
        median_importance=acc.median_importance(importance_lists),
        mean_rank=acc.mean_rank(rank_lists),
        selection_frequency=acc.selection_frequency(top_sets, n_features),
        feature_names=feature_names)

Linear baseline

pydeepmapper.linear_baseline

Deterministic linear backbone for DeepMapper, penalised logistic regression on the FULL feature set, with no imagification and no multi-pass accumulation.

Why this exists (the methodological point): for a linear classifier the iterate-N-accumulate machinery is redundant. (i) A flatten+linear model is permutation-equivariant, so the feature->pixel arrangement is irrelevant and averaging over random arrangements removes nothing. (ii) Logistic regression is convex, so a deterministic solver (lbfgs to convergence) returns a unique optimum, every pass would be identical, so N=1 is exact. (iii) Integrated gradients on a linear model equals coef*(x - baseline), so the attribution ranking is exactly the (standardised) coefficients, no gradient sampling needed.

Imagification and iterate-N-accumulate are therefore for arrangement-SENSITIVE backbones (CNN/ViT); the linear analysis needs neither. This module makes that concrete and gives an exact, reproducible baseline.

fit

fit(X, y, feature_names: Optional[Sequence[str]] = None, C: float = 1.0, max_iter: int = 5000)

Fit the deterministic linear backbone. Returns (clf, scaler, ranking) where ranking is a list of (feature_name_or_index, importance) sorted descending. Importance = max_class |standardised coef|.

Source code in pydeepmapper/linear_baseline.py
def fit(X, y, feature_names: Optional[Sequence[str]] = None, C: float = 1.0, max_iter: int = 5000):
    """Fit the deterministic linear backbone. Returns (clf, scaler, ranking) where ranking is a list of
    (feature_name_or_index, importance) sorted descending. Importance = max_class |standardised coef|."""
    X = np.asarray(X, np.float32); y = np.asarray(y).astype(int)
    scaler = StandardScaler().fit(X)
    clf = LogisticRegression(max_iter=max_iter, C=C, solver="lbfgs").fit(scaler.transform(X), y)
    coef = np.abs(clf.coef_).max(0)                       # (p,), binary or one-vs-rest max
    order = np.argsort(coef)[::-1]
    names = list(feature_names) if feature_names is not None else list(range(X.shape[1]))
    ranking = [(names[i], float(coef[i])) for i in order]
    return clf, scaler, ranking

top_genes

top_genes(ranking, k: int) -> List

Top-k feature names from a fit ranking.

Source code in pydeepmapper/linear_baseline.py
def top_genes(ranking, k: int) -> List:
    """Top-k feature names from a `fit` ranking."""
    return [name for name, _ in ranking[:k]]

Data loading

pydeepmapper.io

Data intake for real scRNA-seq benchmarks (AnnData / .h5ad).

Turns an AnnData into the plain (X, y, var_names, label_names) the DeepMapper runner consumes, and provides the preprocessing parity helpers the paper cases need: log-normalization, highly-variable-gene selection (to measure what filtering costs), and the gene-intersection of two datasets (the stim-vs-unstim "common genes only" case, C5). scanpy/anndata are imported lazily, importing this module is cheap.

See docs/paper-cases.md.

Dataset dataclass

A benchmark-ready matrix: cells × genes, integer labels, names.

Source code in pydeepmapper/io.py
@dataclass
class Dataset:
    """A benchmark-ready matrix: cells × genes, integer labels, names."""
    X: "object"                     # numpy ndarray (n_cells, n_genes), float32
    y: "object"                     # numpy ndarray (n_cells,), int64
    var_names: List[str]            # gene names (len n_genes)
    label_names: List[str]          # class index -> label string
    obs_label_key: str = ""

    @property
    def n_cells(self): return self.X.shape[0]
    @property
    def n_genes(self): return self.X.shape[1]

load_h5ad

load_h5ad(path: str, label_key: str, normalize: bool = True, target_sum: float = 10000.0) -> Dataset

Load an AnnData .h5ad → :class:Dataset. label_key is the obs column holding the ground-truth class. normalize applies normalize_total + log1p (skip if the matrix is already normalized).

Source code in pydeepmapper/io.py
def load_h5ad(path: str, label_key: str, normalize: bool = True,
              target_sum: float = 1e4) -> Dataset:
    """Load an AnnData ``.h5ad`` → :class:`Dataset`. ``label_key`` is the ``obs`` column
    holding the ground-truth class. ``normalize`` applies ``normalize_total`` + ``log1p``
    (skip if the matrix is already normalized)."""
    import numpy as np
    import anndata as ad

    a = ad.read_h5ad(path)
    if normalize:
        a = _lognorm(a, target_sum)
    X = _densify(a.X)
    labels = a.obs[label_key].astype("category")
    y = labels.cat.codes.to_numpy().astype(np.int64)
    return Dataset(X=X, y=y, var_names=list(map(str, a.var_names)),
                   label_names=list(map(str, labels.cat.categories)),
                   obs_label_key=label_key)

load_10x_populations

load_10x_populations(specs, normalize: bool = True, target_sum: float = 10000.0, n_per_class: Optional[int] = None, seed: int = 0) -> Dataset

Load several FACS-sorted 10x populations and merge into one labeled Dataset.

specs: list of (mtx_dir, label), e.g. the Zheng CD4 subsets. Cells from each are tagged with label (the ground-truth FACS class); matrices are concatenated on shared genes (these share the same 10x reference, so genes align). Optionally subsample n_per_class cells per population (cap big runs). NO gene filtering, that's the point.

Source code in pydeepmapper/io.py
def load_10x_populations(specs, normalize: bool = True, target_sum: float = 1e4,
                         n_per_class: Optional[int] = None, seed: int = 0) -> Dataset:
    """Load several FACS-sorted 10x populations and merge into one labeled Dataset.

    ``specs``: list of ``(mtx_dir, label)``, e.g. the Zheng CD4 subsets. Cells from each
    are tagged with ``label`` (the ground-truth FACS class); matrices are concatenated on
    shared genes (these share the same 10x reference, so genes align). Optionally subsample
    ``n_per_class`` cells per population (cap big runs). NO gene filtering, that's the point.
    """
    import numpy as np
    import anndata as ad
    import scanpy as sc

    parts = []
    for mtx_dir, label in specs:
        a = sc.read_10x_mtx(_find_mtx_dir(mtx_dir), var_names="gene_symbols")
        a.var_names_make_unique()
        if n_per_class and a.n_obs > n_per_class:
            rng = np.random.default_rng(seed)
            a = a[rng.choice(a.n_obs, n_per_class, replace=False)].copy()
        a.obs["label"] = label
        parts.append(a)
    merged = ad.concat(parts, join="inner", label="batch")    # inner = shared genes
    if normalize:
        merged = _lognorm(merged, target_sum)
    X = _densify(merged.X)
    labels = merged.obs["label"].astype("category")
    return Dataset(X=X, y=labels.cat.codes.to_numpy().astype(np.int64),
                   var_names=list(map(str, merged.var_names)),
                   label_names=list(map(str, labels.cat.categories)), obs_label_key="label")

highly_variable_subset

highly_variable_subset(ds: Dataset, n_top_genes: int = 2000) -> Tuple[Dataset, List[int]]

Return the HVG-filtered dataset + the kept gene indices, used to measure what dimension reduction discards (run DeepMapper on full vs HVG, see if attribution genes survive selection).

Source code in pydeepmapper/io.py
def highly_variable_subset(ds: Dataset, n_top_genes: int = 2000) -> Tuple[Dataset, List[int]]:
    """Return the HVG-filtered dataset + the kept gene indices, used to *measure what
    dimension reduction discards* (run DeepMapper on full vs HVG, see if attribution genes
    survive selection)."""
    import numpy as np
    import anndata as ad
    import scanpy as sc

    a = ad.AnnData(np.asarray(ds.X))
    a.var_names = ds.var_names
    sc.pp.highly_variable_genes(a, n_top_genes=n_top_genes)
    keep = list(np.where(a.var["highly_variable"].to_numpy())[0])
    sub = Dataset(X=ds.X[:, keep], y=ds.y,
                  var_names=[ds.var_names[i] for i in keep],
                  label_names=ds.label_names, obs_label_key=ds.obs_label_key)
    return sub, keep

intersect_genes

intersect_genes(a: Dataset, b: Dataset) -> Tuple[Dataset, Dataset, List[str]]

Restrict two datasets to their shared genes (C5 "common genes only"). Returns the two restricted datasets + the shared gene list, both column-aligned to that order.

Source code in pydeepmapper/io.py
def intersect_genes(a: Dataset, b: Dataset) -> Tuple[Dataset, Dataset, List[str]]:
    """Restrict two datasets to their shared genes (C5 "common genes only"). Returns the
    two restricted datasets + the shared gene list, both column-aligned to that order."""
    shared = [g for g in a.var_names if g in set(b.var_names)]
    ai = {g: i for i, g in enumerate(a.var_names)}
    bi = {g: i for i, g in enumerate(b.var_names)}
    aidx = [ai[g] for g in shared]
    bidx = [bi[g] for g in shared]
    A = Dataset(X=a.X[:, aidx], y=a.y, var_names=shared,
                label_names=a.label_names, obs_label_key=a.obs_label_key)
    B = Dataset(X=b.X[:, bidx], y=b.y, var_names=shared,
                label_names=b.label_names, obs_label_key=b.obs_label_key)
    return A, B, shared

exclusive_genes

exclusive_genes(a: Dataset, b: Dataset) -> Tuple[List[str], List[str]]

Genes unique to each dataset (the "trivially separating" features in C5).

Source code in pydeepmapper/io.py
def exclusive_genes(a: Dataset, b: Dataset) -> Tuple[List[str], List[str]]:
    """Genes unique to each dataset (the "trivially separating" features in C5)."""
    sa, sb = set(a.var_names), set(b.var_names)
    return [g for g in a.var_names if g not in sb], [g for g in b.var_names if g not in sa]

export_for_seurat

export_for_seurat(ds: Dataset, outdir: str) -> str

Write counts.csv (cells × genes) + labels.csv (cell,label) for bench/seurat_pipeline.R. Returns outdir. Cell ids are cell0..cellN.

Source code in pydeepmapper/io.py
def export_for_seurat(ds: Dataset, outdir: str) -> str:
    """Write ``counts.csv`` (cells × genes) + ``labels.csv`` (cell,label) for
    ``bench/seurat_pipeline.R``. Returns ``outdir``. Cell ids are ``cell0..cellN``."""
    import os
    import numpy as np
    import pandas as pd

    os.makedirs(outdir, exist_ok=True)
    cells = [f"cell{i}" for i in range(ds.n_cells)]
    pd.DataFrame(np.asarray(ds.X), index=cells, columns=ds.var_names).to_csv(
        os.path.join(outdir, "counts.csv"))
    pd.DataFrame({"cell": cells,
                  "label": [ds.label_names[i] for i in ds.y]}).to_csv(
        os.path.join(outdir, "labels.csv"), index=False)
    return outdir

Evaluation

pydeepmapper.evaluate

Held-out evaluation with arrangement-ensembling, for confusion matrices & metrics.

The iterate-N runner (runner.run) accumulates attribution; this module accumulates predictions. It fixes ONE stratified train/test split, trains the backbone under N arrangements, and ensembles the softmax over arrangements into a final prediction, the principled DeepMapper class call. Returns y_true / y_pred / y_proba + a sklearn report, ready for plots.py. Needs torch (the pure core stays torch-free).

Attribution

pydeepmapper.attribution

Per-pixel attribution -> per-feature findings.

One attribute() seam, several methods: CNN : integrated_gradients (default), saliency, occlusion [Captum] ViT : attention_rollout, chefer [attention-based]

The method computes a per-pixel importance map for a batch, then transform.pixels_to_features back-projects it through the arrangement's inverse permutation to a per-feature vector (the clean 1-pixel-per-feature gather). torch / captum are imported lazily; a missing dependency raises a clear error.

feature_importances

feature_importances(model, X_images, targets, method: str, perm: Optional[Sequence[int]], n_features: int, baseline=None, ig_steps: int = 32, ig_internal_batch: int = 16)

Return a per-feature importance vector (length n_features) for method.

X_images: a (batch, C, H, W) tensor on the model's device. targets: per-sample class indices. Importances are averaged over the batch, then back-projected to feature space via the arrangement's inverse permutation. ig_steps / ig_internal_batch bound Integrated-Gradients memory.

Source code in pydeepmapper/attribution.py
def feature_importances(model, X_images, targets, method: str,
                        perm: Optional[Sequence[int]], n_features: int,
                        baseline=None, ig_steps: int = 32, ig_internal_batch: int = 16):
    """Return a per-feature importance vector (length ``n_features``) for ``method``.

    ``X_images``: a ``(batch, C, H, W)`` tensor on the model's device. ``targets``:
    per-sample class indices. Importances are averaged over the batch, then
    back-projected to feature space via the arrangement's inverse permutation.
    ``ig_steps`` / ``ig_internal_batch`` bound Integrated-Gradients memory.
    """
    try:
        import numpy as np
        import torch
    except ImportError as e:                       # pragma: no cover
        raise AttributionUnavailable("attribution needs torch") from e

    model.eval()
    if method in _CNN_METHODS:
        pix = _captum_pixels(model, X_images, targets, method, baseline,
                             ig_steps, ig_internal_batch)
    elif method in _VIT_METHODS:
        pix = _attention_pixels(model, X_images, targets, method)
    else:
        raise ValueError(f"unknown attribution method {method!r}")
    # pix: (batch, H, W) non-negative importance; average over batch then per-feature
    pix = np.asarray(pix, dtype=np.float32)
    mean_img = pix.mean(axis=0)
    return pixels_to_features(mean_img, perm, n_features)

Backbones

pydeepmapper.backbones

Swappable backbones, CNN <-> ViT <-> autoencoder behind one interface.

The whole point of the revision: the model is a plug-in. build(spec, num_classes) returns a torch.nn.Module for any registered kind; the rest of the pipeline (runner, attribution) never names a concrete architecture.

torch / timm are imported lazily inside the builders, so importing this module is free and the pure core stays torch-independent. If a backbone is requested without its dependency installed, a clear BackboneUnavailable is raised.

Registry

cnn_small, a tiny 3-conv net. DEFAULT. Small + locality-biased = fast & strong here (the hypothesis the benchmark verifies). resnet18, torchvision ResNet-18 (legacy DeepMapper parity baseline). vit_cct, Compact Convolutional Transformer (top ViT pick for pseudo-images). timm:, any timm model (e.g. timm:mobilevit_xxs, timm:convnext_nano). conv_vae, convolutional VAE for the latent-space option (encoder used as head). linear, flatten + single Linear: logistic regression on the FULL unfiltered feature set, run through the same no-filtering + multi-pass + attribution pipeline. The head-to-head baseline showing the architecture is incidental. mlp, flatten + 1 hidden layer: dense NON-linear baseline with NO convolution, isolating "non-linearity" from "convolution/imagification".

Non-neural classifiers (sklearn LogReg-L1 / random forest / gradient boosting) are NOT registered here: integrated-gradients attribution needs a differentiable model, so they would require a separate permutation-importance attribution path (future work).

BackboneUnavailable

Bases: RuntimeError

A backbone was requested but its dependency (torch/timm) isn't installed.

Source code in pydeepmapper/backbones.py
class BackboneUnavailable(RuntimeError):
    """A backbone was requested but its dependency (torch/timm) isn't installed."""

build

build(spec: BackboneSpec, num_classes: int, n_features: int)

Build the backbone for spec. kind is a registry key or timm:<name>.

Source code in pydeepmapper/backbones.py
def build(spec: BackboneSpec, num_classes: int, n_features: int):
    """Build the backbone for ``spec``. ``kind`` is a registry key or ``timm:<name>``."""
    if spec.kind.startswith("timm:"):
        return _build_timm(spec, num_classes, n_features)
    if spec.kind not in _REGISTRY:
        raise ValueError(
            f"unknown backbone kind {spec.kind!r}; "
            f"known: {sorted(_REGISTRY)} or 'timm:<name>'")
    return _REGISTRY[spec.kind](spec, num_classes, n_features)

available

available() -> list

Registry keys (plus the dynamic timm:<name> form).

Source code in pydeepmapper/backbones.py
def available() -> list:
    """Registry keys (plus the dynamic ``timm:<name>`` form)."""
    return sorted(_REGISTRY) + ["timm:<name>"]

Early stopping

pydeepmapper.earlystop

Plateau early-stopping, the pure decision logic (no torch).

Reproduces the Keras EarlyStopping(monitor, patience, min_delta, restore_best_weights=True) behaviour that the original TF DeepMapper used and that was lost in the PyTorch port: train until the monitored held-out metric stops improving for patience epochs, then restore the best-epoch weights. A min_epochs floor keeps the run out of the undertrained zone where attribution is unreliable (the 12-epoch 88% artefact).

Kept torch-free and side-effect-free so it is unit-tested without a GPU, the same split the project uses for accumulate.

EarlyStopper dataclass

Track a monitored metric and decide when training has plateaued.

Call :meth:step once per epoch with the held-out value. It records whether this epoch was a new best (self.improved, the caller saves weights then) and returns whether training should stop. mode='min' for a loss, 'max' for an accuracy.

Source code in pydeepmapper/earlystop.py
@dataclass
class EarlyStopper:
    """Track a monitored metric and decide when training has plateaued.

    Call :meth:`step` once per epoch with the held-out value. It records whether
    this epoch was a new best (``self.improved``, the caller saves weights then)
    and returns whether training should stop. ``mode='min'`` for a loss,
    ``'max'`` for an accuracy.
    """
    patience: int = 20
    min_delta: float = 1e-3
    min_epochs: int = 40
    mode: str = "min"

    best: float = float("inf")
    best_epoch: int = -1
    num_bad: int = 0
    improved: bool = False

    def __post_init__(self):
        if self.mode not in ("min", "max"):
            raise ValueError("mode must be 'min' or 'max'")
        self.best = float("inf") if self.mode == "min" else float("-inf")

    def step(self, epoch: int, value: float) -> bool:
        """Record ``value`` at ``epoch`` (0-based); return True to stop now.

        Before ``min_epochs`` (the warmup) we neither record a best epoch nor
        count patience, so the restored best-epoch model has always trained at
        least ``min_epochs``. The floor gates which epoch we keep, not only when
        we stop, on an easy task whose validation loss bottoms out very early
        this keeps attribution off a barely-trained model.
        """
        if (epoch + 1) < self.min_epochs:
            self.improved = False
            return False
        self.improved = is_improvement(value, self.best, self.min_delta, self.mode)
        if self.improved:
            self.best, self.best_epoch, self.num_bad = value, epoch, 0
        else:
            self.num_bad += 1
        return should_stop(self.num_bad, self.patience, epoch, self.min_epochs)

step

step(epoch: int, value: float) -> bool

Record value at epoch (0-based); return True to stop now.

Before min_epochs (the warmup) we neither record a best epoch nor count patience, so the restored best-epoch model has always trained at least min_epochs. The floor gates which epoch we keep, not only when we stop, on an easy task whose validation loss bottoms out very early this keeps attribution off a barely-trained model.

Source code in pydeepmapper/earlystop.py
def step(self, epoch: int, value: float) -> bool:
    """Record ``value`` at ``epoch`` (0-based); return True to stop now.

    Before ``min_epochs`` (the warmup) we neither record a best epoch nor
    count patience, so the restored best-epoch model has always trained at
    least ``min_epochs``. The floor gates which epoch we keep, not only when
    we stop, on an easy task whose validation loss bottoms out very early
    this keeps attribution off a barely-trained model.
    """
    if (epoch + 1) < self.min_epochs:
        self.improved = False
        return False
    self.improved = is_improvement(value, self.best, self.min_delta, self.mode)
    if self.improved:
        self.best, self.best_epoch, self.num_bad = value, epoch, 0
    else:
        self.num_bad += 1
    return should_stop(self.num_bad, self.patience, epoch, self.min_epochs)

is_improvement

is_improvement(value: float, best: float, min_delta: float, mode: str = 'min') -> bool

True if value beats best by more than min_delta (pure).

Source code in pydeepmapper/earlystop.py
def is_improvement(value: float, best: float, min_delta: float, mode: str = "min") -> bool:
    """True if ``value`` beats ``best`` by more than ``min_delta`` (pure)."""
    if mode == "min":
        return value < best - min_delta
    return value > best + min_delta

should_stop

should_stop(num_bad: int, patience: int, epoch: int, min_epochs: int) -> bool

True if training has plateaued: past the min_epochs floor AND patience consecutive non-improving epochs have elapsed (pure).

Source code in pydeepmapper/earlystop.py
def should_stop(num_bad: int, patience: int, epoch: int, min_epochs: int) -> bool:
    """True if training has plateaued: past the ``min_epochs`` floor AND
    ``patience`` consecutive non-improving epochs have elapsed (pure)."""
    return (epoch + 1) >= min_epochs and num_bad >= patience

Core transform

pydeepmapper.transform

DeepMapper feature->pixel transform, the PURE, contracted core.

A feature vector is laid out into a small dim x dim pseudo-image by padding to a perfect square and reshaping. Different arrangements come from a seeded permutation of the feature order; per-pixel attributions are projected back to feature space through the inverse permutation (a clean 1-pixel-per-feature gather, DeepMapper's differentiating property).

The scalar/list functions here are stdlib-only and deterministic, so they are testable without numpy or torch. Only the array builders (to_images / pixels_to_features) need numpy, imported lazily so importing this module never requires it.

square_dim

square_dim(n_features: int, buffer: int = 0) -> int

Smallest square side dim with dim*dim >= n_features + buffer.

Mirrors the canonical DeepMapper.map sizing. Minimal by construction: (dim-1)**2 < n_features + buffer <= dim**2.

Source code in pydeepmapper/transform.py
def square_dim(n_features: int, buffer: int = 0) -> int:
    """Smallest square side ``dim`` with ``dim*dim >= n_features + buffer``.

    Mirrors the canonical ``DeepMapper.map`` sizing. Minimal by construction:
    ``(dim-1)**2 < n_features + buffer <= dim**2``.
    """
    if not isinstance(n_features, int) or n_features < 1:
        raise ValueError("n_features must be a positive int")
    if not isinstance(buffer, int) or buffer < 0:
        raise ValueError("buffer must be a non-negative int")
    minsize = n_features + buffer
    dim = math.isqrt(minsize)
    if dim * dim < minsize:
        dim += 1
    return dim

image_side

image_side(n_features: int) -> int

Square side for n_features with no buffer (batch-facing alias).

Source code in pydeepmapper/transform.py
def image_side(n_features: int) -> int:
    """Square side for ``n_features`` with no buffer (batch-facing alias)."""
    return square_dim(n_features, 0)

permutation

permutation(seed: int, n: int) -> List[int]

A deterministic, seeded permutation of range(n) (one arrangement).

Same seed always yields the same permutation (reproducible runs). Uses the same random.Random(seed).shuffle mechanism as the canonical DeepMapper.shuffle_to_seed.

Source code in pydeepmapper/transform.py
def permutation(seed: int, n: int) -> List[int]:
    """A deterministic, seeded permutation of ``range(n)`` (one *arrangement*).

    Same ``seed`` always yields the same permutation (reproducible runs). Uses the
    same ``random.Random(seed).shuffle`` mechanism as the canonical
    ``DeepMapper.shuffle_to_seed``.
    """
    lst = list(range(n))
    random.Random(seed).shuffle(lst)
    return lst

inverse_permutation

inverse_permutation(perm: Sequence[int]) -> List[int]

Inverse of a permutation: inv[perm[i]] == i and perm[inv[i]] == i.

Source code in pydeepmapper/transform.py
def inverse_permutation(perm: Sequence[int]) -> List[int]:
    """Inverse of a permutation: ``inv[perm[i]] == i`` and ``perm[inv[i]] == i``."""
    inv = [0] * len(perm)
    for i, p in enumerate(perm):
        inv[p] = i
    return inv

to_images

to_images(X, buffer: int = 0, perm: Optional[Sequence[int]] = None)

Lay a (n_samples, n_features) matrix out as (n_samples, dim, dim, 1).

If perm is given, features are reordered by it first (the arrangement); feature perm[j] lands at flattened pixel j. Padding to dim*dim is zeros. (Deviation from the legacy code: we pad to exactly dim*dim rather than using np.resize, which silently repeats data when a buffer is set.)

Source code in pydeepmapper/transform.py
def to_images(X, buffer: int = 0, perm: Optional[Sequence[int]] = None):
    """Lay a ``(n_samples, n_features)`` matrix out as ``(n_samples, dim, dim, 1)``.

    If ``perm`` is given, features are reordered by it first (the arrangement);
    feature ``perm[j]`` lands at flattened pixel ``j``. Padding to ``dim*dim`` is
    zeros. (Deviation from the legacy code: we pad to exactly ``dim*dim`` rather
    than using ``np.resize``, which silently *repeats* data when a buffer is set.)
    """
    import numpy as np

    X = np.asarray(X, dtype=np.float32)
    if X.ndim != 2:
        raise ValueError("X must be 2-D (n_samples, n_features)")
    n, f = X.shape
    if perm is not None:
        if len(perm) != f:
            raise ValueError("perm length must equal n_features")
        X = X[:, list(perm)]
    dim = square_dim(f, buffer)
    pad = dim * dim - f
    Xp = np.pad(X, ((0, 0), (0, pad))) if pad else X
    return Xp.reshape(n, dim, dim, 1)

pixels_to_features

pixels_to_features(attr_image, perm: Optional[Sequence[int]], n_features: int)

Project a per-pixel attribution image back to per-feature importances.

Inverse of :func:to_images' layout: flattened pixel j (for j < n_features) carries feature perm[j], so out[perm[j]] = a[j]. Padding pixels are dropped. With perm=None the identity arrangement is assumed.

Source code in pydeepmapper/transform.py
def pixels_to_features(attr_image, perm: Optional[Sequence[int]], n_features: int):
    """Project a per-pixel attribution image back to per-feature importances.

    Inverse of :func:`to_images`' layout: flattened pixel ``j`` (for ``j <
    n_features``) carries feature ``perm[j]``, so ``out[perm[j]] = a[j]``. Padding
    pixels are dropped. With ``perm=None`` the identity arrangement is assumed.
    """
    import numpy as np

    a = np.asarray(attr_image, dtype=np.float32).reshape(-1)
    if perm is None:
        return a[:n_features]
    out = np.zeros(n_features, dtype=np.float32)
    for j in range(n_features):
        out[perm[j]] = a[j]
    return out

Accumulation

pydeepmapper.accumulate

Iterate-N accumulation, the PURE aggregation of per-pass findings.

DeepMapper runs the arrange -> train -> attribute loop N times and accumulates per-feature findings into robust, stability-scored attributions. These functions turn a list of per-pass results into the final ranking + stability statistics. No torch, no IO, stdlib only, so they are testable in isolation.

These functions are property-tested in isolation (selection frequency = the stability-selection statistic; median importance; Borda mean rank; the per-pass min_accuracy quality gate).

accept_pass

accept_pass(accuracy: float, min_accuracy: float) -> bool

The per-pass quality gate: a pass contributes only if it cleared the bar.

Source code in pydeepmapper/accumulate.py
def accept_pass(accuracy: float, min_accuracy: float) -> bool:
    """The per-pass quality gate: a pass contributes only if it cleared the bar."""
    return accuracy >= min_accuracy

ranks_from_importances

ranks_from_importances(importances: Sequence[float]) -> List[int]

Convert an importance vector to ranks (rank 1 = most important).

Ties are broken by original index (stable), so the result is a permutation of 1..n and feeds :func:mean_rank / top-k selection directly.

Source code in pydeepmapper/accumulate.py
def ranks_from_importances(importances: Sequence[float]) -> List[int]:
    """Convert an importance vector to ranks (rank 1 = most important).

    Ties are broken by original index (stable), so the result is a permutation of
    ``1..n`` and feeds :func:`mean_rank` / top-k selection directly.
    """
    order = sorted(range(len(importances)), key=lambda i: (-importances[i], i))
    ranks = [0] * len(importances)
    for r, i in enumerate(order, start=1):
        ranks[i] = r
    return ranks

top_k_set

top_k_set(importances: Sequence[float], k: int) -> List[int]

Indices of the k most important features in one pass (descending).

Source code in pydeepmapper/accumulate.py
def top_k_set(importances: Sequence[float], k: int) -> List[int]:
    """Indices of the ``k`` most important features in one pass (descending)."""
    order = sorted(range(len(importances)), key=lambda i: (-importances[i], i))
    return order[:k]

selection_frequency

selection_frequency(top_sets: Sequence[Sequence[int]], n_features: int) -> List[float]

Per-feature fraction of passes that selected it (the stability statistic).

top_sets: one selected-index list per accepted pass. Returns a value in [0, 1] per feature: how often it landed in the top-k. Empty input -> all 0.

Source code in pydeepmapper/accumulate.py
def selection_frequency(top_sets: Sequence[Sequence[int]], n_features: int) -> List[float]:
    """Per-feature fraction of passes that selected it (the stability statistic).

    ``top_sets``: one selected-index list per accepted pass. Returns a value in
    ``[0, 1]`` per feature: how often it landed in the top-k. Empty input -> all 0.
    """
    freq = [0.0] * n_features
    n = len(top_sets)
    if n == 0:
        return freq
    counts = [0] * n_features
    for s in top_sets:
        for i in s:
            counts[i] += 1
    return [c / n for c in counts]

mean_rank

mean_rank(rank_lists: Sequence[Sequence[float]]) -> List[float]

Borda aggregation: per-feature mean rank across passes (lower = better).

Source code in pydeepmapper/accumulate.py
def mean_rank(rank_lists: Sequence[Sequence[float]]) -> List[float]:
    """Borda aggregation: per-feature mean rank across passes (lower = better)."""
    n_passes = len(rank_lists)
    n_feat = len(rank_lists[0])
    out = [0.0] * n_feat
    for r in rank_lists:
        for i in range(n_feat):
            out[i] += r[i]
    return [v / n_passes for v in out]

median_importance

median_importance(importance_lists: Sequence[Sequence[float]]) -> List[float]

Per-feature median importance across passes (outlier-robust central estimate).

Source code in pydeepmapper/accumulate.py
def median_importance(importance_lists: Sequence[Sequence[float]]) -> List[float]:
    """Per-feature median importance across passes (outlier-robust central estimate)."""
    n_feat = len(importance_lists[0])
    out = [0.0] * n_feat
    for i in range(n_feat):
        col = sorted(lst[i] for lst in importance_lists)
        m = len(col)
        out[i] = col[m // 2] if m % 2 else 0.5 * (col[m // 2 - 1] + col[m // 2])
    return out

stability_selection_bound

stability_selection_bound(pi_thr: float, q: int, n_features: int) -> float

Meinshausen-Buhlmann expected-false-positives bound E[V] <= q^2 / ((2pi-1)F).

pi_thr: selection-frequency threshold (must be > 0.5). q: average number of features selected per pass. Returns the upper bound on expected false positives.

Source code in pydeepmapper/accumulate.py
def stability_selection_bound(pi_thr: float, q: int, n_features: int) -> float:
    """Meinshausen-Buhlmann expected-false-positives bound E[V] <= q^2 / ((2*pi-1)*F).

    ``pi_thr``: selection-frequency threshold (must be > 0.5). ``q``: average number
    of features selected per pass. Returns the upper bound on expected false positives.
    """
    if not 0.5 < pi_thr <= 1.0:
        raise ValueError("pi_thr must be in (0.5, 1.0]")
    return (q * q) / ((2.0 * pi_thr - 1.0) * n_features)

De-novo assembly

pydeepmapper.hybrid_assembly

De-novo-hybrid assembly, recover NON-DOCUMENTED transcripts and fold them into the expression "sc table" so DeepMapper sees features a reference-only pipeline discards.

Pipeline (validated on tissue-resident vs circulating memory T-cell bulk RNA, where the top discriminators turned out to be novel DENOVO transcripts):

reads --align(reference)--> UNALIGNED reads  (the non-documented fraction)
     --pool--> Trinity de-novo assembly --> novel transcripts, tagged ``DENOVO_*``
hybrid_ref = reference_cdna + DENOVO  ->  Salmon index + quant  ->  counts matrix
build_sc_table(...) -> transcripts × samples, each transcript flagged is_denovo

This is an optional DeepMapper data-ingestion path: instead of starting from a given matrix, build the matrix (incl. non-documented features) from raw reads. The PURE core below (namespacing + table assembly) is property-tested; the heavy tool orchestration (HISAT2/Trinity/Salmon/biopython) is gated behind dependency checks and requirements-bio.txt so importing this module stays cheap.

BioToolUnavailable

Bases: RuntimeError

A required external bioinformatics tool (hisat2/Trinity/salmon) is not on PATH.

Source code in pydeepmapper/hybrid_assembly.py
class BioToolUnavailable(RuntimeError):
    """A required external bioinformatics tool (hisat2/Trinity/salmon) is not on PATH."""

is_denovo

is_denovo(transcript_id: str) -> bool

True iff transcript_id is a de-novo (non-documented) transcript.

Source code in pydeepmapper/hybrid_assembly.py
def is_denovo(transcript_id: str) -> bool:
    """True iff ``transcript_id`` is a de-novo (non-documented) transcript."""
    return str(transcript_id).startswith(DENOVO_PREFIX)

tag_denovo

tag_denovo(transcript_ids: Sequence[str]) -> List[str]

Namespace de-novo transcript ids with DENOVO_ (idempotent, never double-tags), so they cannot collide with reference ids in the hybrid reference.

Source code in pydeepmapper/hybrid_assembly.py
def tag_denovo(transcript_ids: Sequence[str]) -> List[str]:
    """Namespace de-novo transcript ids with ``DENOVO_`` (idempotent, never double-tags),
    so they cannot collide with reference ids in the hybrid reference."""
    return [t if is_denovo(t) else DENOVO_PREFIX + t for t in transcript_ids]

merge_reference_and_denovo

merge_reference_and_denovo(reference_ids: Sequence[str], denovo_ids: Sequence[str]) -> List[str]

Combined transcript id list for the hybrid reference: reference ids untouched, de-novo ids tagged. Asserts (via the contract) that the namespaces don't collide.

Source code in pydeepmapper/hybrid_assembly.py
def merge_reference_and_denovo(reference_ids: Sequence[str],
                               denovo_ids: Sequence[str]) -> List[str]:
    """Combined transcript id list for the hybrid reference: reference ids untouched,
    de-novo ids tagged. Asserts (via the contract) that the namespaces don't collide."""
    return list(reference_ids) + tag_denovo(denovo_ids)

build_sc_table

build_sc_table(per_sample_counts: Mapping[str, Mapping[str, float]])

Assemble the expression "sc table" (transcripts × samples) from per-sample transcript->count maps (e.g. one Salmon quant.sf per sample).

Returns a pandas DataFrame indexed by transcript, one column per sample (column order = insertion order of per_sample_counts), missing transcripts filled 0. An extra boolean column is NOT added here (keep the matrix numeric); use :func:is_denovo on the index.

Source code in pydeepmapper/hybrid_assembly.py
def build_sc_table(per_sample_counts: Mapping[str, Mapping[str, float]]):
    """Assemble the expression "sc table" (transcripts × samples) from per-sample
    transcript->count maps (e.g. one Salmon ``quant.sf`` per sample).

    Returns a pandas DataFrame indexed by transcript, one column per sample (column order =
    insertion order of ``per_sample_counts``), missing transcripts filled 0. An extra boolean
    column is NOT added here (keep the matrix numeric); use :func:`is_denovo` on the index.
    """
    import pandas as pd
    cols = {s: pd.Series(counts, dtype="float64") for s, counts in per_sample_counts.items()}
    mat = pd.DataFrame(cols).fillna(0.0)
    mat = mat[list(per_sample_counts.keys())]          # preserve sample order
    return mat

denovo_fraction

denovo_fraction(transcript_ids: Sequence[str]) -> float

Fraction of ids that are de-novo, a quick diagnostic of how much non-documented signal the hybrid step recovered.

Source code in pydeepmapper/hybrid_assembly.py
def denovo_fraction(transcript_ids: Sequence[str]) -> float:
    """Fraction of ids that are de-novo, a quick diagnostic of how much non-documented
    signal the hybrid step recovered."""
    ids = list(transcript_ids)
    return sum(is_denovo(t) for t in ids) / len(ids) if ids else 0.0

align_capture_unaligned

align_capture_unaligned(reads_fastq: str, hisat2_index: str, out_dir: str, strandness: str = 'R') -> str

Align reads_fastq to the reference with HISAT2 and capture the UNALIGNED reads, the non-documented fraction. Returns the path to the unaligned FASTA.

Source code in pydeepmapper/hybrid_assembly.py
def align_capture_unaligned(reads_fastq: str, hisat2_index: str, out_dir: str,
                            strandness: str = "R") -> str:
    """Align ``reads_fastq`` to the reference with HISAT2 and capture the UNALIGNED reads, the non-documented fraction. Returns the path to the unaligned FASTA."""
    _require("hisat2")
    os.makedirs(out_dir, exist_ok=True)
    base = os.path.splitext(os.path.basename(reads_fastq))[0]
    un_fastq = os.path.join(out_dir, f"unaligned_{base}.fastq")
    sam = os.path.join(out_dir, f"aligned_{base}.sam")
    subprocess.run(["hisat2", "-x", hisat2_index, "-U", reads_fastq, "--rna-strandness",
                    strandness, "-S", sam, "--un", un_fastq], check=True)
    un_fasta = os.path.join(out_dir, f"unaligned_{base}.fasta")
    _fastq_to_fasta(un_fastq, un_fasta)
    return un_fasta

denovo_assemble

denovo_assemble(pooled_unaligned_fasta: str, out_dir: str, max_memory: str = '50G', cpu: int = 16) -> str

Trinity de-novo assembly of the pooled unaligned reads. Returns the path to the DENOVO-tagged FASTA (ready to concatenate into the hybrid reference).

Source code in pydeepmapper/hybrid_assembly.py
def denovo_assemble(pooled_unaligned_fasta: str, out_dir: str, max_memory: str = "50G",
                    cpu: int = 16) -> str:
    """Trinity de-novo assembly of the pooled unaligned reads. Returns the path to the
    DENOVO-tagged FASTA (ready to concatenate into the hybrid reference)."""
    _require("Trinity")
    subprocess.run(["Trinity", "--seqType", "fa", "--single", pooled_unaligned_fasta,
                    "--max_memory", max_memory, "--CPU", str(cpu), "--output", out_dir],
                   check=True)
    assembled = out_dir + ".Trinity.fasta" if not out_dir.endswith(".Trinity.fasta") else out_dir
    coded = os.path.join(os.path.dirname(out_dir) or ".", "denovo_coded.fasta")
    _prefix_fasta_ids(assembled, coded, DENOVO_PREFIX)
    return coded

build_hybrid_reference

build_hybrid_reference(reference_fasta: str, denovo_fasta: str, out_fasta: str, salmon_index_dir: str, kmer: int = 15) -> str

Concatenate reference + DENOVO into the hybrid reference and build a Salmon index. Returns the index directory.

Source code in pydeepmapper/hybrid_assembly.py
def build_hybrid_reference(reference_fasta: str, denovo_fasta: str, out_fasta: str,
                           salmon_index_dir: str, kmer: int = 15) -> str:
    """Concatenate reference + DENOVO into the hybrid reference and build a Salmon index.
    Returns the index directory."""
    _require("salmon")
    with open(out_fasta, "wb") as out:
        for src in (reference_fasta, denovo_fasta):
            with open(src, "rb") as f:
                shutil.copyfileobj(f, out)
    subprocess.run(["salmon", "index", "-t", out_fasta, "-i", salmon_index_dir,
                    "-k", str(kmer)], check=True)
    return salmon_index_dir

quantify

quantify(reads_by_sample: Mapping[str, str], salmon_index_dir: str, out_dir: str, threads: int = 16) -> Dict[str, str]

Salmon quant each sample against the hybrid index. Returns {sample: quant.sf path}.

Source code in pydeepmapper/hybrid_assembly.py
def quantify(reads_by_sample: Mapping[str, str], salmon_index_dir: str, out_dir: str,
             threads: int = 16) -> Dict[str, str]:
    """Salmon quant each sample against the hybrid index. Returns {sample: quant.sf path}."""
    _require("salmon")
    out = {}
    for sample, fastq in reads_by_sample.items():
        qd = os.path.join(out_dir, f"quant_{sample}")
        subprocess.run(["salmon", "quant", "-i", salmon_index_dir, "-l", "A", "-r", fastq,
                        "-p", str(threads), "--minAssignedFrags", "1", "-o", qd], check=True)
        out[sample] = os.path.join(qd, "quant.sf")
    return out

read_quant

read_quant(quant_sf: str) -> Dict[str, float]

Read a Salmon quant.sf into {transcript: NumReads}.

Source code in pydeepmapper/hybrid_assembly.py
def read_quant(quant_sf: str) -> Dict[str, float]:
    """Read a Salmon ``quant.sf`` into {transcript: NumReads}."""
    import pandas as pd
    df = pd.read_csv(quant_sf, sep="\t", usecols=["Name", "NumReads"])
    return dict(zip(df["Name"], df["NumReads"]))

run_hybrid_assembly

run_hybrid_assembly(reads_by_sample: Mapping[str, str], hisat2_index: str, reference_fasta: str, work_dir: str) -> 'object'

End-to-end orchestration → the expression sc table (transcripts × samples), with the recovered DENOVO transcripts included. Each step is idempotent enough to resume.

Source code in pydeepmapper/hybrid_assembly.py
def run_hybrid_assembly(reads_by_sample: Mapping[str, str], hisat2_index: str,
                        reference_fasta: str, work_dir: str) -> "object":
    """End-to-end orchestration → the expression sc table (transcripts × samples), with the
    recovered DENOVO transcripts included. Each step is idempotent enough to resume."""
    os.makedirs(work_dir, exist_ok=True)
    un_dir = os.path.join(work_dir, "denovo")
    unaligned = [align_capture_unaligned(fq, hisat2_index, un_dir)
                 for fq in reads_by_sample.values()]
    pooled = os.path.join(un_dir, "pooled_unaligned.fasta")
    with open(pooled, "wb") as out:
        for u in unaligned:
            with open(u, "rb") as f:
                shutil.copyfileobj(f, out)
    denovo = denovo_assemble(pooled, os.path.join(work_dir, "trinity_denovo"))
    index = build_hybrid_reference(reference_fasta, denovo,
                                   os.path.join(work_dir, "combined.fasta"),
                                   os.path.join(work_dir, "combined_index"))
    quants = quantify(reads_by_sample, index, un_dir)
    table = build_sc_table({s: read_quant(q) for s, q in quants.items()})
    return table