Source code for seqann.models.reference_data

# -*- coding: utf-8 -*-
#
#    seqann Sequence Annotation.
#    Copyright (c) 2017 Be The Match operated by National Marrow Donor Program. All Rights Reserved.
#
#    This library is free software; you can redistribute it and/or modify it
#    under the terms of the GNU Lesser General Public License as published
#    by the Free Software Foundation; either version 3 of the License, or (at
#    your option) any later version.
#
#    This library is distributed in the hope that it will be useful, but WITHOUT
#    ANY WARRANTY; with out even the implied warranty of MERCHANTABILITY or
#    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
#    License for more details.
#
#    You should have received a copy of the GNU Lesser General Public License
#    along with this library;  if not, write to the Free Software Foundation,
#    Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA.
#
#    > http://www.fsf.org/licensing/licenses/lgpl.html
#    > http://www.opensource.org/licenses/lgpl-license.php
#
from __future__ import absolute_import

import re
import os
import sys
import csv
import glob
import pickle
import logging
import pymysql
import urllib.request
from typing import List, Dict

from seqann.util import get_structures
from seqann.util import get_structorder

from Bio import SeqIO
from BioSQL import BioSeqDatabase
from Bio.SeqRecord import SeqRecord

from seqann.util import get_features
from seqann.util import deserialize_model
from seqann.models.base_model_ import Model
from seqann.models.annotation import Annotation

biosqlpass = "my-secret-pw"
if os.getenv("BIOSQLPASS"):
    biosqlpass = os.getenv("BIOSQLPASS")

biosqluser = 'root'
if os.getenv("BIOSQLUSER"):
    biosqluser = os.getenv("BIOSQLUSER")

biosqlhost = "localhost"
if os.getenv("BIOSQLHOST"):
    biosqlhost = os.getenv("BIOSQLHOST")

biosqldb = "bioseqdb"
if os.getenv("BIOSQLDB"):
    biosqldb = os.getenv("BIOSQLDB")

biosqlport = 3307
if os.getenv("BIOSQLPORT"):
    biosqlport = int(os.getenv("BIOSQLPORT"))


