Source code for seqann.sequence_annotation

# -*- coding: utf-8 -*-

#
#    seqann Sequence Annotation
#    Copyright (c) 2017 Be The Match operated by National Marrow Donor Program. All Rights Reserved.
#
#    This library is free software; you can redistribute it and/or modify it
#    under the terms of the GNU Lesser General Public License as published
#    by the Free Software Foundation; either version 3 of the License, or (at
#    your option) any later version.
#
#    This library is distributed in the hope that it will be useful, but WITHOUT
#    ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
#    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
#    License for more details.
#
#    You should have received a copy of the GNU Lesser General Public License
#    along with this library;  if not, write to the Free Software Foundation,
#    Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA.
#
#    > http://www.fsf.org/licensing/licenses/lgpl.html
#    > http://www.opensource.org/licenses/lgpl-license.php
#
import logging
import warnings

from Bio import BiopythonExperimentalWarning
warnings.simplefilter("ignore", BiopythonExperimentalWarning)

from Bio.Seq import Seq
from Bio import pairwise2
from Bio.Alphabet import IUPAC
from BioSQL.BioSeq import DBSeq
from BioSQL import BioSeqDatabase
from Bio.SeqRecord import SeqRecord
from Bio.SeqFeature import SeqFeature
from Bio.SeqFeature import ExactPosition
from Bio.SeqFeature import FeatureLocation

from seqann.models.annotation import Annotation
from seqann.models.reference_data import ReferenceData
from seqann.blast_cmd import blastn
from seqann.blast_cmd import get_locus
from seqann.seq_search import SeqSearch
from seqann.models.base_model_ import Model
from seqann.align import align_seqs
from seqann.util import randomid
from seqann.util import get_seqs
from seqann.util import checkseq
from seqann.util import isexon
from seqann.util import isfive
from seqann.util import isutr
from seqann.util import is_classII
from seqann.gfe import GFE

from itertools import repeat
from typing import Dict
from typing import List
import re


