#!/usr/bin/env python3
"""Utility functions for the _external_mappers module.
This module provides:
- Database and species name canonicalization
- ID version stripping for Ensembl and RefSeq identifiers
- DataFrame utilities for standardizing output format
- Helper functions for chunking, JSON serialization, etc.
"""
# Kemal Inecik
# k.inecik@gmail.com
from __future__ import annotations
import contextlib
import io
import json
import logging
import re
import typing
from collections.abc import Iterable
from typing import Any
import pandas as pd
from idtrack._external_mappers._constants import _DB_ALIASES, _SPECIES_ALIASES, SUPPORTED_DBS
__all__ = [
"canonical_db",
"canonical_species",
"_species_for_mygene",
"strip_version",
"_as_list",
"_unique_not_null",
"_chunker",
"_json",
"_is_bare_numeric",
"_empty_result",
"_add_mapping_column",
"_ensure_all_inputs",
"_suppress_stdout_stderr",
"logger",
"check_optional_dependencies",
"raise_missing_dependency",
]
# ----------------------------- Dependency Info ------------------------------ #
# Registry of optional dependencies for this module
OPTIONAL_DEPENDENCIES: dict[str, dict[str, Any]] = {
    # Each entry maps a registry key to:
    #   import_name -- module name handed to the import machinery
    #   pip_name    -- distribution name shown in ``pip install`` hints
    #   features    -- labels of the features that rely on the package
    #   description -- one-line summary used in warning/error messages
    "gget": {
        "import_name": "gget",
        "pip_name": "gget",
        "features": ["gget backend", "ortholog utilities"],
        "description": "Query Ensembl REST API for gene information",
    },
    "mygene": {
        "import_name": "mygene",
        "pip_name": "mygene",
        "features": ["mygene backend"],
        "description": "Query MyGene.info API for ID mapping",
    },
    "pybiomart": {
        "import_name": "pybiomart",
        "pip_name": "pybiomart",
        "features": ["pybiomart backend"],
        "description": "Query Ensembl BioMart for ID mapping",
    },
    # The registry key follows the pip name; the importable module is ``gprofiler``.
    "gprofiler-official": {
        "import_name": "gprofiler",
        "pip_name": "gprofiler-official",
        "features": ["gprofiler backend"],
        "description": "Query g:Profiler API for ID mapping",
    },
    # Installed as ``biopython`` but imported as ``Bio``.
    "biopython": {
        "import_name": "Bio",
        "pip_name": "biopython",
        "features": ["ortholog utilities", "sequence alignment"],
        "description": "Biological sequence analysis tools",
    },
}
def _check_dependency(dep_key: str) -> bool:
    """Report whether one registered optional dependency can be imported.

    Args:
        dep_key: Key in ``OPTIONAL_DEPENDENCIES``.

    Returns:
        ``True`` when the entry's ``import_name`` imports without raising
        ``ImportError``; ``False`` when the import fails that way or when
        ``dep_key`` is not a registered key.
    """
    entry = OPTIONAL_DEPENDENCIES.get(dep_key)
    if entry is None:
        return False
    try:
        # The import is executed for real; only ImportError counts as "missing".
        __import__(entry["import_name"])
    except ImportError:
        return False
    return True
[docs]
def check_optional_dependencies(warn: bool = True) -> dict[str, bool]:
    """Probe every registered optional dependency and report availability.

    Args:
        warn: When ``True`` and at least one package is missing, emit a single
            ``UserWarning`` listing the missing packages and a ``pip install``
            command for them.

    Returns:
        Mapping from each ``OPTIONAL_DEPENDENCIES`` key to ``True`` (importable)
        or ``False`` (missing).
    """
    import warnings

    status: dict[str, bool] = {key: _check_dependency(key) for key in OPTIONAL_DEPENDENCIES}
    missing: list[str] = [key for key, is_available in status.items() if not is_available]
    if not (warn and missing):
        return status

    rule = "=" * 70
    pip_names = [OPTIONAL_DEPENDENCIES[key]["pip_name"] for key in missing]
    # One bullet line per missing package.
    bullets = "".join(
        f" - {entry['pip_name']}: {entry['description']} (used by: {', '.join(entry['features'])})\n"
        for entry in (OPTIONAL_DEPENDENCIES[key] for key in missing)
    )
    warning_msg = (
        f"\n"
        f"{rule}\n"
        f"idtrack._external_mappers: Missing optional dependencies\n"
        f"{rule}\n"
        f"\n"
        f"The following optional packages are not installed:\n"
        f"{bullets}"
        f"\n"
        f"To install all external mapper dependencies:\n"
        f" pip install {' '.join(pip_names)}\n"
        f"\n"
        f"Or install individually as needed.\n"
        f"{rule}\n"
    )
    # stacklevel=2 attributes the warning to the caller of this function.
    warnings.warn(warning_msg, UserWarning, stacklevel=2)
    return status