[docs]def download_dat(url, dat): urllib.request.urlretrieve(url, dat)
# TODO: Use the AWS Lamba API for getting latest IMGT/DB
[docs]class ReferenceData(Model): ''' classdocs ''' def __init__(self, server: BioSeqDatabase=None, datafile: str=None, dbversion: str='3310', alleles: List=None, seqdata: Dict=None, hladata: Dict=None, featuredata=None, kir: bool=False, alignments: bool=False, verbose: bool=False, verbosity: int=0): """ ReferenceData - a model defined in Swagger :param server: The server of this ReferenceData. :type server: BioSeqDatabase :param datafile: The datafile of this ReferenceData. :type datafile: str :param dbversion: The dbversion of this ReferenceData. :type dbversion: str """ self.data_types = { 'server': BioSeqDatabase, 'datafile': str, 'dbversion': str, 'hla_names': List[str], 'feature_lengths': Dict, 'hlaref': Dict, 'seqref': Dict, 'feature_lengths': Dict, 'structure_max': Dict, 'struct_order': Dict, 'structures': Dict, 'blastdb': str, 'server_avail': bool, 'verbose': bool, 'verbosity': int, 'alignments': bool } self.attribute_map = { 'seqdata': 'seqdata', 'hlaref': 'hlaref', 'seqref': 'seqref', 'server': 'server', 'datafile': 'datafile', 'dbversion': 'dbversion', 'hla_names': 'hla_names', 'structure_max': 'structure_max', 'feature_lengths': 'feature_lengths', 'struct_order': 'struct_order', 'structures': 'structures', 'blastdb': 'blastdb', 'hla_loci': 'hla_loci', 'server_avail': 'server_avail', 'kir': 'kir', 'alignments': 'alignments', 'verbose': 'verbose', 'verbosity': 'verbosity' } self._seqref = {} self._hlaref = {} self._kir = kir self._verbose = verbose self._verbosity = verbosity self._dbversion = dbversion self._server = server self._datafile = datafile self._alignments = alignments self._server_avail = True if server else False self.logger = logging.getLogger("Logger." + __name__) hla_url = 'https://raw.githubusercontent.com/ANHIG/IMGTHLA/' \ + dbversion + '/hla.dat' kir_url = 'ftp://ftp.ebi.ac.uk/pub/databases/ipd/kir/KIR.dat' hla_loci = ['HLA-A', 'HLA-B', 'HLA-C', 'HLA-DRB1', 'HLA-DQB1', 'HLA-DPB1', 'HLA-DQA1', 'HLA-DPA1', 'HLA-DRB3', 'HLA-DRB4', 'HLA-DRB5', 'HLA-DRA'] if self.verbose and verbosity > 0: self.logger.info("IPD-IMGT/HLA release = " + str(dbversion)) self.logger.info("HLA URL = " + hla_url) self.logger.info("KIR URL = " + kir_url) if self.server_avail: self.logger.info("Using BioSQL Server") self.logger.info("BIOSQLUSER = " + biosqluser) self.logger.info("BIOSQLHOST = " + biosqlhost) self.logger.info("BIOSQLDB = " + biosqldb) self.logger.info("BIOSQLPORT = " + str(biosqlport)) # TODO: ** Have script seqann --setup (--latest|--release|--all) # - downloads and creates all files # - removes all data files except alignment files # - Creates blast db # # TODO: Download! Don't have in package! hla_names = [] data_dir = os.path.dirname(__file__) if kir: blastdb = data_dir + '/../data/blast/KIR' allele_list = data_dir + '/../data/allele_lists/Allelelist.' \ + 'KIR.txt' else: blastdb = data_dir + '/../data/blast/' + dbversion allele_list = data_dir + '/../data/allele_lists/Allelelist.' \ + dbversion + '.txt' if alleles: self._hla_names = alleles else: splitter = " " if int(dbversion) < 3320 else "," # Open allele list file try: with open(allele_list, 'r') as f: for line in f: line = line.rstrip() if re.search("#", line): continue accession, name = line.split(splitter) if not kir: hla_names.append("HLA-" + name) else: hla_names.append(name) f.close() if self.verbose and verbosity > 0: self.logger.info("Loaded " + str(len(hla_names)) + " allele names") except OSError as err: self.logger.error("OS error: {0}".format(err)) except: self.logger.error("Unexpected error:", sys.exc_info()[0]) raise self._hla_names = hla_names #if self.verbose: # mem = "{:4.4f}".format(sys.getsizeof(self.all_feats) / 1000000) # self.logger.info(self.logname + "Finished loading all features * all_feats = " + mem + " MB *") feature_lengths = {} columns = ['mean', 'std', 'min', 'max'] featurelength_file = '' if kir: featurelength_file = data_dir + "/../data/kir-feature_lengths.csv" else: featurelength_file = data_dir + "/../data/feature_lengths.csv" if featuredata: self._feature_lengths = featuredata else: # TODO: use pandas try: columns = ['mean', 'std', 'min', 'max'] with open(featurelength_file, newline='') as csvfile: reader = csv.DictReader(csvfile) for row in reader: ldata = [row[c] for c in columns] if row['locus'] in feature_lengths: feature_lengths[row['locus']].update({row['feature']: ldata}) else: feature_lengths.update({row['locus']: {row['feature']: ldata}}) csvfile.close() except OSError as err: self.logger.error("OS error: {0}".format(err)) except: self.logger.error("Unexpected error:", sys.exc_info()[0]) raise self._feature_lengths = feature_lengths self._blastdb = blastdb self._hla_loci = hla_loci self._structures = get_structures() self._struct_order = get_structorder() self._structure_max = {'KIR2DP1': 20, 'KIR2DL5A': 20, 'KIR2DS4': 20, 'HLA-DPA1': 9, 'HLA-DQA1': 9, 'KIR2DL2': 20, 'HLA-DPB1': 11, 'KIR2DS2': 20, 'KIR3DP1': 20, 'HLA-DRB4': 13, 'KIR2DL1': 20, 'KIR2DS5': 20, 'HLA-DRB3': 13, 'KIR2DS3': 20, 'KIR3DL1': 20, 'HLA-A': 17, 'HLA-DRB5': 13, 'KIR2DL4': 20, 'HLA-DQB1': 13, 'KIR3DL2': 20, 'HLA-B': 15, 'KIR3DS1': 20, 'KIR2DL5B': 20, 'HLA-DRB1': 13, 'KIR3DL3': 20, 'KIR2DS1': 20, 'HLA-C': 17, 'HLA-DRA': 9} # Starting location of sequence for IPD-IMGT/HLA alignments # ** DRA location is not right ** self.location = {"HLA-A": -300, "HLA-B": -284, "HLA-C": -283, "HLA-DRB1": -599, "HLA-DRB3": -327, "HLA-DRB4": -313, "HLA-DQB1": -525, "HLA-DPB1": -366, "HLA-DPA1": -523, "HLA-DQA1": -746, "HLA-DRA": -400} self.align_coordinates = {} self.annoated_alignments = {} if alignments: pickle_dir = data_dir + '/../data/alignments/' + dbversion pickle_files = glob.glob(pickle_dir + '/*.pickle') for pickle_file in pickle_files: locus = pickle_file.split("/")[len(pickle_file.split("/"))-1].split(".")[0].split("_")[0] if self.verbose: self.logger.info("Loading " + pickle_file) with open(pickle_file, 'rb') as handle: self.annoated_alignments.update({locus: pickle.load(handle)}) handle.close() allele = list(self.annoated_alignments[locus].keys())[0] if not locus in self.align_coordinates and "HLA-" + locus in self.struct_order: start = 0 feat_order = list(self.struct_order["HLA-" + locus].keys()) feat_order.sort() self.align_coordinates.update({locus: {}}) if self.verbose and self.verbosity > 2: self.logger.info("* Alignment coordinates *") for i in feat_order: feat = self.struct_order["HLA-" + locus][i] seq = self.annoated_alignments[locus][allele][feat]['Seq'] end = start + len(seq) if self.verbose and self.verbosity > 2: self.logger.info(feat + " start = " + str(start) + " | end = " + str(end)) for j in range(start, end): self.align_coordinates[locus].update({j: feat}) start = end # If no server is provided # download the dat file if seqdata and hladata: self._hlaref = hladata self._seqref = seqdata elif not self._server_avail: if kir: datfile = data_dir + '/../data/KIR.dat' else: datfile = data_dir + '/../data/' + dbversion + '.hla.dat' if not os.path.isfile(datfile) and not kir: if self.verbose: self.logger.info("Downloding KIR data file - " + datfile) download_dat(hla_url, datfile) elif not os.path.isfile(datfile) and kir: if self.verbose: self.logger.info("Downloding HLA data file - " + datfile) download_dat(kir_url, datfile) # Load HLA dat file seqref_pickle = data_dir \ + '/../data/seqref.' + dbversion + ".pickle" hlaref_pickle = data_dir \ + '/../data/hlaref.' + dbversion + ".pickle" if not os.path.isfile(seqref_pickle) or \ not os.path.isfile(hlaref_pickle): hladata = SeqIO.parse(datfile, "imgt") for seqrec in hladata: seqname = seqrec.description.split(",")[0] locus = seqname.split("*")[0] if locus in self.structure_max: self._hlaref.update({seqname: seqrec}) self._seqref.update({str(seqrec.seq): seqname}) if self.verbose: self.logger.info("Finished loading dat file") self.logger.info("Writing pickle of dat file") with open(seqref_pickle, 'wb') as handle: pickle.dump(self._seqref, handle, protocol=pickle.HIGHEST_PROTOCOL) handle.close() with open(hlaref_pickle, 'wb') as handle: pickle.dump(self._hlaref, handle, protocol=pickle.HIGHEST_PROTOCOL) handle.close() else: if self.verbose: self.logger.info("Loading pickle dat file") with open(seqref_pickle, 'rb') as handle: self._seqref = pickle.load(handle) handle.close() with open(hlaref_pickle, 'rb') as handle: self._hlaref = pickle.load(handle) handle.close()
[docs] @classmethod def from_dict(cls, dikt) -> 'ReferenceData': """ Returns the dict as a model :param dikt: A dict. :type: dict :return: The ReferenceData of this ReferenceData. :rtype: ReferenceData """ return deserialize_model(dikt, cls)
@property def server(self) -> BioSeqDatabase: """ Gets the server of this ReferenceData. :return: The server of this ReferenceData. :rtype: BioSeqDatabase """ return self._server @server.setter def server(self, server: BioSeqDatabase): """ Sets the server of this ReferenceData. :param server: The server of this ReferenceData. :type server: BioSeqDatabase """ self._server = server @property def verbose(self) -> bool: """ Gets the server of this ReferenceData. :return: The server of this ReferenceData. :rtype: BioSeqDatabase """ return self._verbose @verbose.setter def verbose(self, verbose: bool): """ Sets the verbose of this bool. :param verbose: The server of this ReferenceData. :type verbose: bool """ self._verbose = verbose @property def verbosity(self) -> int: """ Gets the server of this ReferenceData. :return: The server of this ReferenceData. :rtype: BioSeqDatabase """ return self._verbosity @verbosity.setter def verbosity(self, verbosity: int): """ Sets the verbosity of this bool. :param verbosity: The server of this ReferenceData. :type verbosity: bool """ self._verbosity = verbosity @property def alignments(self) -> bool: """ Gets the alignments of this ReferenceData. :return: The alignments of this ReferenceData. :rtype: BioSeqDatabase """ return self._alignments @alignments.setter def alignments(self, alignments: bool): """ Sets the alignments of this bool. :param alignments: The server of this ReferenceData. :type alignments: bool """ self._alignments = alignments @property def datafile(self) -> str: """ Gets the datafile of this ReferenceData. :return: The datafile of this ReferenceData. :rtype: str """ return self._datafile @datafile.setter def datafile(self, datafile: str): """ Sets the datafile of this ReferenceData. :param datafile: The datafile of this ReferenceData. :type datafile: str """ self._datafile = datafile @property def dbversion(self) -> str: """ Gets the dbversion of this ReferenceData. :return: The dbversion of this ReferenceData. :rtype: str """ return self._dbversion @dbversion.setter def dbversion(self, dbversion: str): """ Sets the dbversion of this ReferenceData. :param dbversion: The dbversion of this ReferenceData. :type dbversion: str """ self._dbversion = dbversion @property def structures(self) -> Dict: """ Gets the structures of this ReferenceData. :return: The structures of this ReferenceData. :rtype: Dict """ return self._structures @property def structure_max(self) -> Dict: """ Gets the structure_max of this ReferenceData. :return: The structure_max of this ReferenceData. :rtype: Dict """ return self._structure_max @property def hlaref(self) -> Dict: """ Gets the hlaref of this ReferenceData. :return: The hlaref of this ReferenceData. :rtype: Dict """ return self._hlaref @hlaref.setter def hlaref(self, hlaref: Dict): # Sets the seqref of this ReferenceData. # :param seqref: The seqref of this ReferenceData. # :type seqref: str self._hlaref = hlaref @property def seqref(self) -> Dict: """ Gets the seqref of this ReferenceData. :return: The seqref of this ReferenceData. :rtype: Dict """ return self._seqref @seqref.setter def seqref(self, seqref: Dict): # Sets the seqref of this ReferenceData. # :param seqref: The seqref of this ReferenceData. # :type seqref: str self._seqref = seqref @property def blastdb(self) -> str: """ Gets the blastdb of this ReferenceData. :return: The blastdb of this ReferenceData. :rtype: str """ return self._blastdb @property def struct_order(self) -> Dict: """ Gets the struct_order of this ReferenceData. :return: The struct_order of this ReferenceData. :rtype: Dict """ return self._struct_order @property def feature_lengths(self) -> Dict: """ Gets the feature_lengths of this ReferenceData. :return: The feature_lengths of this ReferenceData. :rtype: Dict """ return self._feature_lengths @property def hla_names(self) -> List[str]: """ Gets the hla_names of this ReferenceData. :return: The hla_names of this ReferenceData. :rtype: Dict """ return self._hla_names @property def kir(self) -> bool: """ Gets the kir of this ReferenceData. :return: The kir of this ReferenceData. :rtype: bool """ return self._kir @property def hla_loci(self) -> List[str]: """ Gets the hla_loci of this ReferenceData. :return: The hla_loci of this ReferenceData. :rtype: List[str] """ return self._hla_loci @property def server_avail(self) -> bool: """ Gets the server_avail of this ReferenceData. :return: The server_avail of this ReferenceData. :rtype: bool """ return self._server_avail
[docs] def search_refdata(self, seq, locus): """ This checks to see if a sequence already exists in the reference data. If it does, then it'll return the known annotation. :return: The Annotation of associated with the input sequence :rtype: :ref:`ann` Example: >>> from Bio.Seq import Seq >>> from seqann.models.reference_data import ReferenceData >>> sequence = Seq('AGAGACTCTCCCGAGGATTTCGTGTACCAGTTTAAGGCCATGTGCTACTTCACC') >>> refdata = ReferenceData() >>> matched_annotation = refdata.search_refdata(sequence, locus) """ # TODO: ONLY MAKE ONE CONNECTION # TODO: add try statement # TODO: take password from environment variable if self.server_avail: hla, loc = locus.split('-') p1 = "SELECT ent.name " p2 = "FROM bioentry ent,biosequence seq,biodatabase dbb " p3 = "WHERE dbb.biodatabase_id = ent.biodatabase_id AND seq.bioentry_id = ent.bioentry_id " p4 = " AND dbb.name = \"" + self.dbversion + "_" + loc + "\"" p5 = " AND seq.seq = \"" + str(seq.seq) + "\"" select_stm = p1 + p2 + p3 + p4 + p5 # TODO: add try statement conn = pymysql.connect(host=biosqlhost, port=biosqlport, user=biosqluser, passwd=biosqlpass, db=biosqldb) cur = conn.cursor() cur.execute(select_stm) typ = '' for row in cur: typ = row[0] cur.close() conn.close() if typ: if self.verbose: self.logger.info("Exact typing found in BioSQL database") seqrecord = self.seqrecord(typ, loc) return self.seqannotation(seqrecord, typ, loc) else: return else: if str(seq.seq) in self.seqref: if self.verbose: self.logger.info("Exact typing found in dat file") seqrec = self.hlaref[self.seqref[str(seq.seq)]] return self.seqannotation(seqrec, self.seqref[str(seq.seq)], locus) else: return
# def refseqs(self, locus, n): # hla, loc = locus.split('-') # if self.server_avail: # select_stm = "SELECT ent.name " + \ # "FROM bioentry ent,biosequence seq,biodatabase dbb " + \ # "WHERE dbb.biodatabase_id = ent.biodatabase_id AND " + \ # "seq.bioentry_id = ent.bioentry_id " + \ # "AND dbb.name = \"" + self.dbversion + "_" + loc + "\" " + \ # "LIMIT " + n # # TODO: add try statement # conn = pymysql.connect(host=biosqlhost, port=biosqlport, # user=biosqluser, passwd=biosqlpass, # db=biosqldb) # cur = conn.cursor() # cur.execute(select_stm) # typing = [] # for row in cur: # typing.append(self.seqrecord(row[0], loc)) # cur.close() # conn.close() # if typing: # if self.verbose: # self.logger.info("Exact typing found in BioSQL database") # return typing # else: # return # else: # typings = [a for a in self.imgtdat # if a.description.split(",")[0].split("*")[0] == locus][0:n] # return typings
[docs] def seqrecord(self, allele, locus): """ Gets the Annotation from the found sequence :return: The Annotation from the found sequence :rtype: Annotation """ try: db = self.server[self.dbversion + "_" + locus] except: self.logger.error("The database " + self.dbversion + "_" + locus + " does not exist!") return '' seqrecord = db.lookup(name=allele) return seqrecord
[docs] def seqannotation(self, seqrecord, allele, loc): """ Gets the Annotation from the found sequence :return: The Annotation from the found sequence :rtype: Annotation """ #seqrecord = self.seqrecord(allele, loc) complete_annotation = get_features(seqrecord) annotation = Annotation(annotation=complete_annotation, method='match', complete_annotation=True) if self.alignments: alignment = {f: self.annoated_alignments[loc][allele][f]['Seq'] for f in self.annoated_alignments[loc][allele].keys()} annotation.aligned = alignment return annotation