[docs]class BioSeqAnn(Model): ''' :: from seqann import BioSeqAnn seqann1 = BioSeqAnn() seqann2 = BioSeqAnn(dbversion="3300", verbose=True, verbosity=3) seqann3 = BioSeqAnn(debug={"align":4}, safe) :param server: A BioSQL database to use for retriving the sequence features. Using a BioSQL DB will speed up the annotations dramatically. :type server: :ref:`bio` :param dbversion: The IPD-IMGT/HLA or KIR database release. :type dbversion: ``str`` :param datfile: The IPD-IMGT/HLA or KIR dat file to use in place of the server parameter. :type datfile: ``str`` :param pid: A process label that can be provided to help track the logging output. :type pid: ``str`` :param load_features: Flag for downloading all gene features and accessions from the feature service. :type load_features: ``bool`` :param store_features: Flag for caching all features and their corresponding accessions. :type store_features: ``bool`` :param cached_features: Dictionary containing all the features from the feature service. :type cached_features: ``dict`` :param kir: Flag for indicating the input sequences are from the KIR gene system. :type kir: ``bool`` :param align: Flag for producing the alignments along with the annotations. :type align: ``bool`` :param verbose: Flag for running in verbose mode. :type verbose: ``bool`` :param verbosity: Numerical value to indicate how verbose the output will be in verbose mode. :type verbosity: ``int`` :param debug: Dictionary containing names of steps that you want to debug. :type debug: ``dict`` :param safemode: Flag for running the annotations in safemode. No alignments will be done if no feature matches were made. This can prevent the alignment step for running for too long on bad sequences. :type safemode: ``bool`` ''' def __init__(self, server: BioSeqDatabase=None, dbversion: str='3310', datfile: str='', verbose: bool=False, verbosity: int=0, pid: str='NA', kir: bool=False, align: bool=False, load_features: bool=False, store_features: bool=False, refdata: ReferenceData=None, cached_features: Dict=None, safemode: bool=False, debug: Dict=None): self.kir = kir self.align = align self.debug = debug self.server = server self.verbose = verbose self.safemode = safemode self.verbosity = verbosity self.align_verbose = verbose self.align_verbosity = verbosity self.dbversion = dbversion gfe_verbose = verbose gfe_verbosity = verbosity refdata_verbose = verbose seqsearch_verbose = verbose refdata_verbosity = verbosity seqsearch_verbosity = verbosity # Run with the most possible amount # of logging for the provided steps if self.debug: if 'seq_search' in self.debug: seqsearch_verbose = True seqsearch_verbosity = debug['seq_search'] else: seqsearch_verbose = False seqsearch_verbosity = 0 if 'refdata' in self.debug: refdata_verbose = True refdata_verbosity = debug['refdata'] else: refdata_verbose = False refdata_verbosity = 0 if 'seqann' in self.debug: self.verbose = True self.verbosity = debug['seqann'] else: self.verbose = False self.verbosity = 0 if 'align' in self.debug: self.align_verbose = True self.align_verbosity = debug['align'] else: self.align_verbose = False self.align_verbosity = 0 if 'gfe' in self.debug: gfe_verbose = True gfe_verbosity = debug['gfe'] else: gfe_verbose = False gfe_verbosity = 0 self.logger = logging.getLogger("Logger." + __name__) self.logname = "ID {:<10} -".format(str(pid)) # Initalize GFE self.gfe = GFE(pid=pid, verbose=gfe_verbose, verbosity=gfe_verbosity, load_features=load_features, store_features=store_features, cached_features=cached_features) if refdata: # Initialize with preloaded reference object self.refdata = refdata else: # Create new reference data object self.refdata = ReferenceData(server=server, dbversion=dbversion, alignments=align, verbose=refdata_verbose, verbosity=refdata_verbosity, kir=kir) # Initalize SeqSearch for matching features self.seqsearch = SeqSearch(verbose=seqsearch_verbose, verbosity=seqsearch_verbosity)
[docs] def annotate(self, sequence: Seq=None, locus: str=None, nseqs: int=20, alignseqs: int=10, skip: List=[], rerun: bool=True, full: bool=True) -> Annotation: """ annotate - method for annotating a BioPython sequence :param sequence: The input consensus sequence. :type sequence: Seq :param locus: The gene locus associated with the sequence. :type locus: ``str`` :param nseqs: The number of blast sequences to use. :type nseqs: ``int`` :param alignseqs: The number of sequences to use for targeted alignments. :type alignseqs: ``int`` :param skip: A list of alleles to skip for using as a reference. This is used for validation and testing. :type skip: ``List`` :rtype: :ref:`ann` Returns: The annotate function return an ``Annotation`` object that contains the sequence features and names associated with them. Example output:: { 'complete_annotation': True, 'annotation': {'exon_1': SeqRecord(seq=Seq('AGAGACTCTCCCG', SingleLetterAlphabet()), id='HLA:HLA00630', name='HLA:HLA00630', description='HLA:HLA00630 DQB1*03:04:01 597 bp', dbxrefs=[]), 'exon_2': SeqRecord(seq=Seq('AGGATTTCGTGTACCAGTTTAAGGCCATGTGCTACTTCACCAACGGGACGGAGC...GAG', SingleLetterAlphabet()), id='HLA:HLA00630', name='HLA:HLA00630', description='HLA:HLA00630 DQB1*03:04:01 597 bp', dbxrefs=[]), 'exon_3': SeqRecord(seq=Seq('TGGAGCCCACAGTGACCATCTCCCCATCCAGGACAGAGGCCCTCAACCACCACA...ATG', SingleLetterAlphabet()), id='HLA:HLA00630', name='<unknown name>', description='HLA:HLA00630', dbxrefs=[])}, 'features': {'exon_1': SeqFeature(FeatureLocation(ExactPosition(0), ExactPosition(13), strand=1), type='exon_1'), 'exon_2': SeqFeature(FeatureLocation(ExactPosition(13), ExactPosition(283), strand=1), type='exon_2') 'exon_3': SeqFeature(FeatureLocation(ExactPosition(283), ExactPosition(503), strand=1), type='exon_3')}, 'method': 'nt_search and clustalo', 'gfe': 'HLA-Aw2-1-1-1-1-1-1-1-1-1-1-1-1-1-1-1-4', 'seq': SeqRecord(seq=Seq('AGAGACTCTCCCGAGGATTTCGTGTACCAGTTTAAGGCCATGTGCTACTTCACC...ATG', SingleLetterAlphabet()), id='HLA:HLA00630', name='HLA:HLA00630', description='HLA:HLA00630 DQB1*03:04:01 597 bp', dbxrefs=[]) } Example usage: >>> from Bio.Seq import Seq >>> from seqann import BioSeqAnn >>> sequence = Seq('AGAGACTCTCCCGAGGATTTCGTGTACCAGTTTAAGGCCATGTGCTACTTCACC') >>> seqann = BioSeqAnn() >>> ann = seqann.annotate(sequence) >>> for f in ann.annotation: ... print(f, ann.method, str(ann.annotation[f].seq), sep="\t") exon_2 nt_search and clustalo AGGATTTCGTGTACCAGTTTAAGGCCATGTGCTACTTCACCAACGGGACGGAGCGCGTGCGTTATGTGACCAGATACATCTATAACCGAGAGGAGTACGCACGCTTCGACAGCGACGTGGAGGTGTACCGGGCGGTGACGCCGCTGGGGCCGCCTGCCGCCGAGTACTGGAACAGCCAGAAGGAAGTCCTGGAGAGGACCCGGGCGGAGTTGGACACGGTGTGCAGACACAACTACCAGTTGGAGCTCCGCACGACCTTGCAGCGGCGAG exon_3 nt_search and clustalo TGGAGCCCACAGTGACCATCTCCCCATCCAGGACAGAGGCCCTCAACCACCACAACCTGCTGGTCTGCTCAGTGACAGATTTCTATCCAGCCCAGATCAAAGTCCGGTGGTTTCGGAATGACCAGGAGGAGACAACCGGCGTTGTGTCCACCCCCCTTATTAGGAACGGTGACTGGACCTTCCAGATCCTGGTGATGCTGGAAATGACTCCCCAGCATGGAGACGTCTACACCTGCCACGTGGAGCACCCCAGCCTCCAGAACCCCATCACCGTGGAGTGGC exon_1 nt_search and clustalo AGAGACTCTCCCG exon_4 nt_search and clustalo GGGCTCAGTCTGAATCTGCCCAGAGCAAGATG """ # If sequence is now a biopython # sequence record convert it to one if isinstance(sequence, Seq): sequence = SeqRecord(seq=sequence, id="NO_ID") if isinstance(sequence, str): sequence = SeqRecord(seq=Seq(sequence), id="NO_ID") # If sequence contains any characters # other than ATCG then the GFE notation # can not be created create_gfe = checkseq(sequence.seq) if self.verbose and not create_gfe: self.logger.warning(self.logname + " Sequence alphabet " + "contains non DNA") self.logger.warning(self.logname + " No GFE string will be generated") # Check it the locus exists if not locus: if self.verbose: self.logger.info(self.logname + " No locus provided! ") # Guessing locus with blastn locus = get_locus(sequence, kir=self.kir, refdata=self.refdata) if locus and self.verbose: self.logger.info(self.logname + " Locus prediction = " + locus) if not locus: if self.verbose: self.logger.error(self.logname + " Locus could not be determined!") # TODO: Raise exception #raise NoLocusException("") return # Exact match found matched_annotation = self.refdata.search_refdata(sequence, locus) if matched_annotation and not skip: matched_annotation.exact = True # TODO: return name of allele if self.verbose: self.logger.info(self.logname + " exact match found") # Create GFE if create_gfe: feats, gfe = self.gfe.get_gfe(matched_annotation, locus) matched_annotation.gfe = gfe matched_annotation.structure = feats return matched_annotation # Run blast to get ref sequences blast = blastn(sequence, locus, nseqs, kir=self.kir, verbose=self.verbose, refdata=self.refdata) # If the blastn fails.. if blast.failed: if self.verbose: self.logger.info(self.logname + " Blastn failed!!") # Try and determine the locus and rerun. This is # useful for cases when the sequences is associated # with the wrong locus. locus = get_locus(sequence, kir=self.kir, refdata=self.refdata) if locus and self.verbose: self.logger.info(self.logname + " Locus prediction = " + locus) # Check if the locus could not be found if not locus: self.logger.error(self.logname + " Locus could not be determined!") # TODO: Raise exception return return self.annotate(sequence, locus, nseqs=nseqs, alignseqs=alignseqs) return # Do seq_search first on all blast sequences # retain what the largest ref seq is leastmissing = 100 partial_ann = None leastmissing_feat = None found = blast.match_seqs for i in range(0, len(found)): run = 0 if i == len(found)-1: run = 1 # Skip a reference # * For validation * if found[i].name in skip: continue if self.verbose: self.logger.info(self.logname + " Running seq_search with " + found[i].name) # * Running sequence search * # This does a simple string search for the # reference features within the provided sequence ann = self.seqsearch.search_seqs(found[i], sequence, locus, partial_ann=partial_ann, run=run) if ann.complete_annotation: if self.verbose: self.logger.info(self.logname + " Finished annotation using " + found[i].name) # Add alignment flag is specified if self.align: if self.verbose: self.logger.info(self.logname + " Adding alignment") ann = self.add_alignment(found[i], ann) if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Features annotated = " + ",".join(list(ann .annotation.keys()))) if self.verbosity > 3: for f in ann.features: self.logger.info(self.logname + " " + f + " = " + str(ann .annotation[f].seq)) # Create GFE if create_gfe: feats, gfe = self.gfe.get_gfe(ann, locus) ann.gfe = gfe ann.structure = feats ann.clean() return ann else: partial_ann = ann if hasattr(partial_ann, 'refmissing'): if len(partial_ann.refmissing) < leastmissing: leastmissing_feat = found[i] leastmissing = len(partial_ann.refmissing) else: leastmissing_feat = found[i] leastmissing = 0 if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Using partial annotation * run " + str(i) + " *") self.logger.info(self.logname + " Features found (" + str(len(ann.features.keys())) + ") = " + ",".join(list(ann .features.keys()))) self.logger.info(self.logname + " Features missing (" + str(len(ann.missing.keys())) + ") = " + ",".join(list(ann .missing.keys()))) self.logger.info(self.logname + " Sequence unmapped = " + str(ann.covered)) self.logger.info(self.logname + " ############" + "##################") # The number of sequences being used for alignment # can't be greater than the number of sequences # to be returned from the blast results if alignseqs > len(found): alignseqs = len(found)-1 # * HARD CODED LOGIC * # # > After testing with multiple thresholds # this value seemed to work best. # # Aligned % cutoff align_cutoff = .90 if((not hasattr(partial_ann, 'features') or len(partial_ann.features) == 0) and len(sequence) > 700 and self.safemode): self.logger.error("No feature matches!") self.logger.error("Running in safe mode. " + "No alignments will be done!") if rerun: # Check to see if reverse comp # TODO: Add note for reverse complement self.logger.info("Running with reverse complement.") sequence = sequence.reverse_complement() return self.annotate(sequence=sequence, locus=locus, rerun=False) return # Now loop through doing alignment for i in range(0, alignseqs): # Skip a reference # * For validation * if found[i].name in skip: continue if self.verbose: self.logger.info(self.logname + " running ref_align with " + found[i].name) aligned_ann = self.ref_align(found[i], sequence, locus, annotation=partial_ann, run=i, cutoff=align_cutoff) # * HARD CODED LOGIC * # # > If sequences are very novel, then the alignment # cutoff may be to stringent. Incrementally decreasing # the cutoff improves the likelihood of these sequences # being annotated. align_cutoff -= .01 if aligned_ann and aligned_ann.complete_annotation: if self.align: if self.verbose: self.logger.info(self.logname + " Adding alignment") aligned_ann = self.add_alignment(found[i], aligned_ann) if self.verbose: self.logger.info(self.logname + " Finished ref_align annotation using " + found[i].name) if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Features annotated = " + ",".join(list(aligned_ann .annotation.keys()))) if self.verbosity > 2: for f in aligned_ann.annotation: self.logger.info(self.logname + " " + f + " = " + str(aligned_ann .annotation[f].seq)) # Create GFE if create_gfe: feats, gfe = self.gfe.get_gfe(aligned_ann, locus) aligned_ann.gfe = gfe aligned_ann.structure = feats aligned_ann.clean() return aligned_ann elif(aligned_ann): if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Using partial annotation " + "for alignment * run " + str(i) + " - cutoff = " + str(align_cutoff) + " *") self.logger.info(self.logname + " Features found = " + ",".join(list(aligned_ann .features.keys()))) self.logger.info(self.logname + " Features missing = " + ",".join(list(aligned_ann .missing.keys()))) self.logger.info(self.logname + " ############" + "##################") partial_ann = aligned_ann if(hasattr(partial_ann, 'annotation') and partial_ann.annotation): exon_only = True for f in partial_ann.annotation: if re.search("intron", f) or re.search("UTR", f): exon_only = False if(is_classII(locus) and exon_only and len(partial_ann.annotation.keys()) > 0 and align_cutoff < .9): align_cutoff = .80 if not is_classII(locus) and align_cutoff < .88: align_cutoff = .88 # Don't run full # annotation if flag is passed if not full: return if self.verbose: self.logger.info(self.logname + " running full alignment") # Try doing full alignment full_align = self.ref_align(leastmissing_feat, sequence, locus, partial_ann=partial_ann, cutoff=.80) if self.verbose: self.logger.info(self.logname + " Finished ref_align annotation using full " + leastmissing_feat.name) # Check to see if an annotation was returned if(not isinstance(full_align, Annotation) or isinstance(full_align, str)): if(not rerun or len(sequence) > 4000): self.logger.info(self.logname + " Failed annotation!") return Annotation() else: if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Reruning annotation!") return self.annotate(sequence=sequence, locus=locus, alignseqs=2, nseqs=nseqs+1, skip=[found[0].name], rerun=False) # Check if the annotation is complete if not full_align.complete_annotation and self.verbose: self.logger.info(self.logname + " Incomplete annotation!") # Add the alignment to the annotation if self.align and full_align.complete_annotation: if self.verbose: self.logger.info(self.logname + " Adding alignment") full_align = self.add_alignment(leastmissing_feat, full_align) if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Features annotated = " + ",".join(list(full_align .annotation.keys()))) if self.verbosity > 2: for f in full_align.annotation: self.logger.info(self.logname + " " + f + " = " + str(full_align .annotation[f].seq)) # Create GFE if create_gfe and full_align.complete_annotation: feats, gfe = self.gfe.get_gfe(full_align, locus) full_align.gfe = gfe full_align.structure = feats full_align.clean() if(full_align.complete_annotation or not rerun or len(sequence) > 4000): return full_align else: if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Reruning annotation!") return self.annotate(sequence=sequence, locus=locus, nseqs=nseqs+1, alignseqs=2, skip=[found[0].name], rerun=False)
[docs] def ref_align(self, found_seqs, sequence: Seq=None, locus: str=None, annotation: Annotation=None, partial_ann: Annotation=None, run: int=0, cutoff: float=.90) -> Annotation: """ ref_align - Method for doing targeted alignments on partial annotations :param found_seqs: The input sequence record. :type found_seqs: Seq :param sequence: The input sequence record. :type sequence: Seq :param locus: The gene locus associated with the sequence. :type locus: ``str`` :param annotation: The incomplete annotation from a previous iteration. :type annotation: :ref:`ann` :param partial_ann: The partial annotation after looping through all of the blast sequences. :type partial_ann: :ref:`ann` :rtype: :ref:`ann` """ if annotation and isinstance(annotation, Annotation): if 0 in annotation.mapping \ and not isinstance(annotation.mapping[0], int): ft = annotation.mapping[0] start_order = self.refdata.structures[locus][ft] else: start_order = 0 # Check whether it's exons only exon_only = True if hasattr(annotation, 'annotation') and annotation.annotation: for f in annotation.annotation: if re.search("intron", f) or re.search("UTR", f): exon_only = False elif(len(sequence.seq) > 900): exon_only = False annoated = [] if hasattr(annotation, 'annotation') and annotation.annotation: annoated = list(annotation.annotation.keys()) # Extract the missing blocks and # only align those blocks to the known # missing features # Start with all blocks missing # and then delete block if it is found tmp_missing = [] missing_blocks = annotation.blocks for b in sorted(annotation.blocks): # **** Check if block equals full input sequence *** # # - If it does, then just align the ful start = b[0]-1 if b[0] != 0 else 0 seq_feat = \ SeqFeature( FeatureLocation( ExactPosition(start), ExactPosition(b[len(b)-1]), strand=1), type="unmapped") feat = seq_feat.extract(annotation.seq) combosrecs, exons, fullrec = self._refseqs(locus, start, annotation, feat, b) # Print out different sequence types being align if self.verbose and self.verbosity > 3: for combseqr in combosrecs: self.logger.info(self.logname + " " + combseqr.id) ic = 0 # Looping through the generated combos # from the reference sequence. (ex. intron1-exon1) for combseqr in combosrecs: if self.verbose: self.logger.info(self.logname + " aligning ->" + combseqr.id) # Running the alignment an, ins, dels = align_seqs(combseqr, feat, locus, start, annotation.missing, len(annoated), cutoff=cutoff, verbose=self.align_verbose, verbosity=self.align_verbosity) # Checking if any features were mapped mapped_feat = list(an.annotation.keys()) if len(mapped_feat) >= 1: # loop through the annotated features for f in an.annotation: f_order = self.refdata.structures[locus][f] # Only map features if they are in the correct order if f in annotation.missing \ and f_order >= start_order: # * HARD CODED LOGIC * # Only accept the alignments if they are # somewhat similar to what's been observed already length, lengthsd = 0, 0 length = float(self.refdata.feature_lengths[locus][f][0]) lengthsd = float(self.refdata.feature_lengths[locus][f][1]) incr = 3 if not is_classII(locus) else 4 max_length = length + (lengthsd*incr) + ins min_length = length - (lengthsd*incr) - dels if f == "exon_8" and not is_classII(f): max_length = 10 # Check ordering when it's only exon sequences if exon_only: f_order = self.refdata.structures[locus][f] endp = an.features[f].location.end + 1 #Make sure order of alignment make sense if an.features[f].location.start == 0 \ and f != "five_prime_UTR" \ and not isexon(f): del an.features[f] continue if endp in annotation.mapping and not isinstance(annotation.mapping[endp], int): mf = annotation.mapping[endp] expected_order = f_order + 1 expected_order2 = f_order + 2 if expected_order != self.refdata.structures[locus][mf] and expected_order2 != self.refdata.structures[locus][mf]: self.logger.info(self.logname + " out of order1 -> " + mf + " " + f) del an.features[f] continue startp = an.features[f].location.start - 1 if startp in annotation.mapping and not isinstance(annotation.mapping[startp], int): mf = annotation.mapping[startp] expected_order = f_order - 1 expected_order2 = f_order - 2 if expected_order != self.refdata.structures[locus][mf] and expected_order2 != self.refdata.structures[locus][mf]: self.logger.info(self.logname + " out of order2 -> " + mf + " " + f) del an.features[f] continue else: ##Make sure order of alignment make sense if an.features[f].location.start == 0 \ and f != "five_prime_UTR" \ and 'three_prime_UTR' in annotation.annotation: del an.features[f] continue f_order = self.refdata.structures[locus][f] endp = an.features[f].location.end + 1 if endp in annotation.mapping and not isinstance(annotation.mapping[endp], int): mf = annotation.mapping[endp] expected_order = f_order + 1 if expected_order != self.refdata.structures[locus][mf]: self.logger.info(self.logname + " out of order12 -> " + mf + " " + f) del an.features[f] continue startp = an.features[f].location.start - 1 if startp in annotation.mapping and not isinstance(annotation.mapping[startp], int): mf = annotation.mapping[startp] expected_order = f_order - 1 if expected_order != self.refdata.structures[locus][mf]: self.logger.info(self.logname + " out of order22 -> " + mf + " " + f) del an.features[f] continue if self.verbose and self.verbosity > 0: sl = str(len(an.annotation[f])) self.logger.info(self.logname + " " + locus + " " + f + " len = " + sl + " | max = " + str(max_length) + " | min = " + str(min_length)) if len(an.annotation[f]) <= max_length: self.logger.info(self.logname + " " + locus + " " + f + " " + sl + " <= " + str(max_length)) else: self.logger.info(self.logname + " " + locus + " " + f + " " + sl + " ! <= !" + str(max_length)) if len(an.annotation[f]) >= min_length: self.logger.info(self.logname + " " + locus + " " + f + " " + sl + " >= " + str(min_length)) else: self.logger.info(self.logname + " " + locus + " " + f + " " + sl + " ! >= !" + str(min_length)) # Update the original annotation # with the returned annotation if(len(an.annotation[f]) <= max_length and len(an.annotation[f]) >= min_length): if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Annotated " + f + " with clustalo using " + combseqr.id) self.logger.info(self.logname + " Coordinates for " + f + str(an.features[f].location.start) + " - " + str(an.features[f].location.end) ) if annotation.annotation: annotation.annotation.update({f: an.annotation[f] }) annotation.features.update({f: an.features[f] }) else: annotation.annotation = {} annotation.annotation.update({f: an.annotation[f] }) annotation.features.update({f: an.features[f] }) if f in annotation.refmissing: i = annotation.refmissing.index(f) del annotation.refmissing[i] if f in annotation.missing: del annotation.missing[f] #if b in annotation.blocks: # del annotation.blocks[annotation.blocks.index(b)] # print(annotation.blocks) # if an.blocks: # #print("PARTIAL BLOCK") # #print(an.blocks) # if b in missing_blocks: # del missing_blocks[missing_blocks.index(b)] # if self.verbose and self.verbosity > 0: # self.logger.info(self.logname # + " Part of block mapped") # else: # if self.verbose and self.verbosity > 0: # self.logger.info(self.logname # + " All blocks mapped") # del annotation.blocks[annotation.blocks.index(b)] # if b in missing_blocks: # del missing_blocks[missing_blocks.index(b)] else: self.logger.info(self.logname + " FAILED HERE!!!!!!!!!") #elif b not in mbtmp and b in missing_blocks: # mbtmp.append(b) else: self.logger.info(self.logname + " OUT OF ORDER !!!!!!!!!!!!!!!!!!!") # Update the coordinates coordinates = dict(map(lambda x: [x, 1], [i for i in range(0, len(sequence.seq)+1)])) for f in annotation.features: s = annotation.features[f].location.start e = annotation.features[f].location.end if s != 0: s += 1 e += 1 else: e += 1 for i in range(s, e): annotation.mapping[i] = f if i in coordinates: del coordinates[i] # Get any remaining blocks after updated # annotation blocks = getblocks(coordinates) annotation.blocks = blocks # Check to see if that annotation is complete annotation.check_annotation() if annotation.complete_annotation: if self.verbose: self.logger.info(self.logname + " Completed annotation" + " with targeted ref_align") return annotation else: if an.features: # for f in an.features: # f_order = self.refdata.structures[locus][f] # # Only add features if they are after the # # first feature mapped # if f_order >= start_order and f not in annotation.features \ # and f in annotation.annotation: # annotation.features[f] = an.features[f] # Rerunning seqsearch with # new annotation from alignment tmpann = self.seqsearch.search_seqs(found_seqs, sequence, locus, partial_ann=annotation, run=run) if tmpann.complete_annotation: for f in tmpann.annotation: if f not in annotation.annotation: annotation.annotation[f] = tmpann.annotation[f] if self.verbose: self.logger.info(self.logname + " Completed annotation" + " with targeted ref_align and seqsearch!") return tmpann annotation = tmpann ic += 1 # Has to be missing exons exons_n = 0 for f in annotation.missing: if re.search("intron", f) or re.search("UTR", f): exons_n += 1 # Run exon only alignment if len(exons.seq) >= 4 and exons_n > 0: exonan, ins, dels = align_seqs(exons, feat, locus, start, annotation.missing, len(annoated), cutoff=cutoff, verbose=self.align_verbose, verbosity=self.align_verbosity) mapped_exons = list(exonan.annotation.keys()) if len(mapped_exons) >= 1: if self.verbose: self.logger.info(self.logname + " Annotated exons with align") for f in exonan.annotation: if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Annotated " + f + " len = " + str(len(exonan .annotation[f]))) annotation.annotation.update({f: exonan.annotation[f]}) annotation.features.update({f: exonan.features[f]}) coordinates = dict(map(lambda x: [x, 1], [i for i in range(0, len(sequence.seq)+1)])) for f in annotation.features: s = annotation.features[f].location.start e = annotation.features[f].location.end if s != 0: s += 1 e += 1 else: e += 1 for i in range(s, e): annotation.mapping[i] = f if i in coordinates: del coordinates[i] blocks = getblocks(coordinates) annotation.blocks = blocks annotation.check_annotation() if annotation.complete_annotation: if self.verbose: self.logger.info(self.logname + " Completed annotation with targeted exons ref_align") return annotation return annotation elif partial_ann: annoated = [] if hasattr(partial_ann, 'annotation') and partial_ann.annotation: annoated = list(partial_ann.annotation.keys()) # Do full sequence alignments # any only extract out the part # that couldn't be explained from above if 0 in partial_ann.mapping \ and not isinstance(partial_ann.mapping[0], int): ft = partial_ann.mapping[0] start_order = self.refdata.structures[locus][ft] else: start_order = 0 # Extract the missing blocks and # only align those blocks to the known # missing features # Start with all blocks missing # and then delete block if it is found tmp_missing = [] missing_blocks = partial_ann.blocks for b in sorted(partial_ann.blocks): # **** Check if block equals full input sequence *** # # - If it does, then just align the ful start = b[0]-1 if b[0] != 0 else 0 seq_feat = \ SeqFeature( FeatureLocation( ExactPosition(start), ExactPosition(b[len(b)-1]), strand=1), type="unmapped") feat = seq_feat.extract(partial_ann.seq) combosrecs, exons, fullrec = self._refseqs(locus, start, partial_ann, feat, b) if len(fullrec.seq) >= 4: fullref, ins, dels = align_seqs(fullrec, feat, locus, start, partial_ann.missing, len(annoated), cutoff=cutoff, verbose=self.align_verbose, verbosity=self.align_verbosity) if hasattr(fullref, 'features') and fullref.features: mapped_full = list(fullref.annotation.keys()) if len(mapped_full) >= 1: if self.verbose: self.logger.info(self.logname + " Annotated fullrec" + " with clustalo") # If it wasn't found del missing_blocks[missing_blocks.index(b)] for f in fullref.annotation: if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Annotated " + f + " len = " + str(len(fullref.annotation[f]))) partial_ann.annotation.update({f: fullref.annotation[f]}) if b in missing_blocks: del missing_blocks[missing_blocks.index(b)] else: for bm in tmp_missing: if bm in missing_blocks: del missing_blocks[missing_blocks.index(bm)] for f in fullref.features: f_order = self.refdata.structures[locus][f] # Only add features if they are after the # first feature mapped if f_order >= start_order and f not in partial_ann.features \ and f in partial_ann.annotation: partial_ann.features[f] = fullref.features[f] coordinates = dict(map(lambda x: [x, 1], [i for i in range(0, len(sequence.seq)+1)])) for f in partial_ann.features: s = partial_ann.features[f].location.start e = partial_ann.features[f].location.end if s != 0: s += 1 e += 1 else: e += 1 for i in range(s, e): partial_ann.mapping[i] = f if i in coordinates: del coordinates[i] blocks = getblocks(coordinates) partial_ann.check_annotation() if partial_ann.complete_annotation: if self.verbose: self.logger.info(self.logname + " Annotated all features with clustalo") return partial_ann if self.verbose: self.logger.info(self.logname + " Failed to annotate features") return ''
# else: # self.logger.error(self.logname + " invalid use") # # Option for user that just want to # # align a set of sequences # return
[docs] def add_alignment(self, ref_seq, annotation) -> Annotation: """ add_alignment - method for adding the alignment to an annotation :param ref_seq: List of reference sequences :type ref_seq: List :param annotation: The complete annotation :type annotation: Annotation :rtype: Annotation """ seq_features = get_seqs(ref_seq) annoated_align = {} allele = ref_seq.description.split(",")[0] locus = allele.split("*")[0].split("-")[1] for feat in seq_features: if feat in annotation.annotation: if isinstance(annotation.annotation[feat], DBSeq): seq_len = len(str(annotation.annotation[feat])) ref_len = len(seq_features[feat]) else: seq_len = len(str(annotation.annotation[feat].seq)) ref_len = len(seq_features[feat]) if seq_len == ref_len: seq = list(annotation.annotation[feat].seq) gaps = self.refdata.annoated_alignments[locus][allele][feat]['Gaps'] if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Lengths match for " + feat) self.logger.info(self.logname + " Gaps at " + feat) self.logger.info(self.logname + "-".join([",".join([str(s) for s in g]) for g in gaps])) for i in range(0, len(gaps)): for j in gaps[i]: loc = j seq.insert(loc, '-') nseq = ''.join(seq) annoated_align.update({feat: nseq}) else: in_seq = str(annotation.annotation[feat].seq) ref_seq = self.refdata.annoated_alignments[locus][allele][feat]['Seq'] alignment = pairwise2.align.globalxx(in_seq, ref_seq) if self.verbose and self.verbosity > 0: self.logger.info(self.logname + " Align2 -> in_seq != ref_len " + feat) self.logger.info(self.logname + " " + str(len(in_seq)) + " == " + str(ref_len)) annoated_align.update({feat: alignment[0][0]}) else: nseq = ''.join(list(repeat('-', len(seq_features[feat])))) annoated_align.update({feat: nseq}) annotation.aligned = annoated_align return annotation
def _refseqs(self, locus, start_pos, annotation, feat, block): # refseqs = _refseqs(locus, start, annotation, # sequence.seq, feat, # b) # - what is size of input seq # - whst is size of block # - what is features have been found # - Of the features that have not been found # and follow these rules: # ** ONLY align features that are expected # this block # - if 5'UTR and exon1 are ambigous # but intron1 is known, only align to # 5'UTR and exon1 # - LOOK for any features found above and # behind block. Only USE features within that # span. # # - if it's at the start of the input seq # then skip anything that comes after the # block and is missing # - if it's at the end of the input seq # - then skip anything that comes before the # block and is missing # - Try first all combos of features where # len(block) <= ave len blocks - sd or # len(block) >= len blocks + sd # # return blank if missing features # If exon only, then only extract exon sequences end_pos = start_pos + len(feat.seq) missing_feats = annotation.missing mapping = annotation.mapping # TODO: Do not use combos that # don't have missing features if start_pos == 0: prv_order = -1 else: prev_feat = mapping[start_pos-1] if isinstance(prev_feat, int): prev_feat = mapping[start_pos] prv_order = self.refdata.structures[locus][prev_feat] if len(mapping) == end_pos+1: nxt_order = len(self.refdata.structures[locus])+1 else: next_feat = mapping[end_pos+1] if isinstance(next_feat, int): nxt_order = prv_order + 2 else: nxt_order = self.refdata.structures[locus][next_feat] start = 0 exstart = 0 all_seqs = [] all_feats = [] exons_only = [] exon_feats = [] all_seqrecs = {} for f in sorted(missing_feats, key=lambda k: self.refdata.structures[locus][k]): mis_order = self.refdata.structures[locus][f] if mis_order > nxt_order \ or mis_order < prv_order: continue else: all_seqs.append(annotation.missing[f]) seq_feat, start = self._make_seqfeat(start, annotation.missing[f], f) all_feats.append(seq_feat) one_seqfeat, x = self._make_seqfeat(0, annotation.missing[f], f) ref1 = Seq(str(annotation.missing[f]), IUPAC.unambiguous_dna) rid1 = "Ref1_" + str(f) + "_" + str(randomid(N=2)) refrec1 = SeqRecord(seq=ref1, features=[one_seqfeat], id=rid1) all_seqrecs.update({f: refrec1}) if isexon(f): ftx, exstart = self._make_seqfeat(exstart, annotation.missing[f], f) exons_only.append(annotation.missing[f]) exon_feats.append(ftx) # Exons only exononly_seq = "".join([str(ft) for ft in exons_only]) refseq_exons = Seq(exononly_seq, IUPAC.unambiguous_dna) refid_exons = "RefExons_" + str(randomid()) refrec_exons = SeqRecord(seq=refseq_exons, features=exon_feats, id=refid_exons) # All seqs seq = "".join([str(ft) for ft in all_seqs]) refseq = Seq(seq, IUPAC.unambiguous_dna) refid = "RefAll_" + str(randomid()) refrec = SeqRecord(seq=refseq, features=all_feats, id=refid) # ** Creat all combos of seqrecs and sort by # which are most similar in length, gc content and molecular weight combos = [] for f in sorted(all_seqrecs, key=lambda k: self.refdata.structures[locus][k]): start_ord = self.refdata.structures[locus][f] if start_ord == nxt_order-1: nfeat = self.refdata.struct_order[locus][start_ord] if nfeat not in all_seqrecs: continue rec = all_seqrecs[nfeat] #length = float(self.refdata.feature_lengths[locus][f][0]) #lengthsd = float(self.refdata.feature_lengths[locus][f][1]) #max_length = length + lengthsd #min_length = length - lengthsd #if len(rec.seq) <= max_length \ # and len(rec.seq) >= min_length: recf, x = self._make_seqfeat(0, rec.seq, nfeat) ctmpid1 = "Ref1_" + str(nfeat) + "_" + str(randomid(N=2)) tmprec1 = SeqRecord(seq=rec.seq, features=[recf], id=ctmpid1) combos.append(tmprec1) else: if isexon(f): cstart = 0 ref_feats = [] combo_seq = [] feat_names = [] combo_feats = [] for i in range(start_ord, nxt_order): nfeat = self.refdata.struct_order[locus][i] if nfeat not in all_seqrecs or not isexon(nfeat): continue ref_feats.append(nfeat) length, lengthsd = 0, 0 #for fr in ref_feats: # length += float(self.refdata.feature_lengths[locus][fr][0]) # lengthsd += float(self.refdata.feature_lengths[locus][fr][1]) # min and max lengths expected #max_length = length + lengthsd #min_length = length - lengthsd rec = all_seqrecs[nfeat] feat_names.append(nfeat) cfeat, cstart = self._make_seqfeat(cstart, rec.seq, nfeat) combo_feats.append(cfeat) combo_seq.append(rec.seq) seqtmp = "".join([str(ft) for ft in combo_seq]) ctmpseq = Seq(seqtmp, IUPAC.unambiguous_dna) ctmpid = "|".join([str(ft) for ft in feat_names]) \ + "_" + str(randomid(N=2)) #if len(ctmpseq) <= max_length \ # and len(ctmpseq) >= min_length: tmprec = SeqRecord(seq=ctmpseq, features=combo_feats, id=ctmpid) combos.append(tmprec) cstart = 0 ref_feats = [] combo_seq = [] feat_names = [] combo_feats = [] for i in range(start_ord, nxt_order): nfeat = self.refdata.struct_order[locus][i] if nfeat not in all_seqrecs: continue ref_feats.append(nfeat) length, lengthsd = 0, 0 #for fr in ref_feats: # length += float(self.refdata.feature_lengths[locus][fr][0]) # lengthsd += float(self.refdata.feature_lengths[locus][fr][1]) # min and max lengths expected #max_length = length + lengthsd #min_length = length - lengthsd rec = all_seqrecs[nfeat] feat_names.append(nfeat) cfeat, cstart = self._make_seqfeat(cstart, rec.seq, nfeat) combo_feats.append(cfeat) combo_seq.append(rec.seq) seqtmp = "".join([str(ft) for ft in combo_seq]) ctmpseq = Seq(seqtmp, IUPAC.unambiguous_dna) ctmpid = "|".join([str(ft) for ft in feat_names]) \ + "_" + str(randomid(N=2)) #if len(ctmpseq) <= max_length \ # and len(ctmpseq) >= min_length: tmprec = SeqRecord(seq=ctmpseq, features=combo_feats, id=ctmpid) combos.append(tmprec) if combos: # Sort combos by how close they # are in length to the input sequence block # * may need to reverse sort * sorted_combos = sorted(combos, key=lambda rec: abs(len(rec.seq) - len(feat))) else: sorted_combos = [] return sorted_combos, refrec_exons, refrec def _make_seqfeat(self, start, sequence, featname): if isutr(featname): ftype = "5UTR" if isfive(featname) else "3UTR" end = start + len(sequence) seq_feat = \ SeqFeature( FeatureLocation( ExactPosition(start), ExactPosition(end), strand=1), type=ftype) start = end return seq_feat, start else: ftype, rank = featname.split("_") end = start + len(sequence) quals = {'number': [str(rank)]} seq_feat = \ SeqFeature( FeatureLocation( ExactPosition(start), ExactPosition(end), strand=1), type=ftype, qualifiers=quals) start = end return seq_feat, start
# TODO: Add to util
[docs]def getblocks(coords): block = [] blocks = [] sorted_i = sorted(coords.keys()) if len(sorted_i) == 1: return [sorted_i] for i in range(0, len(sorted_i)-1): j = i+1 if i == 0: block.append(sorted_i[i]) if(j <= len(sorted_i)-1): if(sorted_i[i] == sorted_i[j]-1): block.append(sorted_i[j]) else: blocks.append(block) block = [] block.append(sorted_i[j]) else: block.append(sorted_i[j]) if len(block) > 1: blocks.append(block) return blocks