[docs]
def raise_missing_dependency(
    dep_key: str,
    feature: str | None = None,
    original_error: BaseException | None = None,
) -> typing.NoReturn:
    """Raise a detailed error for a missing optional dependency.

    Args:
        dep_key: Key in :py:data:`OPTIONAL_DEPENDENCIES` (e.g. ``"gget"``, ``"mygene"``).
            Unregistered keys produce a shorter generic message that uses
            ``dep_key`` itself as the pip package name.
        feature: Description of the feature that requires the dependency.
            Defaults to the comma-joined ``features`` of the registry entry.
        original_error: Optional original ImportError to chain as ``__cause__``.

    Raises:
        RuntimeError: Always, with detailed installation instructions.
    """
    rule = "=" * 70
    info = OPTIONAL_DEPENDENCIES.get(dep_key)
    if info is None:
        # Fallback for unknown dependencies
        msg = (
            f"\n"
            f"{rule}\n"
            f"Missing dependency: {dep_key}\n"
            f"{rule}\n"
            f"\n"
            f"Please install the required package:\n"
            f" pip install {dep_key}\n"
            f"{rule}\n"
        )
        raise RuntimeError(msg) from original_error

    pip_name = info["pip_name"]
    description = info["description"]
    features = ", ".join(info["features"])
    if feature is None:
        feature = features
    # Derive the "install everything" command from the registry so it cannot
    # drift from OPTIONAL_DEPENDENCIES when entries are added or renamed.
    all_pip_names = " ".join(entry["pip_name"] for entry in OPTIONAL_DEPENDENCIES.values())
    msg = (
        f"\n"
        f"{rule}\n"
        f"Missing optional dependency: {pip_name}\n"
        f"{rule}\n"
        f"\n"
        f"The {feature!r} feature requires {pip_name!r}.\n"
        f"\n"
        f"Package info:\n"
        f" - Name: {pip_name}\n"
        f" - Description: {description}\n"
        f" - Used by: {features}\n"
        f"\n"
        f"To install this dependency:\n"
        f" pip install {pip_name}\n"
        f"\n"
        f"To install all external mapper dependencies:\n"
        f" pip install {all_pip_names}\n"
        f"{rule}\n"
    )
    raise RuntimeError(msg) from original_error
# ------------------------------- Logging ------------------------------------ #
def _setup_logger() -> logging.Logger:
"""Return the ``idtrack.external_mappers`` logger, attaching a handler if it has none.

When the logger has no handlers yet, a ``logging.StreamHandler`` with the
format ``"[%(levelname)s] %(message)s"`` is added. The logger level is set
to ``logging.INFO``.

Returns:
    logging.Logger: The configured logger instance.
"""
log = logging.getLogger("idtrack.external_mappers")
# Guard on existing handlers so a second call does not attach a duplicate handler.
if not log.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("[%(levelname)s] %(message)s"))
log.addHandler(handler)
# NOTE(review): confirm whether the level is meant to be set only when the
# handler is first attached, or unconditionally on every call.
log.setLevel(logging.INFO)
return log
# Module-level logger shared by this module; created once at import time.
logger = _setup_logger()
[docs]
@contextlib.contextmanager
def _suppress_stdout_stderr(enabled: bool):
    """Silence ``sys.stdout`` and ``sys.stderr`` for the duration of the block.

    Args:
        enabled: When falsy the context manager is a no-op; when truthy both
            streams are redirected to throw-away in-memory buffers.

    Yields:
        None.
    """
    if enabled:
        discarded_out = io.StringIO()
        discarded_err = io.StringIO()
        with contextlib.redirect_stdout(discarded_out), contextlib.redirect_stderr(discarded_err):
            yield
    else:
        yield
