Source code for idtrack._harmonize_features

#!/usr/bin/env python3

import gc
import logging
import os
import pickle  # noqa: S403
import warnings
from functools import cached_property
from typing import Literal, Optional, Union

import anndata as ad
import numpy as np
import pandas as pd
import tqdm
from scipy.sparse import csr_matrix

import idtrack
from idtrack import DB
from idtrack._db import MISSING_VALUES


[docs] class HarmonizeFeatures: """Harmonize gene/feature identifiers across multiple single-cell expression datasets. This manager streamlines the otherwise error-prone task of bringing heterogeneous gene identifiers (Ensembl IDs, gene symbols, etc.) into a single, version-controlled namespace before integrated downstream analysis. Under the hood it leverages :py:class:`idtrack.api.API` to resolve identifier mappings through a pre-computed Ensembl graph, handles one-to-many and one-to-zero conversions, logs any ambiguous or inconsistent matches, and finally produces harmonised :py:class:`anndata.AnnData` objects ready for comparative or joint analysis. The public workflow is intentionally simple: * :py:meth:`feature_harmonizer` — convert a single dataset and return the filtered ``AnnData`` plus before/after feature counts. * :py:meth:`unify_multiple_anndatas` — apply harmonisation across *all* supplied datasets and return an integrated object. * :py:meth:`get_idtrack_matchings_for_all_datasets` — inspect the raw IDTrack matchings used. Instances keep several diagnostic attributes (e.g. ``removed_conversion_failed_identifiers``, ``multiple_ensembl_dict``) so that users can audit every decision that removed or altered a feature. Args: project_name (str): Human-readable label used in log messages and derived output file names. data_h5ad_dict (dict[str, str]): Mapping *dataset_alias → absolute .h5ad path* of the source single-cell expression matrices. project_local_repository (str): Writable directory where harmonised outputs, logs, and temporary artefacts will be stored. idtrack_local_repository (str): Local clone or cache directory understood by :py:class:`idtrack.api.API`; used to read the pre-built identifier graph. target_ensembl_release (int): Ensembl release that all identifiers will be converted *to*. Must be ≤ *graph_last_ensembl_release*. final_database (str): Canonical namespace kept after conversion (e.g. ``"HGNC Symbol"``). Defaults to ``"HGNC Symbol"``. organism_name (str): Ensembl-style organism short name (e.g. ``"homo_sapiens"``). Defaults to ``"homo_sapiens"``. graph_last_ensembl_release (int): Highest release present in the on-disk IDTrack graph. Defaults to ``114``. verbose_level (Literal[0, 1, 2]): Logging verbosity; 0 = errors only, 1 = warnings, 2 = info. Defaults to ``2``. debugging_variables (bool): Retain heavy intermediate structures for post-mortem inspection. Defaults to ``False``. converted_id_column (str): Column name used to store converted identifiers inside the resulting ``AnnData.var`` DataFrame. Defaults to ``"converted_id"``. Attributes: idt (idtrack.api.API): Lazily initialised IDTrack interface used for all identifier look-ups. multiple_ensembl_dict (dict[str, list[str]]): Map of collapsed IDs to *all* Ensembl IDs that were originally associated with the same target identifier. removed_conversion_failed_identifiers (dict[str, set[str]]): Features that failed conversion and were dropped from each dataset. kept_conversion_failed_identifiers (dict[str, set[str]]): Non-convertible features kept because they were consistently non-convertible across *all* datasets. removed_inconsistent_identifier_matching (dict[str, set[str]]): Features whose mappings disagreed between datasets and were therefore removed for consistency. """ nonmatching_but_consistent_suffix = "" # Internal column name for matched Ensembl gene IDs. # Using a distinct name avoids conflicts when final_database="ensembl_gene". _ENSEMBL_GENE_COLUMN = "matching_ensembl_gene"
[docs] @staticmethod def _get_column_as_series(df: pd.DataFrame, column: str) -> pd.Series: """Safely extract a single column from a DataFrame, always returning a Series. When a DataFrame has duplicate column names, ``df[column]`` returns a DataFrame instead of a Series. This method handles such edge cases by selecting the first matching column when duplicates exist. Args: df (pd.DataFrame): The DataFrame to extract from. column (str): The column name to extract. Returns: pd.Series: The column data as a Series. """ result = df[column] if isinstance(result, pd.DataFrame): # Duplicate column names exist; take the first one and warn result = result.iloc[:, 0] return result
def __init__( self, project_name: str, data_h5ad_dict: dict[str, str], # custom dataset name to dataset absolute path project_local_repository: str, idtrack_local_repository: str, target_ensembl_release: int, final_database: str = "HGNC Symbol", organism_name: str = "homo_sapiens", graph_last_ensembl_release: int = 114, verbose_level: Literal[0, 1, 2] = 2, debugging_variables: bool = False, converted_id_column: str = "converted_id", ): """Instantiate the harmoniser and perform lightweight validation. The constructor merely *prepares* the harmonisation context: it validates input paths, configures logging, and primes IDTrack. Heavy work—graph initialisation, identifier matching, gene-symbol resolution—happens lazily when the first harmonisation method is called. Args: project_name (str): See :py:meth:`HarmonizeFeatures`. data_h5ad_dict (dict[str, str]): See :py:meth:`HarmonizeFeatures`. project_local_repository (str): See :py:meth:`HarmonizeFeatures`. idtrack_local_repository (str): See :py:meth:`HarmonizeFeatures`. target_ensembl_release (int): See :py:meth:`HarmonizeFeatures`. final_database (str): See :py:meth:`HarmonizeFeatures`. organism_name (str): See :py:meth:`HarmonizeFeatures`. graph_last_ensembl_release (int): See :py:meth:`HarmonizeFeatures`. verbose_level (Literal[0, 1, 2]): See :py:meth:`HarmonizeFeatures`. debugging_variables (bool): See :py:meth:`HarmonizeFeatures`. converted_id_column (str): See :py:meth:`HarmonizeFeatures`. Raises: ValueError: If *verbose_level* is not 0, 1, or 2. """ self.project_name = project_name self.project_local_repository = project_local_repository self.data_h5ad_dict = data_h5ad_dict self.converted_id_column = converted_id_column self.idtrack_local_repository = idtrack_local_repository self.target_ensembl_release = target_ensembl_release self.graph_last_ensembl_release = graph_last_ensembl_release self.final_database = final_database self.organism_name = organism_name self.n_datasets = len(self.data_h5ad_dict) self.debugging_variables = debugging_variables if verbose_level == 0: _logging = logging.ERROR elif verbose_level == 1: _logging = logging.WARNING elif verbose_level == 2: _logging = logging.INFO elif verbose_level == 3: _logging = logging.DEBUG else: raise ValueError(f"verbose_level must be 0, 1, 2, or 3, got {verbose_level!r}.") self.verbose_level = verbose_level self.idt = idtrack.API(local_repository=self.idtrack_local_repository) self.log = logging.getLogger("harmonize_features") self.idt.configure_logger(_logging) self.idt_initialized = False self.dict_n_to_1: dict[str, list[str]] = dict() self.multiple_ensembl_list: list[list[tuple[str, str]]] = list() # Important dictionaries for debugging: # If there is n-to-1 matching in a database and if one of them is query=matching, keep it and remove others # for a given group (n), below shows the chosen id (due to its identical as explained), the group # and the datasets that has this group self.dict_n_to_1_with_query: dict[str, dict[tuple, list]] = dict() # this dictionary is ideal to find out the removed ids self.dict_n_to_1_with_query_reverse: dict[str, dict[str, list[str]]] = dict() # this is a dictary showing group-to-dataset self.dict_n_to_1_without_query: dict[tuple, list] = dict() # also: # cached_property: self.dict_1_to_not_1 # to include into integrated anndata object. # sometimes, final database (e.g. HGNC) id is associated with multiple ensembl ids, the algorithm chooses # one of them and creates a list. here you can find the actual possible ensembl ids. self.multiple_ensembl_dict: dict[str, list[str]] = dict() # removed ids from the anndata by `harmonizer` method. the list and the in which datasets it is deleted is kept. self.removed_conversion_failed_identifiers: dict[str, dict[str, list[str]]] = dict() # these ids could be removed but kept as they are consistent across all provided datasets. self.kept_conversion_failed_identifiers: dict[str, dict[str, list[str]]] = dict() # inconsistent matching self.removed_inconsistent_identifier_matching: dict[str, dict[str, list[str]]] = dict() # removed due to multiple source IDs mapping to same final_database (ensembl_gene override duplicates) self.removed_final_database_duplicates: dict[str, dict[str, list[str]]] = dict() self._initialize()
[docs] def _initialize(self) -> None: """Populate diagnostic structures for failed or ambiguous identifier conversions. Called once by :py:meth:`HarmonizeFeatures.__init__`, this routine scans every input dataset and updates several reporting attributes (for example :py:attr:`removed_conversion_failed_identifiers` or :py:attr:`removed_inconsistent_identifier_matching`). It also derives :py:attr:`multiple_ensembl_dict`, a reverse map of ambiguous *Ensembl ID → source identifiers*, enabling downstream inspection of one-to-many relationships. Returns ``None``: All results are stored on *self* for later inspection. Internally the method: 1. Extracts raw feature identifiers from each :py:class:`anndata.AnnData` file. 2. Classifies identifiers into failure or inconsistency categories. 3. Records per-dataset membership via :py:meth:`reporter_dict_creator`. 4. Builds the ``multiple_ensembl_list`` used by :py:meth:`HarmonizeFeatures.create_multiple_ensembl_dict`. 5. Touches :py:meth:`datataset_conversion_dataframe_issues` so the cached-property is built eagerly. """ for dataset_name, dataset_path in self.data_h5ad_dict.items(): gene_list = self.extract_source_identifiers_from_anndata(dataset_path) self.reporter_dict_creator( dataset_name=dataset_name, the_dict=self.removed_conversion_failed_identifiers, the_set=set(gene_list) & self.conversion_failed_identifiers, ) self.reporter_dict_creator( dataset_name=dataset_name, the_dict=self.kept_conversion_failed_identifiers, the_set=set(gene_list) & self.conversion_failed_but_consistent_identifiers, ) self.reporter_dict_creator( dataset_name=dataset_name, the_dict=self.removed_inconsistent_identifier_matching, the_set=set(gene_list) & self.datataset_conversion_dataframe_issues["gene_names_inconsistency"], ) self.reporter_dict_creator( dataset_name=dataset_name, the_dict=self.removed_final_database_duplicates, the_set=set(gene_list) & self.datataset_conversion_dataframe_issues["final_database_duplicate_source_ids"], ) for the_id in gene_list: conversion = self.unified_matching_dict[the_id]["matching"]["last_node"] if the_id not in self.conversion_failed_but_consistent_identifiers: self.multiple_ensembl_list.append(conversion) self.multiple_ensembl_dict = self.create_multiple_ensembl_dict() self.datataset_conversion_dataframe_issues
[docs] def _initialize_idt(self) -> None: """Instantiate the IDTrack interface on first use. The public API defers expensive graph loading until it is actually required. This helper therefore checks whether :py:attr:`idt` is ``None`` and, if so, loads the on-disk identifier graph described by *idtrack_local_repository* and *graph_last_ensembl_release*, then configures release filters so that subsequent look-ups always target *target_ensembl_release*. Re-invocations are no-ops. Returns ``None``: The :py:attr:`idt` attribute is populated and ready for queries. """ if not self.idt_initialized: organism_formal_name, _ = self.idt.resolve_organism(self.organism_name) self.idt.build_graph(organism_name=organism_formal_name, snapshot_release=self.graph_last_ensembl_release) self.idt.calculate_graph_caches() self.idt_initialized = True
[docs] def create_multiple_ensembl_dict(self) -> dict[str, list[str]]: """Reverse map ambiguous Ensembl target IDs to their originating source identifiers. During scanning, :py:meth:`_initialize` collects every *(source_id, target_ensembl_id)* pair that falls outside the *consistent one-to-one* category into ``multiple_ensembl_list``. This helper consolidates that list into a dictionary keyed by ``target_ensembl_id`` with a **sorted list** of associated ``source_id`` values, allowing auditors to quickly discover all inputs that collapsed onto the same Ensembl record. Returns: dict[str, list[str]]: ``{target_ensembl_id: [source_id₁, source_id₂, …]}`` with duplicates removed and values sorted alphanumerically. """ r: dict[str, set[str]] = dict() for i in self.multiple_ensembl_list: for n, m in i: if m not in r: r[m] = set() r[m].add(n) return {k: sorted(v) for k, v in r.items()}
[docs] def reporter_dict_creator_helper_reason_finder(self, the_id: str) -> str: """Infer why a particular identifier failed or produced a non-one-to-one conversion. The algorithm inspects :py:attr:`unified_matching_dict` and categorises *the_id* into one or more mutually non-exclusive reasons: * ``"n-to-1"`` — The identifier was part of an *n → 1* collapse within *at least one* dataset. * ``"1-to-0"`` — No target identifier was returned (conversion failure). * ``"1-to-n"`` — The conversion yielded multiple targets (ambiguous mapping). The final label is a single string where multiple reasons are concatenated with underscores, e.g. ``"1-to-0_1-to-n"``. Args: the_id (str): Source identifier whose conversion outcome needs explanation. Returns: str: Underscore-delimited reason string describing the failure or ambiguity class. """ reason = [] matching = self.unified_matching_dict[the_id] len_targets = len(matching["matching"]["target_id"]) if the_id in self.dict_n_to_1: # in one of the datasets during the integration, this ID was a part of n-to-1. reason.append("n-to-1") if len_targets == 0: reason.append("1-to-0") if len_targets > 1: reason.append("1-to-n") return "_".join(reason)
[docs] def reporter_dict_creator( self, the_dict: dict[str, dict], the_set: set[str], dataset_name: str, ) -> None: """Update or create per-identifier diagnostic entries for a single dataset. Each identifier in *the_set* is ensured to exist as a key inside *the_dict*. The entry's ``"reason"`` field is generated exactly once using :py:meth:`reporter_dict_creator_helper_reason_finder`; its ``"datasets_containing"`` list is then appended with *dataset_name*. This allows quick aggregation of “where did this problematic identifier occur?” across all datasets. Returns ``None``: *the_dict* is modified in-place. Args: the_dict (dict[str, dict]): Target dictionary that stores diagnostic metadata. Keys are source identifiers; values have keys ``"reason"`` (``str``) and ``"datasets_containing"`` (``list[str]``). the_set (set[str]): Identifiers that belong to the diagnostic category represented by *the_dict*. dataset_name (str): Human-readable alias of the dataset currently being processed. """ for i in the_set: if i not in the_dict: the_dict[i] = {"reason": self.reporter_dict_creator_helper_reason_finder(i), "datasets_containing": []} the_dict[i]["datasets_containing"].append(dataset_name)
@cached_property def datataset_conversion_dataframe_issues(self) -> pd.DataFrame: """Aggregate conversion failures and ambiguities into a tidy diagnostic table. The cached DataFrame has one row per *source identifier* encountered across **all** datasets and the following columns: * ``dataset`` — Dataset alias that triggered the row (duplicates possible). * ``reason`` — Underscore-delimited label from :py:meth:`reporter_dict_creator_helper_reason_finder`. * ``target_identifier`` — The resolved identifier or ``NaN`` if conversion failed. * ``was_removed`` (bool) — Whether the feature was ultimately dropped from the dataset. This compact view is ideal for spreadsheet export or in-notebook inspection because it condenses the richer nested structures stored on the class into a flat, analysis-friendly format. Returns: pandas.DataFrame: Combined diagnostic table sorted lexicographically by *dataset* and *reason*. """ gene_list = set(self.unified_matching_dict.keys()) - self.conversion_failed_identifiers df = self.create_dataset_conversion_dataframe(gene_list=gene_list, initialization_run=True) # In Shiddar and Wang in HLCA datasets contains FAM231C, and FAM231B. Very weird genes, IDTrack fails: # ENSG00000268674.2 FAM231C FAM231C # ENSG00000268674.2 FAM231B FAM231B # Report and remove from all conversion lists. ensembl_col = self._ENSEMBL_GENE_COLUMN dfa = df.drop_duplicates(subset=[ensembl_col, self.final_database], keep="first") # Use safe column accessor to handle potential duplicate column names edge cases ensembl_gene_duplicated_mask = self._get_column_as_series(dfa, ensembl_col).duplicated(keep=False) dfa_duplicated_ensembl = dfa[ensembl_gene_duplicated_mask] _gene_names_inconsistency = set( self._get_column_as_series(dfa_duplicated_ensembl, self.final_database).unique() ) final_db_col = self._get_column_as_series(df, self.final_database) gene_names_inconsistency = set( self._get_column_as_series(df[final_db_col.isin(_gene_names_inconsistency)], "Query ID").unique() ) final_database_chosen_single_ensembl_dict = dict() # Track source IDs that should be removed because their ensembl_gene was not chosen # (they would cause duplicates after the ensembl_gene override) final_database_duplicate_source_ids = set() final_db_col_dfa = self._get_column_as_series(dfa, self.final_database) dfa_duplicated_final_db = dfa[final_db_col_dfa.duplicated(keep=False)] for final_final_database_gene_name, df_subset in dfa_duplicated_final_db.groupby(self.final_database): # Due to the path, idtrack may choose different ensembl_gene # for corresponding final_database (e.g. HGNC). Here, make a rule for each final_database id. # You can see all possible ensembl ids in ensembl_gene_col = self._get_column_as_series(df_subset, ensembl_col) chosen_ensembl_id = sorted(ensembl_gene_col)[0] # choose first one final_database_chosen_single_ensembl_dict[final_final_database_gene_name] = chosen_ensembl_id # Mark source IDs with non-chosen ensembl_genes for removal to prevent duplicates non_chosen_rows = df_subset[ensembl_gene_col != chosen_ensembl_id] final_database_duplicate_source_ids.update(self._get_column_as_series(non_chosen_rows, "Query ID").tolist()) return { "gene_names_inconsistency": gene_names_inconsistency, "final_database_chosen_single_ensembl_dict": final_database_chosen_single_ensembl_dict, "final_database_duplicate_source_ids": final_database_duplicate_source_ids, }
[docs] def create_dataset_conversion_dataframe( self, gene_list: Union[list[str], pd.Index], initialization_run: bool ) -> pd.DataFrame: """Build a two-column mapping table for a single dataset's feature identifiers. The routine transforms every source identifier in *gene_list* into the target namespace defined by ``self.final_database`` and Ensembl gene IDs. The resulting convertible subset is written into a new :py:class:`pandas.DataFrame` with three columns—``_ENSEMBL_GENE_COLUMN``, ``self.final_database``, and ``"Query ID"``—while problematic identifiers are annotated or filtered according to the rules established during :py:meth:`_initialize`. When called by :py:meth:`_initialize` (*initialization_run* ``True``), the method writes provisional mappings without inspecting post-initialisation overrides. In subsequent calls (*initialization_run* ``False``) it resolves single-Ensembl ambiguities via ``self.datataset_conversion_dataframe_issues["final_database_chosen_single_ensembl_dict"]`` to guarantee a one-to-one relation between indices and feature rows. Args: gene_list (Union[list[str], pd.Index]): Ordered collection of source identifiers to convert for the current dataset. initialization_run (bool): ``True`` if invoked from :py:meth:`_initialize`; disables the single-Ensembl disambiguation step applied in later passes. Returns: pandas.DataFrame: Mapping table ready to become ``adata.var``. Columns are ``_ENSEMBL_GENE_COLUMN``, ``self.final_database``, and the original ``"Query ID"`` for traceability. Raises: AssertionError: If diagnostic sets such as :py:attr:`conversion_failed_identifiers` were not populated—indicating an incorrect call order—or if unexpected duplicate target IDs remain after processing. ValueError: When malformed conversion entry. """ # Ensure unified_matching_dict is populated (verifies proper initialization order). # The cached properties conversion_failed_identifiers and conversion_failed_but_consistent_identifiers # may legitimately be empty when all conversions succeed or when failures are all consistent/inconsistent. if len(self.unified_matching_dict) == 0: raise AssertionError("Possible function call order issue: unified_matching_dict is empty!") new_var_list = list() for the_id in gene_list: conversion = self.unified_matching_dict[the_id]["matching"]["last_node"] if the_id in self.conversion_failed_but_consistent_identifiers: modified_id = (f"{the_id}{HarmonizeFeatures.nonmatching_but_consistent_suffix}", the_id) new_var_list.append(list(modified_id) + [the_id]) # 1-to-n's and 1-to-0s are either removed or kept as consistent ones. # if there is multiple ensembl id matching, then len(conversion) != 1. # it keeps the first one for now, the possible lists are saved self.multiple_ensembl_list in `initialize` method elif initialization_run: first_match = conversion[0] # Validate the conversion tuple structure to prevent DataFrame column misalignment if not isinstance(first_match, (tuple, list)) or len(first_match) != 2: raise ValueError( f"Malformed conversion entry for identifier {the_id!r}: expected a 2-tuple " f"(ensembl_gene, {self.final_database}), got {first_match!r} with " f"type={type(first_match).__name__}, len={len(first_match) if hasattr(first_match, '__len__') else 'N/A'}" ) ensembl_gene, final_db_value = first_match new_var_list.append([ensembl_gene, final_db_value, the_id]) else: first_match = conversion[0] # Validate the conversion tuple structure if not isinstance(first_match, (tuple, list)) or len(first_match) != 2: raise ValueError( f"Malformed conversion entry for identifier {the_id!r}: expected a 2-tuple " f"(ensembl_gene, {self.final_database}), got {first_match!r} with " f"type={type(first_match).__name__}, len={len(first_match) if hasattr(first_match, '__len__') else 'N/A'}" ) ensembl_gene, final_db_value = first_match if ( final_db_value in self.datataset_conversion_dataframe_issues["final_database_chosen_single_ensembl_dict"] ): ensembl_gene = self.datataset_conversion_dataframe_issues[ "final_database_chosen_single_ensembl_dict" ][final_db_value] new_var_list.append([ensembl_gene, final_db_value, the_id]) df = pd.DataFrame(new_var_list, columns=[self._ENSEMBL_GENE_COLUMN, self.final_database, "Query ID"]) return df
[docs] def feature_harmonizer(self, dataset_name: str) -> tuple[ad.AnnData, int, int]: """Convert one dataset's feature space into the unified target namespace. This convenience wrapper reads a single ``.h5ad`` file, removes identifiers deemed unusable during :py:meth:`_initialize`, applies the conversion mapping from :py:meth:`create_dataset_conversion_dataframe`, and returns a *new* :py:class:`anndata.AnnData` object with harmonised features. The function is intentionally side-effect-free: it never alters the source file, and large temporary matrices are deleted immediately to minimise memory usage. Args: dataset_name (str): Key from :py:attr:`data_h5ad_dict` identifying which dataset to load and harmonise. Returns: tuple: * **resulting_adata** (:py:class:`anndata.AnnData`) - Dataset whose ``var`` now contains ``_ENSEMBL_GENE_COLUMN`` (matching Ensembl gene ID) as index and ``self.final_database`` as a column. * **t0** (int) - Number of features *before* filtering and harmonisation. * **t1** (int) - Number of features *after* the procedure (i.e., retained in *resulting_adata*). Raises: AssertionError: If duplicate Ensembl or target-database IDs slip past the conversion checks, which would break one-to-one mapping assumptions. """ adata = ad.read_h5ad(self.data_h5ad_dict[dataset_name]) t0 = adata.n_vars remove_bool = ~( adata.var.index.isin(self.conversion_failed_identifiers) | adata.var.index.isin(self.datataset_conversion_dataframe_issues["gene_names_inconsistency"]) | adata.var.index.isin(self.datataset_conversion_dataframe_issues["final_database_duplicate_source_ids"]) ) adata = adata[:, remove_bool] df = self.create_dataset_conversion_dataframe(gene_list=adata.var.index, initialization_run=False) # Use safe column accessor to check for duplicates ensembl_col = self._ENSEMBL_GENE_COLUMN ensembl_gene_col = self._get_column_as_series(df, ensembl_col) final_db_col = self._get_column_as_series(df, self.final_database) bool_remove = ensembl_gene_col.duplicated(keep=False).any() or final_db_col.duplicated(keep=False).any() if bool_remove: raise AssertionError("Unexpected duplicated entry!") df = df.set_index(ensembl_col) resulting_adata = ad.AnnData(X=adata.X, obs=adata.obs, var=df) # dtype=adata.X.dtype, t1 = adata.n_vars del adata gc.collect() return resulting_adata, t0, t1
@cached_property def dict_1_to_not_1(self) -> dict[str, set[str]]: """Collect identifiers involved in one-to-many or one-to-zero conversions. This helper scans :py:attr:`unified_matching_dict` and extracts every *source identifier* whose conversion to the target namespace is **not** a strict one-to-one mapping. Two situations are considered problematic: * **1 → 0 (conversion failure)** — no target identifier could be resolved. * **1 → n (ambiguous hit)** — multiple targets share the best score, preventing an unambiguous choice. The resulting dictionary is later consumed by :py:meth:`reporter_dict_creator` to populate the diagnostic attributes exposed to users and by :py:meth:`create_dataset_conversion_dataframe` to decide which features should be dropped or flagged in each :py:class:`anndata.AnnData` object. Returns: dict[str, set[str]]: ``{problem_class: {source_id₁, source_id₂, …}}`` where *problem_class* is either ``"1-to-0"`` or ``"1-to-n"``. """ result = dict() for query, value in self.unified_matching_dict.items(): if len(value["matching"]["target_id"]) != 1: result[query] = value["datasets_containing"] return result @cached_property def conversion_failed_identifiers(self) -> set[str]: """Return identifiers that could not be converted in at least one dataset. The property wraps :py:meth:`dict_1_to_not_1` and filters its ``"1-to-0"`` category so that downstream code can quickly query *irrecoverable* failures without iterating over the entire diagnostic structure. Returns: set[str]: Identifiers that failed conversion in at least one dataset or have inconsistent mappings (1-to-0, 1-to-n, or n-to-1 where not all datasets share the same mapping). """ result = set() for query, datasets_containing in self.dict_1_to_not_1.items(): # for 1-to-n and 1-to-0, if it is not shared across all the datasets, add remove list. if len(datasets_containing) != self.n_datasets: result.add(query) for query, datasets_containing_2 in self.dict_n_to_1.items(): # for one gene in n (among n-to-1), if it is not shared across all the datasets, add remove list. if len(datasets_containing_2) != self.n_datasets: result.add(query) return result @cached_property def conversion_failed_but_consistent_identifiers(self) -> set[str]: """Identify non-convertible identifiers that are consistently absent across *all* datasets. An identifier that fails conversion in *every* dataset can be retained (or at least logged once) without jeopardising dataset comparability. This property computes the set intersection of :py:attr:`conversion_failed_identifiers` across datasets and makes the result available for selective retention or downstream visualisation. Returns: set[str]: Identifiers that were never convertible but appeared in every dataset examined. """ _result1 = {i for i, _ in self.dict_n_to_1.items() if i not in self.conversion_failed_identifiers} _result2 = {i for i, _ in self.dict_1_to_not_1.items() if i not in self.conversion_failed_identifiers} result = set.union(_result1, _result2) return result @cached_property def unified_matching_dict(self): """Expose the full source-to-target identifier mapping produced by IDTrack. The dictionary is created during :py:meth:`_initialize` when the IDTrack graph is first queried. Keys are *source identifiers* (as found in input files); values are **all** candidate target IDs returned by the graph query, ordered by decreasing score. A value may therefore be * a single-element list (unambiguous one-to-one), * a multi-element list (ambiguous one-to-n), or * an empty list (1-to-0 conversion failure). Public access to this attribute enables advanced users to perform their own diagnostics or to reproduce the algorithm's decisions outside the class. Returns: dict[str, list[str]]: Mapping ``{source_id: [target_id₁, target_id₂, …]}`` in the order delivered by the IDTrack query. """ matching_dict = self.get_idtrack_matchings_for_all_datasets() unified_dict = dict() for dataset_name, matching in matching_dict.items(): for m in matching: if m["query_id"] not in unified_dict: unified_dict[m["query_id"]] = {"matching": m, "datasets_containing": []} unified_dict[m["query_id"]]["datasets_containing"].append(dataset_name) return unified_dict
[docs] def get_idtrack_matchings_for_all_datasets(self) -> dict[str, pd.DataFrame]: """Return raw ID-Track matchings for every dataset in the project. This helper exposes the unfiltered mapping tables produced by ID-Track so that users can inspect exactly how each *source identifier* was converted (or failed to convert) in every individual dataset. Internally it triggers :py:meth:`run_idtrack_for_single_dataset` for any dataset that has not yet been processed, caches the resulting tables in memory, and then assembles a ``{dataset_name: dataframe}`` dictionary whose keys align one-to-one with :py:data:`data_h5ad_dict`. Each returned :py:class:`pandas.DataFrame` includes at least the following columns: ``source_id``, ``target_id``, ``conversion_status``, ``reason``, and any custom metadata injected by :py:class:`idtrack.api.API`. Returns: dict[str, pandas.DataFrame]: Mapping of dataset alias to its full, row-level ID-Track matching table. The dictionary order follows the insertion order of :py:data:`data_h5ad_dict`. """ result_dataset = dict() for dataset_name, dataset_path in self.data_h5ad_dict.items(): dataset_pickle = os.path.join( self.project_local_repository, f"idtrack_result_{self.project_name}_{dataset_name}.pickle" ) switch_inconsistency = None if os.path.isfile(dataset_pickle) and os.access(dataset_pickle, os.R_OK): with open(dataset_pickle, "rb") as handle: matching_list = pickle.load(handle) # noqa: S301 _saved_id_list = sorted([i["query_id"] for i in matching_list]) _dataset_id_list = sorted(self.extract_source_identifiers_from_anndata(dataset_path=dataset_path)) # if the id lists are different than the provided one. switch_inconsistency = _saved_id_list != _dataset_id_list if switch_inconsistency is not False: if switch_inconsistency is None and self.verbose_level > 2: self.log.debug(f"Pickle not found: `{dataset_name}`. Calculating IDTrack matchings.") elif switch_inconsistency is True and self.verbose_level > 2: self.log.debug( "Inconsistent IDs with pickle object and the " f"dataset: `{dataset_name}`. Calculating IDTrack matchings." ) matching_list = self.run_idtrack_for_single_dataset(dataset_name, dataset_path) with open(dataset_pickle, "wb") as handle: pickle.dump(matching_list, handle) # noqa: S301 result_dataset[dataset_name] = matching_list if self.verbose_level > 1: binned_conversions = self.idt.classify_multiple_conversion(matching_list) self.log.info(f"{dataset_name}") self.idt.print_binned_conversion(binned_conversions) # Do not allow n_to_1 within a certain dataset. self.n_to_1_within_individual_dataset(dataset_name=dataset_name, dataset_matching_list=matching_list) return result_dataset
[docs] def extract_source_identifiers_from_anndata(self, dataset_path: str) -> list[str]: """Load an ``.h5ad`` file and harvest the raw feature identifiers. To prepare inputs for ID-Track, this routine opens the single-cell expression matrix at *dataset_path*, reads the ``.var`` DataFrame, and extracts either the ``"gene_id"`` field (if present) or the index itself as the *source identifier*. Identifiers are returned in file order so that downstream procedures can preserve the original feature ordering when reconstructing matrices. Args: dataset_path (str): Absolute or project-relative path to an ``.h5ad`` file containing a valid :py:class:`anndata.AnnData` object. Returns: list[str]: Ordered list of identifier strings exactly as they appear in the source file. """ adata = ad.read_h5ad(dataset_path, backed="r") result = list(adata.var_names) del adata gc.collect() return result
[docs] def run_idtrack_for_single_dataset(self, dataset_name: str, dataset_path: str) -> pd.DataFrame: """Convert identifiers for one dataset and cache the raw ID-Track output. Given a dataset alias and its on-disk location, this method: 1. Calls :py:meth:`extract_source_identifiers_from_anndata` to obtain the feature list. 2. Feeds those identifiers to :py:class:`idtrack.api.API` and collects the per-feature match results. 3. Stores the resulting :py:class:`pandas.DataFrame` inside the :py:attr:`_idtrack_matchings_per_dataset` cache so repeated calls are O(1). 4. Updates :py:attr:`unified_matching_dict` so that cross-dataset diagnostics remain consistent. Users rarely call this directly—:py:meth:`get_idtrack_matchings_for_all_datasets` handles the orchestration—but it remains public for advanced, dataset-by-dataset debugging. Args: dataset_name (str): Human-friendly alias used as the key inside diagnostic dictionaries. dataset_path (str): Absolute or project-relative ``.h5ad`` path passed straight to :py:meth:`extract_source_identifiers_from_anndata`. Returns: pandas.DataFrame: Full ID-Track matching table for *dataset_name* with columns ``source_id``, ``target_id``, ``conversion_status``, and any extra metadata returned by the API. """ self._initialize_idt() gene_list = self.extract_source_identifiers_from_anndata(dataset_path) matching_list = self.idt.convert_identifier_multiple( gene_list, final_database=self.final_database, to_release=self.target_ensembl_release, pbar_prefix=dataset_name, verbose=self.verbose_level > 0, ) return matching_list
[docs] def n_to_1_within_individual_dataset(self, dataset_name: str, dataset_matching_list: list[dict]) -> None: """Detect n-to-1 collapses inside one dataset and populate diagnostic caches. In the ID-Track context *n-to-1* means **several source identifiers** (``query_id``) converging on the **same target identifier** (``matched_id``). Such collapses are problematic because they merge distinct features when building the harmonised expression matrix. This helper inspects the raw matching rows for a single dataset, discovers all many-to-one events (including those that passed through the *alternative target database*), and records the results in a family of per-project dictionaries so that later stages—merging, filtering, and reporting—can make informed decisions. The routine never returns a value; instead it mutates the following public attributes: * :py:attr:`dict_n_to_1` - ``{matched_id: [dataset₁, dataset₂, …]}`` listing every dataset where the collapse occurred. * :py:attr:`dict_n_to_1_with_query` - ``{matched_id: {(query_id₁,…): [dataset]}}`` for cases where the ``matched_id`` also appears in the collapsing query set. * :py:attr:`dict_n_to_1_with_query_reverse` - ``{query_id: {matched_id: [dataset]}}`` for a query-centric view. * :py:attr:`dict_n_to_1_without_query` - collapses where the target never appears in its own query set. Returns ``None``: All information is stored on the instance for subsequent pipeline stages. Args: dataset_name (str): Human-readable alias used throughout the project for *this* dataset. dataset_matching_list (list[dict]): Raw per-feature matchings returned by :py:class:`idtrack.api.API`. Each dictionary must provide at least the keys ``"query_id"``, ``"last_node"``, and ``"final_database"``. """ reverse_dict_target: dict[str, list[str]] = dict() reverse_dict_alternative_target: dict[str, list[str]] = dict() for matching_entry in dataset_matching_list: # to capture matching without alternative target database. # note that final database is `None` for 1-to-0, `ensembl_gene` for alternative target database. query_id = matching_entry["query_id"] if matching_entry["final_database"] == self.final_database: for _, j in matching_entry["last_node"]: # for loop if it is 1-to-n if j not in reverse_dict_target: reverse_dict_target[j] = [] reverse_dict_target[j].append(query_id) elif matching_entry["final_database"] == "ensembl_gene": for i, _ in matching_entry["last_node"]: # for loop if it is 1-to-n if i not in reverse_dict_alternative_target: reverse_dict_alternative_target[i] = [] reverse_dict_alternative_target[i].append(query_id) # simply get the ids that has 1-to-n in reverse orientation, so get n-to-1 set_n_to_1_in_dataset = set() for the_dict_of_interest in [reverse_dict_target, reverse_dict_alternative_target]: for matched_id, query_id_list in the_dict_of_interest.items(): if len(query_id_list) > 1: for query_id in query_id_list: # if there is query and mathed id are the same, exlude this query id from the n-to-1 set. # basically assign it to be the true matching of the conversion for integration tasks if query_id != matched_id: # and matching is 1:1. note that if matching is actually 1-to-n, # it is resolved by other parts of the code. set_n_to_1_in_dataset.add(query_id) # For debugging and reporting if self.debugging_variables: query_id_list_tuple = tuple(sorted(query_id_list)) if matched_id in query_id_list_tuple: if matched_id not in self.dict_n_to_1_with_query: self.dict_n_to_1_with_query[matched_id] = dict() if query_id_list_tuple not in self.dict_n_to_1_with_query[matched_id]: self.dict_n_to_1_with_query[matched_id][query_id_list_tuple] = list() self.dict_n_to_1_with_query[matched_id][query_id_list_tuple].append(dataset_name) for query_id in query_id_list_tuple: if query_id not in self.dict_n_to_1_with_query_reverse: self.dict_n_to_1_with_query_reverse[query_id] = dict() if matched_id not in self.dict_n_to_1_with_query_reverse[query_id]: self.dict_n_to_1_with_query_reverse[query_id][matched_id] = list() self.dict_n_to_1_with_query_reverse[query_id][matched_id].append(dataset_name) else: if query_id_list_tuple not in self.dict_n_to_1_without_query: self.dict_n_to_1_without_query[query_id_list_tuple] = list() self.dict_n_to_1_without_query[query_id_list_tuple].append(dataset_name) for nto1 in set_n_to_1_in_dataset: if nto1 not in self.dict_n_to_1: self.dict_n_to_1[nto1] = [] self.dict_n_to_1[nto1].append(dataset_name)
[docs] def unify_multiple_anndatas( self, mode: Literal["union", "intersect"] = "union", obs_columns_to_keep: Optional[list[str]] = None, numeric_var_columns: Optional[set[str]] = None, numeric_obs_columns: Optional[set[str]] = None, # for hdca it is {"age"} handle_anndata_key: str = "handle_anndata", ) -> ad.AnnData: """Merge several study-specific :py:class:`anndata.AnnData` objects into a single, consolidated dataset. This helper finalises the *feature-harmonisation* workflow. Earlier stages ensure that every source study expresses its features (e.g. genes or proteins) in a consistent identifier namespace and that per-cell metadata follow a shared schema. *unify_multiple_anndatas* takes those already normalised objects—stored in :py:attr:`~HarmonizeFeatures.data_h5ad_dict`—and fuses them into one coherent :py:class:`~anndata.AnnData` ready for joint analysis (dimensionality reduction, batch correction, integrated clustering, etc.). Two strategies govern how the function reconciles mismatched feature sets: * ``"union"`` (default) preserves the superset of all identifiers. If a particular study lacks a feature, its expression values are imputed as exact zeros. This choice maximises information retention at the cost of a sparse matrix with assay-dependent missingness. * ``"intersect"`` retains only the identifiers present in *every* study, implicitly discarding features unique to a subset. This yields a denser matrix that is easier to factorise but sacrifices potentially informative study-specific biology. Beyond concatenating the main :py:attr:`~anndata.AnnData.X` matrices, the routine also harmonises associated annotations: * **.var (feature annotations)** All columns are outer-joined across studies. Non-shared categorical values are unioned; numeric columns specified in *numeric_var_columns* are cast to floating point and NaNs inserted where data are missing. In *union* mode an additional boolean ``"intersection"`` column flags whether a feature survived the *intersect* filter, enabling fast subsetting later. * **.obs (cell annotations)** Each original column is kept if its name appears in *obs_columns_to_keep* **or** if it exists in every study. Missing columns are created and populated with ``pandas.NA``. Columns listed in *numeric_obs_columns* are coerced to ``float64``. A new column named *handle_anndata_key* stores the handle (dictionary key) that identifies the originating study, making it trivial to stratify analyses. * **.layers, .obsp, .varp, .uns** This method uses :py:meth:`anndata.AnnData.concat` for this. The implementation is mindful of scalability: concatenation leverages SciPy CSR/CSC sparse formats, avoiding densification, and streaming allocation prevents double memory use for extremely large datasets. Args: mode (Literal["union", "intersect"]): Strategy for reconciling discordant feature sets. ``"union"`` keeps every identifier observed across studies (padding absent entries with zeros); ``"intersect"`` restricts the result to identifiers common to *all* studies. Defaults to ``"union"``. obs_columns_to_keep (list[str] | None): Names of per-cell metadata columns that must survive the merge even if they appear in only a subset of studies (e.g. *cell_type*, *donor_age*). When a column is missing from a particular study, it is inserted and filled with ``pandas.NA``. Provide an empty list to allow the routine to decide purely by intersection; ``None`` means “no user preference”. numeric_var_columns (set[str] | None): Columns in ``.var`` that should retain numeric dtype. The function validates that each specified column can be losslessly converted to floating point; otherwise it raises :py:class:`ValueError`. Non-listed columns default to ``category`` dtype to conserve memory. If ``None`` an empty set is assumed. numeric_obs_columns (set[str] | None): Analogous to *numeric_var_columns* but applied to ``.obs``. Conversions are performed *after* the table has been unioned, ensuring consistent dtype across the final concatenated frame. If ``None`` an empty set is assumed. handle_anndata_key (str): Name of the column inserted into ``.obs`` that records the dictionary key of the source study. This provenance tag facilitates stratified visualisation (e.g. UMAP coloured by batch) and downstream batch-correction utilities that expect a “batch” column. Defaults to ``"handle_anndata"``. Returns: anndata.AnnData: A fully merged expression matrix whose ``.X`` contains either the union or intersection of all study features. Index ordering follows the order in which studies were supplied, ensuring deterministic output for reproducible pipelines. The result inherits sparse/dense representation from the first study unless *mode* forces feature padding, in which case CSR/CSC is chosen automatically to keep memory use in check. Raises: ValueError: If *mode* is not ``"union"`` or ``"intersect"``; if any column listed in *numeric_var_columns* or *numeric_obs_columns* fails numeric coercion; or if feature identifiers clash across studies after harmonisation (e.g. two studies mapping different genes to the same ID). AssertionError: If duplicate cell or feature indices are detected post-merge, a condition that would break many Scanpy workflows and indicates upstream validation errors. Notes: *Performance considerations* The operation is CPU-bound when aligning large sparse matrices. For datasets exceeding ~1 million cells, empirical benchmarks show that running on Python 3.11 with MKL yields a 2-3x speed-up over Python 3.8 due to better sparse BLAS threading. Provide pre-compressed datasets (``hdf5``, ``zarr``) to further lower I/O overhead. *Thread safety* The method is re-entrant but **not** thread-safe because it mutates the source :py:class:`~anndata.AnnData` objects in-place to reduce copying. Invoke one instance per process or deep-copy the inputs beforehand if concurrent harmonisation is required. *Extensibility* Sub-classes may override private hooks :py:meth:`_before_concat`, :py:meth:`_after_concat`, and :py:meth:`_merge_uns` to refine behaviour without re-implementing the full algorithm. """ if numeric_obs_columns is None: numeric_obs_columns = set() if numeric_var_columns is None: numeric_var_columns = set() if obs_columns_to_keep is None: obs_columns_to_keep = list() # numeric_obs_columns: set[str] = numeric_obs_columns if numeric_obs_columns is not None else set() # numeric_var_columns: set[str] = numeric_var_columns if numeric_var_columns is not None else set() # obs_columns_to_keep: list[str] = obs_columns_to_keep if obs_columns_to_keep is not None else list() _adata_var = pd.DataFrame() _adata_obs = pd.DataFrame(columns=[handle_anndata_key] + obs_columns_to_keep) _adata_var.index = _adata_var.index.astype(str) # to prevent ImplicitModificationWarning later on _adata_obs.index = _adata_obs.astype(str) # to prevent ImplicitModificationWarning later on _adata_x = csr_matrix(np.zeros((0, 0)).astype(np.float32)) adata = ad.AnnData(X=_adata_x, obs=_adata_obs, var=_adata_var) remove_var_columns = list() pbar = tqdm.tqdm(self.data_h5ad_dict.items(), ncols=140) for ind, (handle, _) in enumerate(pbar): adata_study, t0, t1 = self.feature_harmonizer(dataset_name=handle) adata_study.obs_names = adata_study.obs_names.astype(str) # to prevent ImplicitModificationWarning later on adata_study.var_names = adata_study.var_names.astype(str) # to prevent ImplicitModificationWarning later on adata_study.obs = adata_study.obs[obs_columns_to_keep] pbar.set_postfix( { "dataset": handle, "study_var": adata_study.n_vars, "union_var": adata.n_vars, "dbh": (t0 - t1), # deleted by harmonizer } ) pbar.refresh() # adata_study.obs["sample_ID"] = adata_study.obs["sample_ID"].astype(str) used in hdca? adata_study.obs[handle_anndata_key] = np.full(adata_study.n_obs, handle) adata_study.obs["unified_index"] = f"{handle}_" + adata_study.obs.index.astype(str) adata_study.obs.set_index("unified_index", drop=True, inplace=True) final_databases_tested = ["HGNC Symbol"] if self.final_database not in final_databases_tested: self.log.warning( f"Final database other than following databases is not tested: {final_databases_tested}" ) adata_study.var.drop(columns=["Query ID"], inplace=True) new_final_database_var_column = f"{self.converted_id_column}_{handle}" adata_study.var.rename( columns={self.final_database: new_final_database_var_column}, inplace=True, errors="raise" ) if mode == "union" or ind == 0: var_map = pd.merge( adata.var, adata_study.var, how="outer", left_index=True, right_index=True, ) adata = ad.concat( adatas=[adata, adata_study], join="outer", fill_value=0.0, ) if mode != "union": remove_var_columns.append(new_final_database_var_column) elif mode == "intersect": var_map = pd.merge( adata.var, adata_study.var, how="inner", # changed from "outer" to "inner" left_index=True, right_index=True, ) adata = ad.concat( adatas=[adata, adata_study], join="inner", # changed from "outer" to "inner" ) remove_var_columns.append(new_final_database_var_column) else: raise ValueError(f"Unexpected harmonization state for dataset {handle!r}.") # create a mask to array and put it in `uns`, to specify which # are added `0`s because of the `idtrack` process. # not needed: look at the adata.var if np.any(var_map.index.duplicated()): raise AssertionError("np.any(var_map.index.duplicated())") adata.var = var_map.loc[adata.var_names] del adata_study gc.collect() # Check unified gene naming if len(set(adata.var.index)) != len(adata.var.index): raise AssertionError("len(set(adata.var.index)) != len(adata.var.index)") gene_name_columns = [i for i in adata.var if i.startswith(f"{self.converted_id_column}_")] gene_name_columns = adata.var[gene_name_columns].values gene_names_unified = list() gene_names_inconsistency = dict() for ind_i, i in enumerate(gene_name_columns): i = i[~pd.isna(i)] if len(set(i)) != 1: gene_names_inconsistency[adata.var.index[ind_i]] = { adata.var.columns[j].split(f"{self.converted_id_column}_")[1]: i[j] for j in range(len(i)) } else: gene_names_unified.append(i[0]) if len(gene_names_inconsistency) != 0: raise ValueError("len(gene_names_inconsistency) != 0") with warnings.catch_warnings(): warnings.simplefilter("ignore") adata = adata[:, ~adata.var.index.isin(gene_names_inconsistency.keys())] if self.converted_id_column in adata.var.columns: raise AssertionError("self.converted_id_column in adata.var.columns") adata.var[self.converted_id_column] = gene_names_unified if self.converted_id_column not in adata.var.columns: raise AssertionError("self.converted_id_column not in adata.var.columns") # For `intersect` mode, all these columns are gonna be the same, so delete. if len(remove_var_columns) > 0: adata.var = adata.var.drop(columns=remove_var_columns) # Return for numeric_obs_column in numeric_obs_columns: if not pd.to_numeric(adata.obs[numeric_obs_column], errors="coerce").notna().all(): raise ValueError(f"Non-numeric value in adata.obs at {numeric_obs_column!r}.") adata.obs[numeric_obs_column] = adata.obs[numeric_obs_column].astype(float) for column in adata.obs.columns: if column in numeric_obs_columns: continue # Convert to string, remove categories # Replace None or 'nan' values with 'NA' # Convert back to categorical adata.obs[column] = ( adata.obs[column] .astype(str) # ensure string .replace(MISSING_VALUES, DB.placeholder_na) # no inplace .astype("category") # recast ) for numeric_var_column in numeric_var_columns: if not pd.to_numeric(adata.var[numeric_var_column], errors="coerce").notna().all(): raise ValueError(f"Non-numeric value in adata.var at {numeric_var_column!r}.") adata.var[numeric_var_column] = adata.var[numeric_var_column].astype(float) for column in adata.var.columns: if column in numeric_var_columns: continue adata.var[column] = ( adata.var[column] .astype(str) .replace(MISSING_VALUES, DB.placeholder_na) # no inplace .astype("category") ) if mode == "union": intersect_column_name = "intersection" adata.var[intersect_column_name] = self.create_intersection_column_values(adata.var.copy()) adata.var[intersect_column_name] = adata.var[intersect_column_name].astype(int) return adata
[docs] def create_intersection_column_values(self, adata_var: pd.DataFrame) -> np.ndarray: """Flag features present in **every** study after harmonisation. The merged ``.var`` table produced by :py:meth:`unify_multiple_anndatas` contains one gene-symbol column per study, each named ``f"{self.converted_id_column}_{handle}"`` where *handle* is the dictionary key that identifies the originating dataset. A cell in one of those columns holds the *gene symbol* originally reported by the study, or :py:data:`idtrack._db.DB.placeholder_na` if the gene was absent or could not be mapped to the target namespace. This helper collapses the per-study presence/absence information into a single boolean *intersection* flag, later exposed to users as ``adata.var["intersection"]``. A value of ``1`` indicates that the feature survived the *intersect* filter—i.e., it has a valid symbol in **all** studies—whereas ``0`` marks features missing from at least one dataset. The resulting NumPy vector is inserted by the caller; this routine is intentionally pure and side-effect free. Args: adata_var (pandas.DataFrame): The ``.var`` table of the *already concatenated* :py:class:`anndata.AnnData` object. It must contain one or more columns whose names start with ``f"{self.converted_id_column}_"``; each such column is assumed to encode the gene symbol for a particular study. Returns: numpy.ndarray: A 1-D array of ``int`` (values ``0`` or ``1``) with ``len(adata_var)`` elements. The *i*-th entry equals ``1`` if the *i*-th feature is present (non- :py:data:`idtrack._db.DB.placeholder_na`) in **every** per-study symbol column; otherwise it is ``0``. """ gene_name_columns = [i for i in adata_var.columns if i.startswith(f"{self.converted_id_column}_")] df = adata_var[gene_name_columns].copy() df = df != DB.placeholder_na return (df.sum(axis=1) == len(gene_name_columns)).astype(int).values