Source code for charger.variant

import re
from contextlib import closing
from enum import Enum, Flag, auto
from pathlib import Path
from typing import Any, Dict, Generator, List, Optional, Type, TypeVar

import attr
from cyvcf2 import VCF
from cyvcf2 import Variant as CyVCF2Variant
from loguru import logger

from .csq import CSQ

logger.disable("charger")  # Disable emit logs by default


# A type hint variable to annotated the Factory method
# See https://github.com/python/typing/issues/58 for details
V = TypeVar("V", bound="Variant")


[docs]@attr.s(auto_attribs=True, repr=False, eq=False, order=False, slots=True) class Variant: """ Biallelic variant. For normal usage, consider using :meth:`read_and_parse_vcf` to construct the objects from a VEP annotated VCF. Examples: >>> variant = Variant('13', 32340300, 32340301, 'GT', 'G', id='rs80359550') >>> variant Variant(13:32340300GT>G info: ) >>> v.is_snp() False >>> v.is_sv() False >>> v.is_indel() True >>> v.is_deletion() True Annotate it with online VEP, >>> v = next(Variant.read_and_parse_vcf('rs80359550.vcf')) >>> v Variant(13:32340300GT>G info: CSQ[4 parsed]) """ chrom: str #: Chromosome. start_pos: int #: Start position (1-based closed). Same as POS in the VCF record. end_pos: int #: End position (1-based closed). ref_allele: str #: Reference allele sequence. alt_allele: str #: Alternative allele sequence (currently only allow one possible allele). id: Optional[str] = None """ID in the VCF record. `None` when the original value is ``.``.""" filter: Optional[List[str]] = None """FILTER in the VCF record. `None` when the original value is ``PASS``.""" info: Dict[str, Any] = attr.Factory(dict) """INFO in the VCF record.""" parsed_csq: Optional[List["CSQ"]] = None """ All parsed CSQ annotations of the variant as a list of :class:`.CSQ` objects. Use :meth:`read_and_parse_vcf` to automatically parse CSQ while reading an annotated VCF. """ _most_severe_csq: Optional["CSQ"] = attr.ib(default=None, init=False) """Cached most severe CSQ based on the consequence type.""" def __attrs_post_init__(self): # Variant must be normalized if self.ref_allele == ".": raise ValueError( "ref_allele cannot be missing ('.'). Try normalize the variant first." ) if self.alt_allele is None or self.alt_allele == ".": raise ValueError( "alt_allele cannot be missing ('.' or None). Try normalize the variant first." )
[docs] def is_snp(self) -> bool: """`True` if the variant is a SNP.""" if len(self.ref_allele) > 1: return False elif self.alt_allele not in ("A", "C", "G", "T"): return False return True
[docs] def is_sv(self) -> bool: """`True` if the variant ia an SV.""" return "SVTYPE" in self.info and self.info["SVTYPE"] is not None
[docs] def is_indel(self) -> bool: """`True` if the variant ia an INDEL.""" is_sv = self.is_sv() if len(self.ref_allele) > 1 and not is_sv: return True if self.alt_allele == ".": return False elif len(self.alt_allele) != len(self.ref_allele) and not is_sv: return True return False
[docs] def is_deletion(self) -> bool: """`True` if the variant is a deletion.""" if not self.is_indel(): return False else: return len(self.ref_allele) > len(self.alt_allele)
[docs] def get_most_severe_csq(self) -> CSQ: """Get the most severe CSQ based on the consequence type. If multiple CSQs have the same consequence type, the canonical CSQ determined by VEP will be selected. """ # Return the cached result if available if self._most_severe_csq is not None: return self._most_severe_csq if self.parsed_csq is None: raise ValueError( "Variant {self!r} CSQ is not parsed or it does not have any annotation." ) # Order all CSQ based on: # - The most severe consequence type # - The transcript is canonical # - The CSQ comes first rank_and_canonical_per_csq = [] for csq_ix, csq in enumerate(self.parsed_csq): # Severe consequence have smaller rank rank_consequence_type = csq.rank_consequence_type() # Canonical transcript wins whenever there is a tie in the ranks of consequence type. try: if csq["CANONICAL"] == "YES": rank_canonical = 0 else: rank_canonical = 1 except KeyError: rank_canonical = 1 rank_and_canonical_per_csq.append( (rank_consequence_type, rank_canonical, csq_ix) ) rank_ct, rank_canonical, csq_ix = min(rank_and_canonical_per_csq) self._most_severe_csq = self.parsed_csq[csq_ix] return self._most_severe_csq
def _parse_csq(self, csq_fields: List[str]): """Parse the CSQ info string based on the CSQ field spec. It returns a list of consequences per annotation(transcript) as a list of :class:`.CSQ` objects. """ all_csq = self.info["CSQ"].split(",") parsed_csq_per_annotation = [] for one_csq in all_csq: csq_values = one_csq.split("|") if len(csq_values) != len(csq_fields): raise ValueError( f"Number of CSQ values (n={len(csq_values)}) and columns (n={len(csq_fields)})" f"don't match. columns: {csq_fields!r}, values: {csq_values!r}" ) parsed_csq_per_annotation.append(CSQ(dict(zip(csq_fields, csq_values)))) self.parsed_csq = parsed_csq_per_annotation self.info["CSQ"] = self.parsed_csq def __repr__(self): type_name = type(self).__name__ ref = limit_seq_display(self.ref_allele) alt = limit_seq_display(self.alt_allele) info_keys = set(self.info.keys()) if self.parsed_csq is not None: info_keys.discard("CSQ") info_keys.add(f"CSQ[{len(self.parsed_csq)} parsed]") info_repr = f"info: {','.join(info_keys)}" return f"{type_name}({self.chrom}:{self.start_pos}{ref}>{alt} {info_repr})"
[docs] @classmethod def from_cyvcf2(cls: Type[V], variant: CyVCF2Variant) -> V: """ Create one Variant object based on the given :class:`cyvcf2.Variant <cyvcf2.cyvcf2.Variant>` VCF record. """ return cls( chrom=variant.CHROM, start_pos=variant.start + 1, end_pos=variant.end, ref_allele=variant.REF, alt_allele=variant.ALT[0], id=variant.ID, filter=variant.FILTER, info=dict(variant.INFO), )
[docs] @classmethod def get_vep_version(cls: Type[V], vcf_raw_headers: List[str]) -> str: """Extract the VEP version in the given VCF.""" # Find VEP version # Reverse the header order because the newer header appears later try: vep_header = next( line for line in reversed(vcf_raw_headers) if line.startswith("##VEP=") ) vep_version = re.match(r"^##VEP=['\"]?v(\d+)['\"]?", vep_header).group(1) # type: ignore except (StopIteration, AttributeError): logger.warning("Cannot find VEP version in the VCF header") vep_version = "UNKNOWN" return vep_version
[docs] @classmethod def get_vep_csq_fields(cls: Type[V], vcf_raw_headers: List[str]) -> List[str]: """Extract the CSQ fields VEP output in the given VCF.""" # Get CSQ spec # Reverse the header order because the newer header appears later try: csq_info_header = next( line for line in reversed(vcf_raw_headers) if line.startswith("##INFO=<ID=CSQ,") ) except StopIteration: raise ValueError("Cannot find CSQ format in the VCF header") m = re.search(r"Format: ([\w\|]+)['\"]", csq_info_header) if m: csq_format = m.group(1) else: raise ValueError( f"Cannot parse the CSQ field format from its INFO VCF header: {csq_info_header}" ) csq_fields = csq_format.split("|") return csq_fields
[docs] @classmethod def read_vcf(cls: Type[V], path: Path) -> Generator[V, None, None]: """ Read VCF record from `path`. This function walks through each variant record in the given VCF using :class:`cyvcf2.VCF <cyvcf2.cyvcf2.VCF>`, and yields the record as a :class:`Variant` object. See also :meth:`read_and_parse_vcf` to read and parse the VCF. Args: path: Path to the VCF. Returns: An generator walking through all variants per record. """ with closing(VCF(str(path))) as vcf: for cy_variant in vcf: variant = cls.from_cyvcf2(cy_variant) yield variant
[docs] @classmethod def read_and_parse_vcf(cls: Type[V], path: Path) -> Generator[V, None, None]: """ Read and parse VCF record with its VEP-annotated CSQ from `path`. This function walks through each variant record in the given VCF using :class:`cyvcf2.VCF <cyvcf2.cyvcf2.VCF>`, and yields the record as a :class:`Variant` object. The parsed CSQ will be stored in the generated :attr:`Variant.parsed_csq`. Args: path: Path to the VCF. Returns: An generator walking through all variants per record. Examples: Read an annotated VCF:: >>> vcf_reader = Variant.read_and_parse_vcf('my.vcf') >>> variant = next(vcf_reader) >>> variant Variant(14:45658326C>T info: CSQ[5 parsed]) >>> variants[4].parsed_csq[0] CSQ(SYMBOL='FANCM', HGVSc='ENST00000267430.5:c.5101N>T', Consequence='stop_gained', …) Iterate all the VCF variants records:: >>> for variant in vcf_reader: ... print(variant.chrom, variant.parsed_csq[0]['Allele']) """ with closing(VCF(str(path))) as vcf: # Get the CSQ field definition and VEP version from the VCF header vcf_raw_headers = vcf.raw_header.splitlines() vep_version = cls.get_vep_version(vcf_raw_headers) csq_fields = cls.get_vep_csq_fields(vcf_raw_headers) logger.debug( f"VEP version {vep_version} with CSQ format [{len(csq_fields)} fields]: {','.join(csq_fields)}" ) for cy_variant in vcf: variant = cls.from_cyvcf2(cy_variant) variant._parse_csq(csq_fields) yield variant
[docs]def limit_seq_display(seq: str, limit: int = 5) -> str: """Limit the display of the sequence. Examples: >>> limit_seq_display('ATATCCG') 'ATATC…' >>> limit_seq_display('ATA') 'ATA' >>> limit_seq_display('ATA', limit=1) 'A…' """ if len(seq) > limit: seq = seq[:limit] + "…" return seq
[docs]class GeneInheritanceMode(Flag): """All possible modes of the gene inheritance dominance. Used by :attr:`CharGerConfig.inheritance_gene_table <charger.config.CharGerConfig.inheritance_gene_table>`. """ AUTO_DOMINANT = auto() #: The gene is autosomal dominant. AUTO_RECESSIVE = auto() #: The gene is autosomal recessive. X_LINKED_DOMINANT = auto() #: The gene is X-linked dominant. X_LINKED_RECESSIVE = auto() #: The gene is X-linked recessive. Y_LINKED = auto() #: The gene is Y-linked.
[docs] @classmethod def parse( cls: Type["GeneInheritanceMode"], value: str ) -> Optional["GeneInheritanceMode"]: """Parse the inheritance modes from the given string. Multiple modes are comma separated. >>> m = GeneInheritanceMode.parse("autosomal dominant, autosomal recessive") >>> m <GeneInheritanceMode.AUTO_RECESSIVE|AUTO_DOMINANT: 3> >>> bool(m & GeneInheritanceMode.AUTO_RECESSIVE) True >>> bool(m & GeneInheritanceMode.Y_LINKED) False >>> GeneInheritanceMode.parse("unknown") is None True """ MODE_TO_FLAG = { "autosomal dominant": cls.AUTO_DOMINANT, "autosomal recessive": cls.AUTO_RECESSIVE, "x-linked dominant": cls.X_LINKED_DOMINANT, "x-linked recessive": cls.X_LINKED_RECESSIVE, "y-linked": cls.Y_LINKED, } mode_flags = [] # Convert the input value to be lower case only for mode in value.lower().split(","): mode = mode.strip() if mode == "unknown": # Unknown has no flag continue try: mode_flags.append(MODE_TO_FLAG[mode]) except KeyError as e: raise ValueError( f"Invalid variant inheritance mode {mode} from {value}" ) from e if mode_flags: # Combined all the flags combined_mode = mode_flags[0] for flag in mode_flags[1:]: combined_mode |= flag return combined_mode else: return None
[docs]class ClinicalSignificance(Enum): """All possible clinical significance types of a variant.""" PATHOGENIC = "Pathogenic" LIKELY_PATHOGENIC = "Likely Pathogenic" LIKELY_BENIGN = "Likely Benign" BENIGN = "Benign" UNCERTAIN = "Uncertain Significance"
[docs] @classmethod def parse_clinvar_record( cls: Type["ClinicalSignificance"], record: Dict[str, str] ) -> "ClinicalSignificance": """Determine the pathogenicity of a ClinVar record.""" # The default pathogenicity clin_sig = cls.UNCERTAIN is_pathogenic = ( int(record["pathogenic"]) > 0 or int(record["likely_pathogenic"]) > 0 ) is_benign = int(record["benign"]) > 0 or int(record["likely_benign"]) > 0 if int(record["conflicted"]): # Conflicted record has uncertain clinical significance return cls.UNCERTAIN # There may be multiple assertions all_clin_sig_assertions = record["clinical_significance"].lower().split("/") if is_benign and is_pathogenic: # Fix parsing of conflicting ClinVar assertion # by checking the ClinVar assertion for assertion in all_clin_sig_assertions: if "likely benign" in assertion: clin_sig = cls.LIKELY_BENIGN elif "likely pathogenic" in assertion: clin_sig = cls.LIKELY_PATHOGENIC elif "benign" in assertion: clin_sig = cls.BENIGN break elif "pathogenic" in assertion: clin_sig = cls.PATHOGENIC break elif is_benign: for assertion in all_clin_sig_assertions: if "likely benign" in assertion: clin_sig = cls.LIKELY_BENIGN elif "benign" in assertion: clin_sig = cls.BENIGN break elif is_pathogenic: for assertion in all_clin_sig_assertions: if "likely pathogenic" in assertion: clin_sig = cls.LIKELY_PATHOGENIC elif "pathogenic" in assertion: clin_sig = cls.PATHOGENIC break return clin_sig