# ----------------------------- Canonical DBs -------------------------------- #
[docs]
def canonical_db(db: str) -> str:
    """Resolve a user-supplied database name or alias to its canonical key.

    The input is stripped and lower-cased, then looked up in ``_DB_ALIASES``.
    If no alias matches, values starting with ``ensg`` / ``enst`` / ``ensp``
    resolve to the Ensembl gene / transcript / protein keys respectively.

    Args:
        db: Database name or alias.

    Returns:
        str: Canonical database key.

    Raises:
        ValueError: If ``db`` is not a non-empty string, or no alias or
            prefix rule matches.
    """
    if not (isinstance(db, str) and db.strip()):
        raise ValueError("db must be a non-empty string")

    alias = db.strip().lower()
    if alias in _DB_ALIASES:
        return _DB_ALIASES[alias]

    # Prefix rules, checked in this order after the alias table misses.
    prefix_rules = (
        ("ensg", "ensembl_gene"),
        ("enst", "ensembl_transcript"),
        ("ensp", "ensembl_protein"),
    )
    for prefix, canonical in prefix_rules:
        if alias.startswith(prefix):
            return canonical

    supported = sorted(SUPPORTED_DBS)
    raise ValueError(f"Unsupported or unknown db alias: {db!r}. " f"Supported canonical DBs: {supported}")
# -------------------------- Species normalization --------------------------- #
[docs]
def canonical_species(species: str | None) -> str:
"""Return canonical organism code (g:Profiler / Ensembl style).
Supported out-of-the-box: human → ``hsapiens``, mouse → ``mmusculus``, pig → ``sscrofa``.
Args:
species: Species code/alias. If ``None`` or empty, defaults to ``"hsapiens"``.
Returns:
str: Canonical organism code.
"""
if not species:
return "hsapiens"
s = species.strip().lower()
return _SPECIES_ALIASES.get(s, s)
[docs]
def _species_for_mygene(species: str | None) -> str:
    """Translate a species code/alias into the common name passed to MyGene.

    Args:
        species: Species code/alias; canonicalized with ``canonical_species``.

    Returns:
        str: ``"human"``, ``"mouse"`` or ``"pig"`` for the three known
        canonical codes; otherwise the canonical code itself.
    """
    common_names = {
        "hsapiens": "human",
        "mmusculus": "mouse",
        "sscrofa": "pig",
    }
    code = canonical_species(species)
    return common_names.get(code, code)
# ----------------------------- Helper utils -------------------------------- #
# Regex patterns for ID version stripping. Both are anchored at the start of
# the string only; group 1 is the part of the identifier that is kept.
# Ensembl-style: literal "ENS", any run of uppercase letters, then digits.
_ENS_RE = re.compile(r"^(ENS[A-Z]*\d+)")
# RefSeq-style: "N" or "X", then "M", "R" or "P", an underscore, then digits.
_REFSEQ_VER_RE = re.compile(r"^([NX][MRP]_\d+)")
[docs]
def strip_version(ididid: str) -> str:
    r"""Strip version suffixes from Ensembl and RefSeq identifiers.

    The value is stripped of surrounding whitespace and tried against the
    Ensembl pattern first, then the RefSeq pattern; the first match wins and
    only its captured prefix is returned.

    Args:
        ididid: Identifier to strip. Non-string values are returned untouched.

    Returns:
        Identifier without a version suffix, or the whitespace-stripped input
        when neither pattern matches.
    """
    if not isinstance(ididid, str):
        return ididid  # type: ignore[return-value]

    candidate = ididid.strip()
    for pattern in (_ENS_RE, _REFSEQ_VER_RE):
        hit = pattern.match(candidate)
        if hit is not None:
            return hit.group(1)
    return candidate
[docs]
def _as_list(v: Any) -> list[Any]:
    """Normalize a value into a list.

    - ``None`` becomes ``[]``
    - a list, tuple or set becomes ``list(v)`` (a new list)
    - anything else, including strings and dicts, is wrapped as ``[v]``

    Args:
        v: Value to coerce.

    Returns:
        list[Any]: ``v`` represented as a list.
    """
    if v is None:
        return []
    return list(v) if isinstance(v, (list, tuple, set)) else [v]
[docs]
def _unique_not_null(seq: Iterable[Any]) -> list[str]:
    """Collect the distinct, non-null values of ``seq`` as stripped strings.

    Each value is converted with ``str()`` and stripped. Dropped values:

    - ``None``
    - empty / whitespace-only strings
    - stringified nulls (``"nan"``, ``"none"``, ``"null"``; case-insensitive)

    Args:
        seq: Values to normalize and filter.

    Returns:
        list[str]: Unique surviving strings in first-seen order.
    """
    null_tokens = {"nan", "none", "null"}
    # A dict keeps insertion order, so its keys double as an ordered set.
    kept: dict[str, None] = {}
    for item in seq:
        if item is None:
            continue
        text = str(item).strip()
        if text and text.lower() not in null_tokens:
            kept.setdefault(text, None)
    return list(kept)
