#!/usr/bin/env python3
# Kemal Inecik
# k.inecik@gmail.com
import copy
import itertools
import logging
import os
import random
import time
import traceback
from abc import ABC
from math import ceil
from typing import Any, Optional, Union
import networkx as nx
import numpy as np
from tqdm import tqdm
from idtrack._db import DB, EmptyConversionMetricsError
from idtrack._track import Track
[docs]
class TrackTests(Track, ABC):
"""Developer-facing integrity-test harness for :py:class:`~idtrack.Track`.
This module defines :py:class:`TrackTests`, a mix-in that adds an extensive
white-box test suite to a populated :py:class:`idtrack.Track` instance. The
class is **for developers only**; it should never be used in production
pipelines. Every public method beginning with ``is_`` returns a boolean
that tells whether a specific invariant holds. Methods beginning with
``history_`` execute heavier, end-to-end conversions and collect rich
statistics. The class is intended to be *mixin-ed* into a concrete Track
subclass—or instantiated standalone—**after** the underlying graph and
lookup tables have been fully built. It performs purely read-only
operations and therefore imposes no risk of mutating state.
All test methods share the following contract:
* They never raise on failure—*return-value only*—so they can be run in
bulk without interrupting your session.
* A return value of ``True`` means the invariant holds; ``False`` means
a violation was detected.
* Where useful, a *verbose* flag gives a tqdm progress bar so long-
running checks remain user-friendly.
Typical use::
tests = TrackTests(...)
tests.is_id_functions_consistent_ensembl() # Raises if inconsistent.
Note:
The class is *not* designed for production; instantiate it only in test
suites or interactive debugging sessions.
"""
def __init__(self, *args, **kwargs):
"""Initialize the test harness.
All positional and keyword arguments are forwarded verbatim to
:py:class:`~idtrack.Track.__init__`. Besides constructing the underlying
graph, the initializer sets up a dedicated :py:data:`logging.Logger`
named ``"track_tests"`` so individual test routines can emit structured
diagnostics without polluting the main application log.
Args:
args: Positional arguments accepted by :py:class:`~idtrack.Track.__init__`.
kwargs: Keyword arguments accepted by :py:class:`~idtrack.Track.__init__`.
"""
super().__init__(*args, **kwargs) # Sub-class initialization
self.log = logging.getLogger("track_tests")
[docs]
def is_id_functions_consistent_ensembl(self, verbose: bool = True):
"""Ensure Ensembl ID-list helpers agree with the SQL back-end.
For every release listed in ``graph.graph["confident_for_release"]``
the test compares two independent sources of Ensembl-gene IDs for the
*current genome assembly*:
1. IDs retrieved directly from MySQL via
:py:class:`~idtrack.DatabaseManager`.
2. IDs returned by :py:meth:`idtrack.Track.get_id_list` from the graph.
A mismatch means either the graph was built incompletely or the
helper functions drift out of sync with the database schema.
Args:
verbose: If ``True`` (default) show a tqdm progress-bar while
iterating through the releases.
Returns:
bool: ``True`` when **all** releases produce identical sets; else
``False`` (a descriptive warning is logged).
"""
assembly = self.graph.graph["genome_assembly"]
# Others should not have consistent as only the main assembly is added fully.
# In other assemblies, the Ensembl IDs are added if there is an external connection.
switch = True
with tqdm(
self.graph.graph["confident_for_release"], mininterval=0.25, disable=not verbose, ncols=100
) as loop_obj:
for ens_rel in loop_obj:
loop_obj.set_postfix_str(f"Item:{ens_rel}", refresh=False)
if self.db_manager.check_if_change_assembly_works(
db_manager=self.db_manager.change_release(ens_rel), target_assembly=assembly
):
db_from = self.db_manager.change_release(ens_rel).change_assembly(assembly)
ids_from = set(db_from.id_ver_from_df(db_from.get_db("ids", save_after_calculation=False)))
ids_from_graph = set(
self.graph.get_id_list(DB.nts_assembly[assembly][DB.backbone_form], assembly, ens_rel)
)
if ids_from != ids_from_graph:
switch = False
self.log.warning(
f"Inconsistent results for ID list functions (ensembl), at release `{ens_rel}`."
)
return switch
[docs]
def is_id_functions_consistent_ensembl_2(self, verbose: bool = True):
"""Cross-check Ensembl ID *range* helpers against raw edge data.
For every **backbone** Ensembl-gene node the routine computes the list
of active release ranges in two distinct ways:
1. **Raw computation** - by flattening
``graph.combined_edges_genes`` and compacting the releases with
:py:meth:`idtrack.Track.list_to_ranges`.
2. **Cached lookup** - via the lazily built dictionary
``graph.get_active_ranges_of_id``.
The two lists must match exactly. A divergence would indicate that
the cached helper is out of sync with the authoritative edge
structure.
Args:
verbose: If ``True`` (default) wrap the iteration in a tqdm bar.
Returns:
bool: ``True`` if *all* gene nodes pass; ``False`` after the first
failure (a warning is emitted).
"""
the_ids = list()
for i in self.graph.nodes:
if (
self.graph.nodes[i]["node_type"] == DB.nts_ensembl[DB.backbone_form]
and self.graph.nodes[i]["Version"] not in DB.alternative_versions
):
the_ids.append(i)
assembly = self.graph.graph["genome_assembly"]
with tqdm(the_ids, mininterval=0.25, disable=not verbose, ncols=100) as loop_obj:
for ti in loop_obj:
loop_obj.set_postfix_str(f"Item:{ti}", refresh=False)
# (1) raw recomputation
data1_raw = self.graph.combined_edges_genes[ti]
data1_ = sorted({k for i in data1_raw for j in data1_raw[i] for k in data1_raw[i][j] if j == assembly})
data1 = self.graph.list_to_ranges(data1_)
# (2) cached view
data2 = [
[i, j if not np.isinf(j) else max(self.graph.graph["confident_for_release"])]
for i, j in self.graph.get_active_ranges_of_id[ti]
]
if data1 != data2:
self.log.warning(f"Inconsistency between ID range functions: `{ti, data1, data2, data1_raw}`.")
return False
return True
[docs]
def is_range_functions_robust(self, verbose: bool = True):
"""Detect overlapping release ranges among sibling Ensembl IDs.
A *base* Ensembl-gene ID is the stable identifier that groups multiple
versioned Ensembl-gene records (siblings). The gene-history model
requires that the release ranges of sibling IDs **never overlap** -
each release must be covered by *exactly one* child stable ID.
This method traverses every base-gene node and checks that condition.
Args:
verbose: If ``True`` (default) display a tqdm progress-bar.
Returns:
bool: ``True`` when no overlaps are found; ``False`` otherwise.
Each offending base ID triggers a warning with the conflicting
ranges.
"""
base_ids = set()
for i in self.graph.nodes:
if self.graph.nodes[i]["node_type"] == DB.nts_base_ensembl[DB.backbone_form]:
base_ids.add(i)
switch = True
with tqdm(base_ids, mininterval=0.25, disable=not verbose, ncols=100) as loop_obj:
for i in loop_obj:
loop_obj.set_postfix_str(f"Item:{i}", refresh=False)
id_family = list(self.graph.neighbors(i))
id_ranges = [self.graph.get_active_ranges_of_id[j] for j in id_family]
for r1, r2 in itertools.combinations(id_ranges, 2):
r12 = self.graph.get_intersecting_ranges(r1, r2, compact=False)
if len(r12) > 1:
self.log.warning(f"For Base Ensembl ID {i}: Two associated Ensembl IDs cover the same area")
switch = False
return switch
[docs]
def is_base_is_range_correct(self, verbose: bool = True):
"""Verify consistency of *base-gene* active-range calculations.
Each "base Ensembl gene" node (``node_type == 'base_ensembl_gene'``)
has an *active release range*—the list of Ensembl releases during
which descendants of the gene were present. There are two
independent ways to obtain this information:
1. **High-level helper**
``graph.get_active_ranges_of_base_id_alternative`` - a cached
convenience wrapper.
2. **Low-level reconstruction** by aggregating the *combined_edges*
table and converting the set of releases into compact
``[start, end]`` slices via ``graph.list_to_ranges``.
This test iterates through **all** base-gene nodes and asserts that
the two methods deliver byte-identical results.
Args:
verbose (bool): If *True* (default) show a tqdm progress
bar that updates with the current node under inspection.
Returns:
bool: ``True`` if *every* base-gene has matching ranges; ``False``
as soon as a single mismatch is encountered.
"""
base_ids = set()
# Gather all base-gene identifiers up-front to avoid repeated lookups.
for i in self.graph.nodes:
if self.graph.nodes[i]["node_type"] == "base_ensembl_gene":
base_ids.add(i)
switch = True
with tqdm(base_ids, mininterval=0.25, disable=not verbose, ncols=100) as loop_obj:
for bi in loop_obj:
loop_obj.set_postfix_str(f"Item:{bi}", refresh=False)
# 1. High-level cached helper. get_base_id_range form function
bi_fun = self.graph.get_active_ranges_of_base_id_alternative(bi)
# 2. Low-level reconstruction from edge metadata. get range directly from graph
rd = self.graph.combined_edges[bi]
bi_dir = self.graph.list_to_ranges(sorted({s for p in rd for r in rd[p] for s in rd[p][r]}))
# Replace "inf" sentinel with highest known release. The
# helper already performs this normalisation so we mimic it
# here for a fair comparison.
bi_fun = [
[i, j if not np.isinf(j) else max(self.graph.graph["confident_for_release"])] for i, j in bi_fun
]
if bi_dir != bi_fun:
switch = False
return switch
[docs]
def is_combined_edges_dicts_overlapping_and_complete(self):
"""Check edge-dictionary partitioning invariants.
The Track graph materialises three *edge caches*—``combined_edges``
and its two specialised siblings—each storing adjacency and release
metadata for a different subset of nodes:
* ``combined_edges`` - all nodes, including backbone genes.
* ``combined_edges_genes`` - stable Ensembl genes (non-assembly-specific).
* ``combined_edges_assembly_specific_genes`` - genes that exist only
on a single assembly.
The design contract says:
1. **Disjointness** - No node key may appear in more than one
dictionary.
2. **Completeness** - The *union* of the dictionaries must cover all
graph nodes **except** those that represent *alternative database
versions* (e.g. "EnsemblMetazoa") which are intentionally kept
separate.
This routine enforces both rules.
Returns:
bool: ``True`` if the dictionaries are pair-wise disjoint **and**
collectively cover every eligible node; ``False`` otherwise.
"""
the_dicts = [
self.graph.combined_edges,
self.graph.combined_edges_genes,
self.graph.combined_edges_assembly_specific_genes,
]
# 1. Disjointness - if any intersection is non-empty the invariant is violated.
for d1, d2 in itertools.permutations(iterable=the_dicts, r=2):
if len(d1.keys() & d2.keys()) != 0:
return False # There are some overlapping identifiers.
# 2. Completeness - every node (minus alternative DB versions) must be represented in exactly one cache.
covered_nodes = set.union(*map(set, the_dicts))
uncovered_nodes = self.graph.nodes - covered_nodes
for un in uncovered_nodes:
node_data = self.graph.nodes[un]
if "Version" not in node_data or node_data["Version"] not in DB.alternative_versions:
return False # There are identifiers that is not covered with these dicts.
return True
[docs]
def is_edge_with_same_nts_only_at_backbone_nodes(self):
"""Assert *same-node-type* edges exist **only** between backbone genes.
The graph is a *multilayer* network where nodes of different
This method traverses every base-gene node and checks that condition.
Returns:
bool: ``True`` when no overlaps are found; ``False`` otherwise.
Each offending base ID triggers a warning with the conflicting
ranges.
"""
for n1 in self.graph.nodes:
nts1 = self.graph.nodes[n1][DB.node_type_str]
for n2 in self.graph.neighbors(n1):
nts2 = self.graph.nodes[n2][DB.node_type_str]
if nts1 == nts2 and nts1 != DB.external_search_settings["nts_backbone"]:
return False
return True
[docs]
def is_id_functions_consistent_external(self, verbose: bool = True):
"""Check external-ID list helpers against the raw MySQL tables.
For **every** combination of *assembly*, *Ensembl release* (limited to
:py:attr:`graph.graph["confident_for_release"]`) and *external
database* this test performs the following steps:
1. Query the authoritative list of external IDs directly from the
MySQL snapshot via :py:class:`~idtrack.database_manager.DatabaseManager`.
2. Ask the in-memory graph for the same list via
:py:meth:`idtrack.Track.get_id_list`.
3. Normalise node names through
:py:meth:`idtrack.Track.node_name_alternatives` to cope with the
occasional “_1” suffix.
4. Compare the two sets. A mismatch is logged and the method returns
``False`` immediately.
The exhaustive traversal is expensive (minutes for large genomes) but
ensures the graph`s indexing helpers never drift from the actual
database content.
Args:
verbose: If *True* (default) display a tqdm progress bar and emit
log messages at INFO level. When *False* the method runs
silently.
Returns:
bool: *True* when every single comparison matched, *False* as soon
as an inconsistency is encountered.
"""
narrow_external = self.graph.graph["narrow_external"]
misplace_entries = self.graph.graph["misplaced_external_entry"]
for assembly in self.graph.available_genome_assemblies:
with tqdm(
self.graph.graph["confident_for_release"],
mininterval=0.25,
desc=f"Assembly {assembly}",
disable=not verbose,
ncols=100,
) as loop_obj:
for release in loop_obj:
loop_obj.set_postfix_str(f"Item:{release}", refresh=False)
if self.db_manager.check_if_change_assembly_works(
db_manager=self.db_manager.change_release(release), target_assembly=assembly
):
dm = self.db_manager.change_release(release).change_assembly(assembly)
ex_rel_d = {
f: dm.change_form(f).get_db("external_relevant" if narrow_external else "external")
for f in self.graph.available_forms
}
for database in self.graph.available_external_databases_assembly[assembly]:
form = self.graph.external_database_connection_form[database]
ex_rel = ex_rel_d[form]
from_dm_ = set(ex_rel["id_db"][ex_rel["name_db"] == database])
from_dm = set()
for nd in from_dm_:
l1, l2 = self.graph.node_name_alternatives(nd)
from_dm.add(l1 if l2 else nd)
from_gr = set(self.graph.get_id_list(database, assembly, release))
if from_gr != from_dm and not all([i in misplace_entries for i in (from_dm - from_gr)]):
self.log.warning(
f"Inconsistent results for ID list functions (external) "
f"for: database, assembl, ensembl release: {(database, assembly, release)}"
)
return False
return True
[docs]
def how_many_corresponding_path_ensembl(
self, from_release: int, from_assembly: int, to_release: int, go_external: bool, verbose: bool = True
):
"""Count history paths between two Ensembl releases.
The method iterates over **all** Ensembl-gene stable IDs that exist in
*from_release*/*from_assembly*. For every ID that is present in the
graph it calls :py:meth:`idtrack.Track.get_possible_paths` and records
how many distinct paths the searcher finds to *to_release*.
The routine is **non-destructive**; it merely provides a quick way to
gauge the `density` of the history graph or to spot releases where
path-finding was unexpectedly difficult.
Args:
from_release: Source Ensembl release number.
from_assembly: Source genome assembly code.
to_release: Target Ensembl release number.
go_external: If *True* history paths are allowed to temporarily
leave the Ensembl lineage via external databases.
verbose: Show a tqdm progress bar (default *True*).
Returns:
list[list[Union[str, int, None]]]: A list of two-element sub-lists
``[stable_id, n_paths]`` where ``n_paths`` is
* an *int* ≥ 0 when the ID was in the graph, or
* *None* when the source ID was absent.
"""
db_from = self.db_manager.change_release(from_release).change_assembly(from_assembly)
ids_from = sorted(set(db_from.id_ver_from_df(db_from.get_db("ids", save_after_calculation=False))))
to_reverse = from_release > to_release
result = list()
with tqdm(ids_from, mininterval=0.25, disable=not verbose, ncols=100) as loop_obj:
for from_id in loop_obj:
loop_obj.set_postfix_str(f"Item:{from_id}", refresh=False)
if from_id in self.graph.nodes:
lfr = len(
self.get_possible_paths(
from_id, from_release, to_release, reverse=to_reverse, go_external=go_external
)
)
else:
lfr = None
result.append([from_id, lfr])
return result
[docs]
def history_travel_testing(
self,
from_release: int,
from_assembly: int,
from_database: str,
to_release: int,
to_database: str,
go_external: bool,
prioritize_to_one_filter: bool,
convert_using_release: bool,
from_fraction: float = 1.0,
verbose: bool = True,
verbose_detailed: bool = False,
return_ensembl_alternative: bool = False,
):
"""Run an end-to-end Ensembl-history conversion and collect granular QA metrics.
The routine samples identifiers from *from_database*/*from_release*
(optionally down-sampling via *from_fraction*) and converts each one to
*to_database*/*to_release* using :py:meth:`idtrack.Track.convert`. It
is intentionally **non-fatal**: every failure mode is caught, logged
and tallied so that large regression suites can run unattended. All
results are returned in a single nested *metrics* dictionary whose
structure mirrors the printable report produced by
:py:meth:`format_history_travel_testing_report`.
The statistics fall into **four conceptual groups**; each counter not only records an absolute event
count but also serves as a red-flag indicator for specific classes of mapping pathology. Use the
guidelines below to interpret the numbers and decide whether a run is *healthy*, *questionable*, or
*action-required*.
* **Failure / anomaly counters**
* ``history_voyage_failed_gracefully`` - the converter raised :py:data:`EmptyConversionMetricsError`.
* ``history_voyage_failed_unknown`` - any *other* unexpected exception.
* ``query_not_in_the_graph`` - source ID absent from the graph.
* ``lost_item`` - traversal finished but produced **no** final IDs.
* ``lost_item_but_the_same_id_exists`` - special case of *lost_item*
when the target and source DB are both Ensembl-gene and the target
ID still exists in the graph.
* ``found_ids_not_accurate`` - at least one returned target ID is not
part of the authoritative *ids_to* reference set.
* **Mapping quality**
* ``one_to_one_ids`` - queries that resolved to *exactly* one target ID.
* ``one_to_multiple_ids`` - queries with > 1 admissible targets.
* ``one_to_multiple_final_conversion`` - subset of the above where
*exactly one* traversal path was found (heuristics eliminated alternatives).
* **Collision analysis**
The list ``clashing_id_type == [clash_one_one, clash_multi_multi,
clash_multi_one]`` classifies *target IDs* that were reached by
more than one query:
* ``clash_one_one`` - every colliding query was 1→1.
* ``clash_multi_multi`` - every colliding query was 1→many.
* ``clash_multi_one`` - mixture of 1→1 and 1→many queries (most
alarming category).
* **Timings & book-keeping**
* ``time`` - wall-clock runtime in seconds.
* ``conversion`` - per-query mapping result for *all* successful traversals.
* ``converted_item_dict`` / ``converted_item_dict_reversed`` - raw
per-ID caches used to derive the higher-level counters above.
* ``parameters`` - echo of the function arguments.
* ``ids`` - the sampled *from* and reference *to* ID sets.
Args:
from_release (int): Ensembl release number of the *source* IDs.
from_assembly (int): Genome assembly code of the source IDs.
from_database (str): Node-type / database of the source IDs.
to_release (int): Ensembl release number of the *target* IDs.
to_database (str): Node-type / database to convert **into**.
go_external (bool): Permit temporary detours through external IDs when native Ensembl history edges break.
prioritize_to_one_filter (bool): Prefer 1→1 mappings over 1→many when multiple paths exist.
convert_using_release (bool): Pass *from_release* straight into
:py:meth:`idtrack.Track.convert` instead of letting it infer the starting point.
from_fraction (float): Fraction (0 < x ≤ 1) of the
*ids_from* population to sample; speeds up smoke tests.
verbose (bool): Show tqdm progress bar (coarse).
verbose_detailed (bool): Embed live metric counters in the tqdm postfix.
return_ensembl_alternative (bool): Forwarded to :py:meth:`idtrack.Track.convert`.
Raises:
ValueError: If either database argument refers to an Ensembl
node-type (must use backbone helpers instead) or if
*from_fraction* is outside the open interval (0, 1].
Returns:
dict: Nested *metrics* dictionary with the layout described above.
Use :py:meth:`format_history_travel_testing_report` for a
human-readable summary.
Notes:
* All counters are **absolute counts** - divide by
``len(metrics['ids']['from'])`` to obtain rates.
* The collision analysis is inspired by the *clash statistics*
logic implemented at the end of the function and helps spot
discrepant “unique” IDs that suddenly become ambiguous. Keeping
all three clash counters at zero is the gold standard for a
healthy build.
"""
if from_database in DB.nts_ensembl or to_database in DB.nts_ensembl:
raise ValueError(
f"Cannot use Ensembl node types directly. "
f"from_database={from_database!r}, to_database={to_database!r} must not be in {DB.nts_ensembl}."
)
ids_from = sorted(self.graph.get_id_list(from_database, from_assembly, from_release))
ids_to = set(self.graph.get_id_list(to_database, self.graph.graph["genome_assembly"], to_release))
ids_to_s = {self.graph.nodes[i]["ID"] for i in ids_to} if to_database in DB.nts_assembly_reverse else set()
# Input sampling
if from_fraction == 1.0:
pass
elif 0.0 < from_fraction < 1.0:
from_faction_count = ceil(len(ids_from) * from_fraction)
ids_from = sorted(random.sample(ids_from, from_faction_count)) # noqa: S311
else:
raise ValueError(f"from_fraction must be in range (0.0, 1.0], got {from_fraction!r}.")
# Metric scaffold
parameters: dict[str, Union[bool, str, int, float]] = {
"from_release": from_release,
"from_assembly": from_assembly,
"from_database": from_database,
"to_release": to_release,
"to_database": to_database,
"go_external": go_external,
"from_fraction": from_fraction,
"prioritize_to_one_filter": prioritize_to_one_filter,
}
metrics: dict[str, Any] = {
"parameters": parameters,
"ids": {"from": ids_from, "to": ids_to},
"lost_item": [],
"one_to_one_ids": {},
"query_not_in_the_graph": [],
"history_voyage_failed_gracefully": [],
"history_voyage_failed_unknown": [],
"lost_item_but_the_same_id_exists": [],
"found_ids_not_accurate": {},
"conversion": {},
"one_to_multiple_ids": {},
"one_to_multiple_final_conversion": [],
"converted_item_dict": {},
"converted_item_dict_reversed": {},
}
t1 = time.time()
with tqdm(ids_from, mininterval=0.25, disable=not verbose, desc="Mapping", ncols=100) as loop_obj:
for the_id in loop_obj:
if not verbose_detailed:
loop_obj.set_postfix_str(f"Item:{the_id}", refresh=False)
else:
suffix = (
f"Item:{the_id}, "
f"{len(metrics['one_to_one_ids'])},"
f"{len(metrics['one_to_multiple_ids'])},"
f"{len(metrics['lost_item'])},"
f"{len(metrics['found_ids_not_accurate'])},"
f"{len(metrics['query_not_in_the_graph']) + len(metrics['history_voyage_failed_unknown'])}"
)
loop_obj.set_postfix_str(suffix, refresh=False)
# Conversion attempt
try:
if convert_using_release:
converted_item = self.convert(
from_id=the_id,
from_release=from_release,
to_release=to_release,
final_database=to_database,
go_external=go_external,
prioritize_to_one_filter=prioritize_to_one_filter,
return_ensembl_alternative=return_ensembl_alternative,
)
else:
converted_item = self.convert(
from_id=the_id,
from_release=None,
to_release=to_release,
final_database=to_database,
go_external=go_external,
prioritize_to_one_filter=prioritize_to_one_filter,
return_ensembl_alternative=return_ensembl_alternative,
)
metrics["converted_item_dict"][the_id] = converted_item
except nx.exception.NetworkXError:
metrics["query_not_in_the_graph"].append(the_id)
continue
except EmptyConversionMetricsError:
full_tb = traceback.format_exc()
metrics["history_voyage_failed_gracefully"].append((the_id, full_tb))
continue
except Exception:
full_tb = traceback.format_exc()
metrics["history_voyage_failed_unknown"].append((the_id, full_tb))
continue
# Metrics aggregation
if converted_item is None:
if to_database == from_database == "ensembl_gene" and self.graph.nodes[the_id]["ID"] in ids_to_s:
metrics["lost_item_but_the_same_id_exists"].append(the_id)
metrics["lost_item"].append(the_id)
else:
covr = {j for i in converted_item for j in converted_item[i]["final_conversion"]["final_elements"]}
metrics["conversion"][the_id] = list(covr)
if len(covr) == 1:
metrics["one_to_one_ids"][the_id] = list(covr)
elif len(covr) > 1:
metrics["one_to_multiple_ids"][the_id] = list(covr)
if len(converted_item) == 1:
metrics["one_to_multiple_final_conversion"].append(the_id)
else:
raise ValueError(f"Unexpected empty conversion result for ID {the_id!r}.")
# How much of the converted IDs show the same ID
for c in covr:
if c in metrics["converted_item_dict_reversed"]:
metrics["converted_item_dict_reversed"][c].append(the_id)
else:
metrics["converted_item_dict_reversed"][c] = [the_id]
for c in covr:
if c not in ids_to:
if the_id not in metrics["found_ids_not_accurate"]:
metrics["found_ids_not_accurate"][the_id] = list()
metrics["found_ids_not_accurate"][the_id].append(c)
# ── Clash statistics:
# Each counter tracks how many *target* IDs were reached by more than
# one *source* ID, broken down by the **type of mapping** that led to
# the collision:
clash_multi_multi: int = 0 # all colliding sources were 1→many mappings
clash_multi_one: int = 0 # mix of 1→many and 1→1 sources
clash_one_one: int = 0 # all colliding sources were clean 1→1
# `converted_item_dict_reversed` maps every target ID to the list of
# source IDs that converted to it (built earlier inside the big mapping
# loop). We inspect only those targets hit by ≥ 2 sources - a “clash”.
for target_id, cidr_val in metrics["converted_item_dict_reversed"].items():
# Skip non-clashing targets (exactly one source mapped here).
if len(cidr_val) > 1:
# Does *any* of the sources belong to the 1→many bucket?
s1 = any([cv in metrics["one_to_multiple_ids"] for cv in cidr_val])
# Does *any* of the sources belong to the 1→1 bucket?
s2 = any([cv in metrics["one_to_one_ids"] for cv in cidr_val])
# Categorise clash by the mixture of mapping types.
if s1 and s2: # at least one 1→many **and** one 1→1
clash_multi_one += 1
elif s1: # only 1→many sources present
clash_multi_multi += 1
elif s2: # only 1→1 sources present
clash_one_one += 1
else:
raise ValueError(f"Unexpected state: no sources found for clashing ID {target_id!r}.")
metrics["clashing_id_type"] = [clash_one_one, clash_multi_multi, clash_multi_one]
t2 = time.time()
metrics["time"] = t2 - t1
return metrics
[docs]
def history_travel_testing_random_arguments_generator(self, strict_forward: bool, include_exclude_list: list[bool]):
"""Generate a plausible random parameter set for :py:meth:`history_travel_testing`.
The helper picks **compatible** source/target assemblies, releases and
databases so the subsequent conversion test has a realistic chance to
succeed. When *strict_forward* is *True* the target release is
guaranteed to be **≥** the source release (no time-travel back).
Args:
strict_forward (bool): Enforce a non-decreasing release direction.
include_exclude_list (list[bool]): A 4-element list of booleans controlling
inclusion of ``[include_ensembl_source, include_external_source,
include_ensembl_destination, include_external_destination]``.
Returns:
dict: Keys ``from_assembly``, ``from_release``, ``to_release``,
``from_database``, ``to_database`` ready to be splatted into
:py:meth:`history_travel_testing`.
"""
include_ensembl_1, include_external_1, include_ensembl_2, include_external_2 = include_exclude_list
only_backbone_tests_1 = True if not any([include_ensembl_1, include_external_1]) else False
only_backbone_tests_2 = True if not any([include_ensembl_2, include_external_2]) else False
main_assembly = int(self.graph.graph["genome_assembly"])
from_assembly = (
random.choice(sorted(self.graph.available_genome_assemblies)) # noqa: S311
if not only_backbone_tests_1
else main_assembly
)
# Final conversions are always evaluated on the graph's primary assembly.
to_assembly = main_assembly
the_key1, the_key2 = None, None
while the_key1 is None or the_key2 is None:
the_key1 = self.random_dataset_source_generator(
from_assembly,
include_ensembl=include_ensembl_1,
include_external=include_external_1,
only_backbone_tests=only_backbone_tests_1,
for_final_database=False,
)
if the_key1 != ("", -1, -1):
the_key2 = self.random_dataset_source_generator(
to_assembly,
include_ensembl=include_ensembl_2,
include_external=include_external_2,
for_final_database=True,
only_backbone_tests=only_backbone_tests_2,
release_lower_limit=None if not strict_forward else the_key1[2],
)
if the_key1 == ("", -1, -1) or the_key2 == ("", -1, -1):
self.log.warning("Recalculating parameters.")
return {
"from_assembly": from_assembly,
"from_release": the_key1[2],
"to_release": the_key2[2],
"from_database": the_key1[0],
"to_database": the_key2[0],
}
[docs]
def history_travel_testing_random(
self,
from_fraction: float,
include_ensembl_source=True,
include_external_source=True,
include_ensembl_destination=True,
include_external_destination=True,
verbose: bool = True,
verbose_detailed: bool = False,
strict_forward: bool = False,
convert_using_release: bool = False,
prioritize_to_one_filter: bool = True,
return_result: bool = False,
):
"""Convenience wrapper around :py:meth:`history_travel_testing`.
The routine generates a *random* but internally consistent test case
via :py:meth:`history_travel_testing_random_arguments_generator`, logs
the chosen parameters (unless *verbose* is *False*) and delegates the
heavy lifting to :py:meth:`history_travel_testing`.
Args:
from_fraction: Fraction of IDs to sample from the source set.
strict_forward: Forwarded to the argument generator.
convert_using_release: Forwarded to
:py:meth:`history_travel_testing`.
prioritize_to_one_filter: Forwarded to
:py:meth:`history_travel_testing`.
verbose (bool): Show coarse progress information.
verbose_detailed (bool): Include extended per-ID counters in the progress bar.
return_result (bool): If ``True``, return the metrics dictionary.
include_ensembl_source (bool): Include Ensembl databases as valid sources.
include_external_source (bool): Include external databases as valid sources.
include_ensembl_destination (bool): Include Ensembl databases as valid destinations.
include_external_destination (bool): Include external databases as valid destinations.
Returns:
dict: The *metrics* dictionary returned by
:py:meth:`history_travel_testing`.
"""
include_exclude_list = [
include_ensembl_source,
include_external_source,
include_ensembl_destination,
include_external_destination,
]
parameters = self.history_travel_testing_random_arguments_generator(
strict_forward=strict_forward, include_exclude_list=include_exclude_list
)
if verbose:
printable1 = os.linesep + os.linesep.join(self._format_history_travel_testing_report_header(parameters))
self.log.info(printable1)
res = self.history_travel_testing(
**parameters,
go_external=True,
prioritize_to_one_filter=prioritize_to_one_filter,
convert_using_release=convert_using_release,
from_fraction=from_fraction,
verbose=verbose,
verbose_detailed=verbose_detailed,
return_ensembl_alternative=False,
)
if verbose:
printable2 = os.linesep + os.linesep.join(
self._format_history_travel_testing_report(res, include_header=False)
)
self.log.info(printable2)
if return_result:
return res
[docs]
def is_final_external_conversion_robust(
self,
convert_using_release: bool = False,
database: Optional[str] = None,
ens_rel: Optional[int] = None,
verbose: bool = True,
from_fraction: float = 1.0,
prioritize_to_one_filter: bool = False,
):
"""Validate Ensembl→external conversion against MySQL ground truth.
A random external database is chosen for **every** genome assembly. For
the selected combination the method grabs the authoritative mapping
table (graph-ID → external ID set) from MySQL and converts the same
graph-IDs with :py:meth:`idtrack.Track.convert`.
Args:
convert_using_release (bool): Whether to pin the *from_release* when
calling the converter. Keeping this *True* usually speeds up
the search and mimics user-facing behaviour.
verbose (bool): Print the current assembly/database/release being tested.
prioritize_to_one_filter (bool): If ``True``, apply tie-breaking to
select a single best target when multiple candidates exist.
ens_rel (int | None): Specific Ensembl release to test. If ``None``,
a random release is chosen.
from_fraction (float): Fraction of identifiers to sample for testing
(0.0-1.0). Defaults to 1.0 (all identifiers).
database (str | None): Specific external database to test. If ``None``,
a random database is chosen.
Returns:
bool: *True* if every converted set equals the MySQL reference,
*False* upon the first deviation.
Raises:
ValueError: Raised when test parameters are invalid or incompatible.
"""
issues_t1 = []
issues_t2 = []
issues_t3 = []
assembly = int(self.graph.graph["genome_assembly"])
if database is None or ens_rel is None:
self.log.info("Random database and Ensembl release.")
database, _, ens_rel = self.random_dataset_source_generator(
include_ensembl=True,
include_external=True,
only_backbone_tests=False,
for_final_database=False,
assembly=assembly,
form=DB.backbone_form,
)
if not self.db_manager.check_if_change_assembly_works(
db_manager=self.db_manager.change_release(ens_rel), target_assembly=assembly
):
raise ValueError(
f"Cannot change to assembly {assembly} at release {ens_rel}. "
f"The target assembly is not available for this release."
)
dm = self.db_manager.change_release(ens_rel).change_assembly(assembly)
df = dm.get_db("external_relevant")
df = df[df["name_db"] == database]
base_dict: dict[str, set] = dict()
for _, item in df.iterrows():
if item["graph_id"] not in base_dict:
base_dict[item["graph_id"]] = set()
base_dict[item["graph_id"]].add(item["id_db"])
if verbose:
self.log.info(f"Assembly: {assembly}, Database: {database}, Release: {ens_rel}")
res = self.history_travel_testing(
from_fraction=from_fraction,
from_release=ens_rel,
from_assembly=assembly,
from_database=DB.nts_assembly[assembly][DB.backbone_form],
to_release=ens_rel,
to_database=database,
go_external=True,
prioritize_to_one_filter=prioritize_to_one_filter,
convert_using_release=convert_using_release,
verbose=verbose,
verbose_detailed=False,
)
for from_id in res["ids"]["from"]:
if from_id not in res["conversion"]:
issues_t3.append(from_id)
else:
issue_dict = {
"database": database,
"asym": assembly,
"ens_rel": ens_rel,
"id": from_id,
"converted": res["conversion"][from_id],
}
if from_id not in base_dict:
issues_t2.append(issue_dict)
elif {i.lower() for i in res["conversion"][from_id]} != {i.lower() for i in base_dict[from_id]}:
issue_dict["base_expectation"] = base_dict[from_id]
issues_t1.append(issue_dict)
# make sure issues_t3 is not found in base_dict_from_id
if len(issues_t1) == 0 and len(issues_t2) == 0:
return True, (issues_t1, issues_t2, issues_t3, res)
else:
return False, (issues_t1, issues_t2, issues_t3, res)
[docs]
def random_dataset_source_generator(
self,
assembly: int,
include_external: bool,
include_ensembl: bool,
for_final_database: bool,
only_backbone_tests: bool,
release_lower_limit: Optional[int] = None,
form: Optional[str] = None,
) -> tuple[str, int, int]:
"""Pick a random (<database>, <assembly>, <release>) tuple.
The function guarantees that the triple actually exists in the graph
and - if *release_lower_limit* is provided - honours the minimum
release constraint.
Args:
assembly (int): Genome assembly code used in Ensembl core schema names
(e.g. ``38`` = human GRCh38, ``39`` = mouse GRCm39, ``111`` = pig Sscrofa11.1).
include_ensembl (bool): Whether Ensembl backbone databases may be returned
as *database*.
release_lower_limit (int | None): Smallest permissible Ensembl release number
for the returned triple. *None* disables the filter.
form (str | None): Restrict the draw to a particular connection *form*
(protein/coding/gene). *None* means no restriction.
only_backbone_tests (bool): If ``True``, restrict selection to backbone-only
databases (skip external databases entirely).
include_external (bool): Whether external (non-Ensembl) databases may be
included in the selection pool.
for_final_database (bool): If ``True``, exclude Ensembl assembly-specific
databases from selection (useful when selecting final conversion targets).
Returns:
tuple | None: ``(<database>, <assembly>, <release>)`` or *None* when
no matching release exists.
Raises:
ValueError: Raised when no valid database/release combination can be found.
"""
if not only_backbone_tests:
if include_external:
all_possible_sources = copy.deepcopy(list(self.graph.available_external_databases_assembly[assembly]))
all_possible_sources = [i for i in all_possible_sources if not i.startswith("synonym_id")]
else:
all_possible_sources = []
if form is not None:
all_possible_sources = [
i for i in all_possible_sources if self.graph.external_database_connection_form[i] == form
]
if include_ensembl and not for_final_database:
if form is None:
all_possible_sources.extend(list(DB.nts_assembly[assembly].values()))
else:
all_possible_sources.append(DB.nts_assembly[assembly][form])
if include_ensembl and (form == DB.backbone_form or form is None):
all_possible_sources.append(DB.nts_ensembl[DB.backbone_form])
all_possible_sources.append(DB.nts_base_ensembl[DB.backbone_form])
if not all_possible_sources:
raise ValueError("There is nothing as possible sources.")
else:
all_possible_sources = [DB.nts_ensembl[DB.backbone_form]]
selected_database = random.choice(all_possible_sources) # noqa: S311
possible_releases = self.graph.available_releases_given_database_assembly[(selected_database, assembly)]
if release_lower_limit is not None:
possible_releases = {i for i in possible_releases if i >= release_lower_limit}
if len(possible_releases) > 1:
selected_release = random.choice(list(possible_releases)) # noqa: S311
the_key = (selected_database, assembly, selected_release)
return the_key
else:
return ("", -1, -1)
[docs]
def is_node_consistency_robust(self, verbose: bool = True):
"""Check for illegal neighbour relationships and multi-edges.
The graph may contain **exactly one** edge between nodes of *different*
node-types. Nodes of the *same* node-type are only allowed when that
type is the Ensembl backbone (``ensembl_gene``). Any deviation - a
lateral same-type connection or >1 multi-edge - is logged and aborts
the test.
Args:
verbose: Print offending nodes when a violation is detected.
Returns:
bool: *True* when the graph satisfies the topology rules, *False*
otherwise.
"""
with tqdm(self.graph.nodes, mininterval=0.25, disable=not verbose, ncols=100) as loop_obj:
for i in loop_obj:
for j in self.graph.neighbors(i):
ni = self.graph.nodes[i][DB.node_type_str]
nj = self.graph.nodes[j][DB.node_type_str]
if ni == nj and ni != DB.nts_ensembl[DB.backbone_form]:
if verbose:
self.log.warning(f"Neighbor nodes (not backbone) has similar node type: `{i}`, `{j}`")
return False
elif ni != nj and len(self.graph[i][j]) != 1:
if verbose:
self.log.warning(f"Multiple edges between `{i, ni}` and `{j, nj}`")
return False
return True
[docs]
def _format_history_travel_testing_report(
self, res: dict[str, Any], include_header=False, line_separation_at_end=True
) -> list[str]:
"""Format a complete history travel testing report with metrics.
Args:
res (dict[str, Any]): Results dictionary from history_travel_testing
containing conversion metrics and parameters.
include_header (bool): If ``True``, include the header section with
source/target information. Defaults to ``False``.
line_separation_at_end (bool): If ``True``, append a blank line
separator at the end of the report. Defaults to ``True``.
Returns:
list[str]: Lines of formatted text for the complete report.
"""
def cnt(key: str) -> int:
"""Return length of list/dict at res[key] or zero if absent."""
return len(res.get(key, []))
def block(title: str, rows: list[tuple]) -> list[str]:
pad = max(len(k) for k, _ in rows)
return [f"{title}:"] + [f" - {k.ljust(pad)} : {v:,}" for k, v in rows]
if include_header:
p = res.get("parameters", {})
header = self._format_history_travel_testing_report_header(p)
header_extension = [
f"External: {p.get('go_external')} " f"1→1-pref.: {p.get('prioritize_to_one_filter')}",
f"Sample : {p.get('from_fraction'):g} of source IDs",
]
header.extend(header_extension)
else:
header = []
failure_rows = [
("Voyage failed (graceful) ", cnt("history_voyage_failed_gracefully")),
("Voyage failed (unknown) ", cnt("history_voyage_failed_unknown")),
("Query not in graph ", cnt("query_not_in_the_graph")),
("Lost item ", cnt("lost_item")),
("Lost item, but ID exists ", cnt("lost_item_but_the_same_id_exists")),
("Found IDs not accurate ", cnt("found_ids_not_accurate")),
]
clashes = res.get("clashing_id_type", [0, 0, 0])
mapping_rows = [
("One→one IDs ", cnt("one_to_one_ids")),
("One→many IDs ", cnt("one_to_multiple_ids")),
("One→many (single conv.) ", cnt("one_to_multiple_final_conversion")),
("Successfully converted IDs ", cnt("conversion")),
("Clash one→one ", clashes[0]),
("Clash many→many ", clashes[1]),
("Clash mixed ", clashes[2]),
]
report_lines: list[str] = (
header
+ block("Failure / Anomaly Counts", failure_rows)
+ block("Mapping Statistics", mapping_rows)
+ [f"Total runtime: {res.get('time', 0):.2f} s"]
)
if line_separation_at_end:
report_lines.append("")
return report_lines