[docs]
def _chunker(items: list[Any], size: int) -> typing.Iterator[list[Any]]:
    """Yield successive chunks of a list.

    Args:
        items: List to split. An empty list yields nothing.
        size: Maximum number of elements per chunk; must be at least 1.

    Yields:
        list[Any]: Consecutive slices of ``items``; the last one may be shorter.

    Raises:
        ValueError: If ``size`` is smaller than 1. As this is a generator, the
            error surfaces when iteration starts.
    """
    # A negative step would make range() empty and silently drop every item,
    # and a zero step fails with an unrelated range() message -- reject both.
    if size < 1:
        raise ValueError(f"size must be a positive integer, got {size!r}")
    for start in range(0, len(items), size):
        yield items[start : start + size]
[docs]
def _json(obj: Any) -> str:
    """Dump ``obj`` as compact JSON text.

    No whitespace is emitted after ``,`` or ``:``, and non-ASCII characters
    are written as-is rather than ``\\uXXXX``-escaped.

    Args:
        obj: JSON-serializable object.

    Returns:
        str: The compact JSON representation.
    """
    compact_separators = (",", ":")
    return json.dumps(obj, ensure_ascii=False, separators=compact_separators)
[docs]
def _is_bare_numeric(s: str) -> bool:
    """Tell whether a value, once stringified and stripped, is only digits.

    Args:
        s: Value to test; it is passed through ``str()`` first.

    Returns:
        bool: ``True`` when the stripped text is one or more digits and
        nothing else (no sign, decimal point, or inner whitespace).
    """
    text = str(s).strip()
    return re.fullmatch(r"\d+", text) is not None
# ---------------------------- Utilities/Finalizers -------------------------- #
[docs]
def _empty_result() -> pd.DataFrame:
    """Build a zero-row DataFrame carrying the standardized mapping columns.

    Returns:
        pd.DataFrame: Empty frame with columns ``input_id``, ``input_db``,
        ``output_id``, ``output_db``, ``method``, ``release_used``,
        ``mapping`` and ``metadata_json``, in that order.
    """
    standard_columns = [
        "input_id",
        "input_db",
        "output_id",
        "output_db",
        "method",
        "release_used",
        "mapping",
        "metadata_json",
    ]
    return pd.DataFrame(columns=standard_columns)
[docs]
def _add_mapping_column(df: pd.DataFrame) -> pd.DataFrame:
    """Add or recompute the ``mapping`` column on a standardized mapping DataFrame.

    The ``mapping`` value is the per-input cardinality:
    ``1:0`` (no outputs), ``1:1`` (one unique output), ``1:n`` (multiple outputs).

    An output counts as valid when it is not NA and its stripped string form is
    neither empty nor one of ``"nan"`` / ``"none"`` / ``"null"``
    (case-insensitive). Inputs are grouped by the string form of ``input_id``.

    Args:
        df: Standardized mapping DataFrame. ``None`` yields an empty
            standardized frame.

    Returns:
        pd.DataFrame: DataFrame with a (re)computed ``mapping`` column. When
        ``df`` already had a ``mapping`` column, a new frame is returned with
        the column re-appended last; otherwise the column is assigned on the
        passed frame itself.
    """
    if df is None:
        return _empty_result()
    if df.empty:
        if "mapping" not in df.columns:
            df["mapping"] = pd.Series(dtype=object)
        return df
    if "output_id" not in df.columns or "input_id" not in df.columns:
        # Fallback: mark everything as unmapped
        df["mapping"] = "1:0"
        return df

    # Drop any existing mapping column; it is recomputed from scratch.
    if "mapping" in df.columns:
        df = df.drop(columns=["mapping"])

    inputs = df["input_id"].astype(str)
    outputs = df["output_id"]
    # Normalize once and use the stripped text for BOTH the blank check and the
    # null-token check, so padded tokens such as " NaN " count as missing too.
    out_norm = outputs.astype(str).str.strip()
    valid_mask = ~outputs.isna() & out_norm.ne("") & ~out_norm.str.lower().isin({"nan", "none", "null"})

    # Number of unique valid outputs per input
    counts_by_input = df[valid_mask].groupby(inputs[valid_mask])["output_id"].nunique(dropna=True)

    # Index 0 / 1 / 2+ unique outputs -> cardinality label.
    labels = ("1:0", "1:1", "1:n")
    mapping_by_input: dict[str, str] = {}
    for inp_val in inputs.unique():
        key = str(inp_val)
        # Inputs with no valid outputs are absent from counts_by_input.
        n = int(counts_by_input.get(key, 0) or 0)
        mapping_by_input[key] = labels[min(n, 2)]

    df["mapping"] = inputs.map(mapping_by_input)
    return df