diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml new file mode 100644 index 0000000..50c5c62 --- /dev/null +++ b/.github/workflows/build-and-test.yml @@ -0,0 +1,27 @@ +name: Build & Test + +on: + push: + schedule: + - cron: '0 3 * * 1' + + +jobs: + build-linux: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: 3.8 + - name: Install dependencies + run: | + pip install -r requirements.txt + - name: Install ANARCI + run: | + python setup.py install + - name: Test with pytest + run: | + pip install pytest + pytest ./test diff --git a/bin/ANARCI b/bin/ANARCI index 18c0e07..2094941 100755 --- a/bin/ANARCI +++ b/bin/ANARCI @@ -147,16 +147,8 @@ if __name__ == "__main__": # Check that hmmscan can be found in the path if args.hmmerpath: - hmmerpath=args.hmmerpath - scan_path = os.path.join( hmmerpath, "hmmscan" ) - if not ( os.path.exists( scan_path ) and os.access(scan_path, os.X_OK)): - print("Error: No hmmscan executable file found in directory: %s"%(hmmerpath), file=sys.stderr) - sys.exit(1) - elif not which("hmmscan"): - print("Error: hmmscan was not found in the path. Either install and add to path or provide path with commandline option.", file=sys.stderr) - sys.exit(1) - - + print('Note: ANARCI is using pyhmmer, ignoring --hmmerpath argument') + # Check if there should be some restriction as to which chain types should be numbered. # If it is not the imgt scheme they want then restrict to only igs (otherwise you'll hit assertion errors) types_to_chains = {"ig":["H","K","L"], "tr":["A", "B","G","D"], "heavy":["H"], "light":["K","L"] } diff --git a/build_pipeline/BuildHMM.py b/build_pipeline/BuildHMM.py new file mode 100644 index 0000000..e8783cd --- /dev/null +++ b/build_pipeline/BuildHMM.py @@ -0,0 +1,31 @@ +""" +Build All.HMM from stockholm aligned file using PyHMMER +""" + +import sys +import pyhmmer + + +def build(out_path, in_path): + alphabet = pyhmmer.easel.Alphabet.amino() + print('Loading MSA from {}'.format(in_path)) + with pyhmmer.easel.MSAFile(in_path, digital=True, alphabet=alphabet) as msa_file: + msas = list(msa_file) + + builder = pyhmmer.plan7.Builder(alphabet, architecture='hand') + background = pyhmmer.plan7.Background(alphabet) + + print('Building HMMs from {} MSAs'.format(len(msas))) + hmms = [] + with open(out_path, "wb") as output_file: + for msa in msas: + hmm, _, _ = builder.build_msa(msa, background) + hmm.write(output_file) + hmms.append(hmm) + print('Pressing HMMs') + pyhmmer.hmmpress(hmms, out_path) + print('Saved HMMs to {}'.format(out_path)) + + +if __name__ == '__main__': + build(sys.argv[1], sys.argv[2]) \ No newline at end of file diff --git a/build_pipeline/RUN_pipeline.sh b/build_pipeline/RUN_pipeline.sh index cc36a47..f556a52 100755 --- a/build_pipeline/RUN_pipeline.sh +++ b/build_pipeline/RUN_pipeline.sh @@ -15,10 +15,4 @@ python3 $DIR/FormatAlignments.py # Build the hmms for each species and chain. # --hand option required otherwise it will delete columns that are mainly gaps. We want 128 columns otherwise ARNACI will fall over. mkdir -p $DIR/HMMs -hmmbuild --hand $DIR/HMMs/ALL.hmm $DIR/curated_alignments/ALL.stockholm -#hmmbuild --hand $DIR/HMMs/ALL_AND_C.hmm $DIR/curated_alignments/ALL_AND_C.stockholm - -# Turn the output HMMs file into a binary form. This is required for hmmscan that is used in ARNACI. -hmmpress -f $DIR/HMMs/ALL.hmm -#hmmpress -f $DIR/HMMs/ALL_AND_C.hmm - +python3 $DIR/BuildHMM.py $DIR/HMMs/ALL.hmm $DIR/curated_alignments/ALL.stockholm diff --git a/lib/python/anarci/anarci.py b/lib/python/anarci/anarci.py index 640529c..c330f7e 100644 --- a/lib/python/anarci/anarci.py +++ b/lib/python/anarci/anarci.py @@ -52,16 +52,15 @@ import os import sys -import tempfile import gzip import math +import traceback from functools import partial from textwrap import wrap -from subprocess import Popen, PIPE from itertools import groupby, islice from multiprocessing import Pool -from Bio.SearchIO.HmmerIO import Hmmer3TextParser as HMMERParser +import pyhmmer # Import from the schemes submodule from .schemes import * @@ -305,18 +304,18 @@ def _domains_are_same(dom1, dom2): @return: True or False """ - dom1, dom2 = sorted( [dom1, dom2], key=lambda x: x.query_start ) - if dom2.query_start >= dom1.query_end: + dom1, dom2 = sorted( [dom1, dom2], key=lambda dom: dom.alignment.target_from ) + if dom2.alignment.target_from-1 >= dom1.alignment.target_to: return False return True -def _parse_hmmer_query(query, bit_score_threshold=80, hmmer_species=None): +def _parse_hmmer_query(hsps, seq_len, hmmer_species=None): """ - @param query: hmmer query object from Biopython - @param bit_score_threshold: the threshold for which to consider a hit a hit. - + @param hsps: List of Domain objects from pyhmmer (corresponding to BioPython hsps) + @param seq_len: sequence length + The function will identify multiple domains if they have been found and provide the details for the best alignment for each domain. This allows the ability to identify single chain fvs and engineered antibody sequences as well as the capability in the future for identifying constant domains. @@ -328,48 +327,46 @@ def _parse_hmmer_query(query, bit_score_threshold=80, hmmer_species=None): top_descriptions, domains,state_vectors = [], [], [] - if query.hsps: # We have some hits + if hsps: # We have some hits # If we have specified a species, check to see we have hits for that species # Otherwise revert back to using any species if hmmer_species: - #hit_correct_species = [hsp for hsp in query.hsps if hsp.hit_id.startswith(hmmer_species) and hsp.bitscore >= bit_score_threshold] hit_correct_species = [] - for hsp in query.hsps: - if hsp.bitscore >= bit_score_threshold: - for species in hmmer_species: - if hsp.hit_id.startswith(species): - hit_correct_species.append(hsp) + for hsp in hsps: + for species in hmmer_species: + hit_id = hsp.alignment.hmm_name.decode() + if hit_id.startswith(species): + hit_correct_species.append(hsp) if hit_correct_species: hsp_list = hit_correct_species else: print("Limiting hmmer search to species %s was requested but hits did not achieve a high enough bitscore. Reverting to using any species" %(hmmer_species)) - hsp_list = query.hsps + hsp_list = hsps else: - hsp_list = query.hsps + hsp_list = hsps - for hsp in sorted(hsp_list, key=lambda x: x.evalue): # Iterate over the matches of the domains in order of their e-value (most significant first) + for hsp in sorted(hsp_list, key=lambda x: x.i_evalue): # Iterate over the matches of the domains in order of their e-value (most significant first) new=True - if hsp.bitscore >= bit_score_threshold: # Only look at those with hits that are over the threshold bit-score. - for i in range( len(domains) ): # Check to see if we already have seen the domain - if _domains_are_same( domains[i], hsp ): - new = False - break - hit_table.append( [ hsp.hit_id, hsp.hit_description, hsp.evalue, hsp.bitscore, hsp.bias, hsp.query_start, hsp.query_end] ) - if new: # It is a new domain and this is the best hit. Add it for further processing. - domains.append( hsp ) - top_descriptions.append( dict( list(zip(hit_table[0], hit_table[-1])) ) ) # Add the last added to the descriptions list. + for i in range( len(domains) ): # Check to see if we already have seen the domain + if _domains_are_same( domains[i], hsp ): + new = False + break + ali = hsp.alignment + hit_table.append( [ ali.hmm_name.decode(), ali.hmm_accession.decode(), hsp.i_evalue, hsp.score, hsp.bias, ali.target_from-1, ali.target_to] ) + if new: # It is a new domain and this is the best hit. Add it for further processing. + domains.append( hsp ) + top_descriptions.append( dict( list(zip(hit_table[0], hit_table[-1])) ) ) # Add the last added to the descriptions list. # Reorder the domains according to the order they appear in the sequence. - ordering = sorted( list(range(len(domains))), key=lambda x: domains[x].query_start) + ordering = sorted( list(range(len(domains))), key=lambda x: domains[x].alignment.target_from) domains = [ domains[_] for _ in ordering ] top_descriptions = [ top_descriptions[_] for _ in ordering ] ndomains = len( domains ) for i in range(ndomains): # If any significant hits were identified parse and align them to the reference state. - domains[i].order = i species, chain = top_descriptions[i]["id"].split("_") - state_vectors.append( _hmm_alignment_to_states(domains[i], ndomains, query.seq_len) ) # Alignment to the reference states. + state_vectors.append( _hmm_alignment_to_states(domains[i], ndomains, seq_len, i) ) # Alignment to the reference states. top_descriptions[i][ "species"] = species # Reparse top_descriptions[i][ "chain_type"] = chain top_descriptions[i][ "query_start"] = state_vectors[-1][0][-1] # Make sure the query_start agree if it was changed @@ -377,40 +374,38 @@ def _parse_hmmer_query(query, bit_score_threshold=80, hmmer_species=None): return hit_table, state_vectors, top_descriptions -def _hmm_alignment_to_states(hsp, n, seq_length): +def _hmm_alignment_to_states(hsp, n, seq_length, order): """ Take a hit hsp and turn the alignment into a state vector with sequence indices """ - - # Extract the strings for the reference states and the posterior probability strings - reference_string = hsp.aln_annotation["RF"] - state_string = hsp.aln_annotation["PP"] - - assert len(reference_string) == len(state_string), "Aligned reference and state strings had different lengths. Don't know how to handle" + ali = hsp.alignment # Extract the start an end points of the hmm states and the sequence # These are python indices i.e list[ start:end ] and therefore start will be one less than in the text file - _hmm_start = hsp.hit_start - _hmm_end = hsp.hit_end + _hmm_start = ali.hmm_from - 1 + _hmm_end = ali.hmm_to - _seq_start = hsp.query_start - _seq_end = hsp.query_end + _seq_start = ali.target_from - 1 + _seq_end = ali.target_to - # Extact the full length of the HMM hit - species, ctype = hsp.hit_id.split('_') + # Extract the full length of the HMM hit + species, ctype = ali.hmm_name.decode().split('_') _hmm_length = get_hmm_length( species, ctype ) + hmm_seq = ali.hmm_sequence.upper() + target_seq = ali.target_sequence + # Handle cases where there are n terminal modifications. # In most cases the user is going to want these included in the numbered domain even though they are not 'antibody like' and # not matched to the germline. Only allow up to a maximum of 5 unmatched states at the start of the domain # Adds a bug here if there is a very short linker between a scfv domains with a modified n-term second domain # Thus this is only done for the first identified domain ( hence order attribute on hsp ) - if hsp.order == 0 and _hmm_start and _hmm_start < 5: + if order == 0 and _hmm_start and _hmm_start < 5: n_extend = _hmm_start if _hmm_start > _seq_start: n_extend = min( _seq_start , _hmm_start - _seq_start ) - state_string = '8'*n_extend + state_string - reference_string = 'x'*n_extend + reference_string + hmm_seq = 'X'*n_extend + hmm_seq + target_seq = 'X'*n_extend + target_seq _seq_start = _seq_start - n_extend _hmm_start = _hmm_start - n_extend @@ -419,120 +414,101 @@ def _hmm_alignment_to_states(hsp, n, seq_length): # Extension is only made when half of framework 4 has been recognised and there is only one domain recognised. if n==1 and _seq_end < seq_length and (123 < _hmm_end < _hmm_length): # Extend forwards n_extend = min( _hmm_length - _hmm_end, seq_length - _seq_end ) - state_string = state_string + '8'*n_extend - reference_string = reference_string + 'x'*n_extend + hmm_seq = hmm_seq + 'X'*n_extend + target_seq = target_seq + 'X'*n_extend _seq_end = _seq_end + n_extend _hmm_end = _hmm_end + n_extend - + assert len(hmm_seq) == len(target_seq), \ + 'Unexpected mismatch in aligned HMM sequence, {} != {}'.format(len(hmm_seq), len(target_seq)) - # Generate lists for the states and the sequence indices that are included in this alignment - hmm_states = all_reference_states[ _hmm_start : _hmm_end ] - sequence_indices = list(range(_seq_start, _seq_end)) - h, s = 0, 0 # initialise the current index in the hmm and the sequence - + # Generate state vector from aligned HMM sequence and input sequence + # Each item in the list corresponds to: (hmm_state, state_type), sequence_index + # hmm_state starts from 1, sequence_index starts from 0 + # For example: (111, 'm'), 110 state_vector = [] - # iterate over the state string (or the reference string) - for i in range( len(state_string) ): - if reference_string[i] == "x": # match state - state_type = "m" - else: # insert state - state_type = "i" - - if state_string[i] == ".": # overloading if deleted relative to reference. delete_state - state_type = "d" - sequence_index = None - else: - sequence_index = sequence_indices[s] - # Store the alignment as the state identifier (uncorrected IMGT annotation) and the index of the sequence - - state_vector.append( ((hmm_states[h], state_type), sequence_index ) ) - - # Updates to the indices - if state_type == "m": - h+=1 - s+=1 - elif state_type == "i": - s+=1 - else: # delete state - h+=1 - - return state_vector - - -def parse_hmmer_output(filedescriptor="", bit_score_threshold=80, hmmer_species=None): - """ - Parse the output of HMMscan and return top alignment and the score table for each input sequence. - """ - results = [] - if type(filedescriptor) is str: - openfile = open - elif type(filedescriptor) is int: - openfile = os.fdopen - - with openfile(filedescriptor) as inputfile: - p = HMMERParser( inputfile ) - for query in p: - results.append(_parse_hmmer_query(query,bit_score_threshold=bit_score_threshold,hmmer_species=hmmer_species )) + hmm_state = _hmm_start + 1 # IMGT HMM states go from 1 to 128 + sequence_index = _seq_start + for hmm_aa, target_aa in zip(hmm_seq, target_seq): + state_type = 'i' if hmm_aa == '.' else ('d' if target_aa == '-' else 'm') + state_vector.append(((hmm_state, state_type), sequence_index if state_type != 'd' else None)) + if state_type == 'm' or state_type == 'i': + sequence_index += 1 + if state_type == 'm' or state_type == 'd': + hmm_state += 1 + + assert hmm_state == _hmm_end + 1, \ + 'Unexpected mismatch in HMM end {} != {}'.format(hmm_state, _hmm_end + 1) + assert sequence_index == _seq_end, \ + 'Unexpected mismatch in input end {} != {}'.format(sequence_index, _seq_end) - return results + return state_vector -def run_hmmer(sequence_list,hmm_database="ALL",hmmerpath="", ncpu=None, bit_score_threshold=80, hmmer_species=None): +def run_pyhmmer(sequence_list, hmm_database="ALL", ncpu=None, bit_score_threshold=80, hmmer_species=None): """ Run the sequences in sequence list against a precompiled hmm_database. Those sequence that have a significant hit with a bit score over a threshold will - be recognised and an alignment given. The alignment will be used to number the + be recognised and an alignment given. The alignment will be used to number the sequence. @param sequence_list: a list of (name, sequence) tuples. Both are strings @param hmm_database: The hmm database to use. Currently, all hmms are in the ALL database. The code to develop new models is in build_pipeline in the git repo. - @param hmmerpath: The path to hmmer binaries if not in the path @param ncpu: The number of cpu's to allow hmmer to use. """ + hmm_full_path = os.path.join(HMM_path, '{}.hmm'.format(hmm_database)) + if not os.path.exists(hmm_full_path): + print("ANARCI HMM database not found:", hmm_full_path, file=sys.stderr) + print("This indicates an error during installation. Please reinstall ANARCI.") + sys.exit(1) + + with pyhmmer.plan7.HMMFile(hmm_full_path) as hmm_file: + hmms = list(hmm_file) + + results = [] + hsps_per_sequence = _find_pyhmmer_hsps(sequence_list, hmms, ncpu=ncpu, bit_score_threshold=bit_score_threshold) + assert len(sequence_list) == len(hsps_per_sequence), \ + 'Unexpected mismatch in returned sequence hits: {} != {}'.format(len(sequence_list), len(hsps_per_sequence)) + + for (sequence_id, sequence), hsps in zip(sequence_list, hsps_per_sequence): + results.append(_parse_hmmer_query( + hsps=hsps, + seq_len=len(sequence), + hmmer_species=hmmer_species + )) + return results - # Check that hmm_database is available - - assert hmm_database in ["ALL"], "Unknown HMM database %s"%hmm_database - HMM = os.path.join( HMM_path, "%s.hmm"%hmm_database ) +def _find_pyhmmer_hsps(sequence_list, hmms, ncpu=None, bit_score_threshold=80): + alphabet = hmms[0].alphabet + Z = len(hmms) # database size, used to calculate correct e-value - # Create a fasta file for all the sequences. Label them with their sequence index - # This will go to a temp file - fasta_filehandle, fasta_filename = tempfile.mkstemp( ".fasta", text=True ) - with os.fdopen(fasta_filehandle,'w') as outfile: - write_fasta(sequence_list, outfile) + sequences = [pyhmmer.easel.TextSequence(sequence=seq, accession=str(i).encode()).digitize(alphabet) + for i, (id, seq) in enumerate(sequence_list)] - output_filehandle, output_filename = tempfile.mkstemp( ".txt", text=True ) + # We are using hmmsearch because hmmscan is not available in pyhmmer + # It works the same except it returns a list of hits for each HMM model in the database + # We are using the Z database size parameter to make sure that the evalue corresponds to hmmscan + hits_per_hmm = pyhmmer.hmmsearch( + hmms, + sequences, + cpus=0 if ncpu is None else ncpu, + Z=Z + ) + + hsps_batch = [[] for i in range(len(sequence_list))] + + # Distribute hits by sequence and filter out hits with low bitscore + for hmm_hits in hits_per_hmm: + for hit in hmm_hits: + for hsp in hit.domains: + if hsp.score >= bit_score_threshold: + hsps_batch[int(hit.accession.decode())].append(hsp) + + return hsps_batch - # Run hmmer as a subprocess - if hmmerpath: - hmmscan = os.path.join(hmmerpath,"hmmscan") - else: - hmmscan = "hmmscan" - try: - if ncpu is None: - command = [ hmmscan, "-o", output_filename, HMM, fasta_filename] - else: - command = [ hmmscan, "-o", output_filename, "--cpu", str(ncpu), HMM, fasta_filename] - process = Popen( command, stdout=PIPE, stderr=PIPE ) - _, pr_stderr = process.communicate() - - if pr_stderr: - _f = os.fdopen(output_filehandle) # This is to remove the filedescriptor from the os. I have had problems with it before. - _f.close() - - raise HMMscanError(pr_stderr) - results = parse_hmmer_output(output_filehandle, bit_score_threshold=bit_score_threshold, hmmer_species=hmmer_species) - - finally: - # clear up - os.remove(fasta_filename) - os.remove(output_filename) - - return results def get_hmm_length( species, ctype ): ''' @@ -629,6 +605,7 @@ def number_sequences_from_alignment(sequences, alignments, scheme="imgt", allow= print(str(e), file=sys.stderr) raise e # Validation went wrong. Error message will go to stderr. Want this to be fatal during development. except Exception as e: + traceback.print_exc() print("Error: Something really went wrong that has not been handled", file=sys.stderr) print(str(e), file=sys.stderr) raise e @@ -732,7 +709,7 @@ def check_for_j( sequences, alignments, scheme ): cys_ai = ali.index( ((104, 'm'), cys_si) ) # Try to identify a J region in the remaining sequence after the 104. A low bit score threshold is used. - _, re_states, re_details = run_hmmer( [(sequences[i][0], sequences[i][1][cys_si+1:])], + _, re_states, re_details = run_pyhmmer( [(sequences[i][0], sequences[i][1][cys_si+1:])], bit_score_threshold=10 )[0] # Check if a J region was detected in the remaining sequence. @@ -764,7 +741,7 @@ def check_for_j( sequences, alignments, scheme ): # Main function for ANARCI # Name conflict with function, module and package is kept for legacy unless issues are reported in future. def anarci(sequences, scheme="imgt", database="ALL", output=False, outfile=None, csv=False, allow=set(["H","K","L","A","B","G","D"]), - hmmerpath="", ncpu=None, assign_germline=False, allowed_species=['human','mouse'], bit_score_threshold=80): + ncpu=None, assign_germline=False, allowed_species=['human','mouse'], bit_score_threshold=80): """ The main function for anarci. Identify antibody and TCR domains, number them and annotate their germline and species. @@ -794,8 +771,7 @@ def anarci(sequences, scheme="imgt", database="ALL", output=False, outfile=None, @param bit_score_threshold: The threshold score from HMMER at which an alignment should be numbered. Lowering the threshold means domain recognition is more permissive and can be useful for numbering heavily engineered molecules. However, too low and false positive recognition of other ig-like molecules will occur. - @param hmmerpath: The path to hmmscan. If left unspecified then the PATH will be searched. - @param ncpu: The number of cpu's that hmmer should be allowed to use. If not specified then the hmmscan + @param ncpu: The number of cpu's that hmmer should be allowed to use. If not specified then the hmmscan default is used. N.B. hmmscan must be compiled with multithreading enabled for this option to have effect. Please consider using the run_anarci function for native multiprocessing with anarci. @param database: The HMMER database that should be used. Normally not changed unless a custom db is created. @@ -827,7 +803,7 @@ def anarci(sequences, scheme="imgt", database="ALL", output=False, outfile=None, # Perform the alignments of the sequences to the hmm database - alignments = run_hmmer(sequences,hmm_database=database,hmmerpath=hmmerpath,ncpu=ncpu,bit_score_threshold=bit_score_threshold,hmmer_species=allowed_species ) + alignments = run_pyhmmer(sequences,hmm_database=database,ncpu=ncpu,bit_score_threshold=bit_score_threshold,hmmer_species=allowed_species ) # Check the numbering for likely very long CDR3s that will have been missed by the first pass. # Modify alignments in-place @@ -841,7 +817,7 @@ def anarci(sequences, scheme="imgt", database="ALL", output=False, outfile=None, # Output if necessary if output: if csv: - csv_output(sequences, numbered, details, outfile) + csv_output(sequences, numbered, alignment_details, outfile) else: outto, close=sys.stdout, False if outfile: diff --git a/requirements.txt b/requirements.txt index e0116bd..3116bdf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1 @@ -biopython +pyhmmer diff --git a/test/test_hmmer.py b/test/test_hmmer.py new file mode 100644 index 0000000..86ed669 --- /dev/null +++ b/test/test_hmmer.py @@ -0,0 +1,45 @@ +import pytest +from anarci import run_pyhmmer + + +def test_run_pyhmmer(): + sequence = 'QVQLQQSGAELARPGASVKMSCKASGYTFTRYTMHWVKQRPGQGLEWIGYINPSRGYTNYNQKFKDKATLTTDKSSSTAYMQLSSLTSEDSAVYYCARYYDDHYELVISCLDYWGQGTTLTVSS' + hit_table, state_vectors, top_descriptions = run_pyhmmer( + [('id', sequence)], + hmm_database='ALL', + ncpu=None, + bit_score_threshold=80, + hmmer_species=['human', 'mouse'] + )[0] + + state_vector = state_vectors[0] + + hmm_states = [hmm_state for (hmm_state, state_type), sequence_index in state_vector] + state_types = ''.join(state_type for (hmm_state, state_type), sequence_index in state_vector) + sequence_reproduced = ''.join(sequence[sequence_index] if sequence_index is not None else '' for _, sequence_index in state_vector) + + assert sequence == sequence_reproduced + # QVQLQQSGA-ELARPGASVKMSCKASGYTF----TRYTMHWVKQRPGQGLEWIGYINPS--RGYTNYNQKFK-DKATLTTDKSSSTAYMQLSSLTSEDSAVYYCARYYDDHYELVISCLDYWGQGTTLTVSS + assert state_types == 'mmmmmmmmmdmmmmmmmmmmmmmmmmmmmmddddmmmmmmmmmmmmmmmmmmmmmmmmmddmmmmmmmmmmmdmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmmiiiimmmmmmmmmmmmmmmmmm' + assert hmm_states == list(range(1, 111)) + [111, 111, 111, 111, 111] + list(range(112, 129)) + + assert len(hit_table) == 3 + assert hit_table[0] == ['id', 'description', 'evalue', 'bitscore', 'bias', 'query_start', 'query_end'] + assert hit_table[1][0] == 'mouse_H' + assert hit_table[1][1] == '' + assert hit_table[1][2] == pytest.approx(1.6e-52) + assert hit_table[1][3] == pytest.approx(188.1, rel=1e-2) + assert hit_table[1][4] == pytest.approx(3.3, rel=1e-2) + assert hit_table[1][5] == 0 + assert hit_table[1][6] == 124 + + assert len(top_descriptions) == 1 + assert top_descriptions[0]['id'] == 'mouse_H' + assert top_descriptions[0]['description'] == '' + assert top_descriptions[0]['evalue'] == pytest.approx(1.6e-52) + assert top_descriptions[0]['bitscore'] == pytest.approx(188.1, rel=1e-2) + assert top_descriptions[0]['bias'] == pytest.approx(3.3, rel=1e-2) + assert top_descriptions[0]['query_start'] == 0 + assert top_descriptions[0]['query_end'] == 124 + assert top_descriptions[0]['chain_type'] == 'H' + assert top_descriptions[0]['species'] == 'mouse' \ No newline at end of file diff --git a/test/test_numbering.py b/test/test_numbering.py new file mode 100644 index 0000000..3885cdd --- /dev/null +++ b/test/test_numbering.py @@ -0,0 +1,43 @@ +from anarci import number + + +def test_imgt_heavy(): + sequence = "QVQLQQSGAELARPGASVKMSCKASGYTFTRYTMHWVKQRPGQGLEWIGYINPSRGYTNYNQKFKDKATLTTDKSSSTAYMQLSSLTSEDSAVYYCARYYDDHYELVISCLDYWGQGTTLTVSS" + numbering, chain_type = number(sequence, "imgt") + + assert chain_type == 'H' + + positions = [f'{pos}{letter}'.strip() for (pos, letter), aa in numbering] + assert positions == [str(i) for i in range(1, 111)] + ['111', '111A', '111B', '112B', '112A'] + [str(i) for i in range(112, 129)] + + aligned_sequence = ''.join(aa for (pos, letter), aa in numbering) + assert aligned_sequence == 'QVQLQQSGA-ELARPGASVKMSCKASGYTF----TRYTMHWVKQRPGQGLEWIGYINPS--RGYTNYNQKFK-DKATLTTDKSSSTAYMQLSSLTSEDSAVYYCARYYDDHYELVISCLDYWGQGTTLTVSS' + + reproduced_sequence = ''.join(aa for (pos, letter), aa in numbering if aa != '-') + assert reproduced_sequence == sequence + + +def test_imgt_light(): + sequence = "DVVMTQSPLSLPVTLGQPASISCRSSQSLVYSDGNTYLNWFQQRPGQSPRRLIYKVSNRDSGVPDRFSGSGSGTDFTLKISRVEAEDVGVYYCMQGTFGQGTKVEIK" + numbering, chain_type = number(sequence, "imgt") + + assert chain_type == 'L' # L for light + + positions = [f'{pos}{letter}'.strip() for (pos, letter), aa in numbering] + assert positions == [str(i) for i in range(1, 128)] + + aligned_sequence = ''.join(aa for (pos, letter), aa in numbering) + assert aligned_sequence == 'DVVMTQSPLSLPVTLGQPASISCRSSQSLVYS-DGNTYLNWFQQRPGQSPRRLIYKV-------SNRDSGVP-DRFSGSG--SGTDFTLKISRVEAEDVGVYYCMQ---------GTFGQGTKVEIK' + + reproduced_sequence = ''.join(aa for (pos, letter), aa in numbering if aa != '-') + assert reproduced_sequence == sequence + + +def test_imgt_heavy_incomplete(): + """Check that a few mismatching residues at N and C termini are treated as part of the aligned domain""" + sequence = "AAQLQQSGAELARPGASVKMSCKASGYTFTRYTMHWVKQRPGQGLEWIGYINPSRGYTNYNQKFKDKATLTTDKSSSTAYMQLSSLTSEDSAVYYCARYYDDHYELVISCLDYWGQGTTLTVAA" + numbering, chain_type = number(sequence, "imgt") + + aligned_sequence = ''.join(aa for (pos, letter), aa in numbering) + assert aligned_sequence == 'AAQLQQSGA-ELARPGASVKMSCKASGYTF----TRYTMHWVKQRPGQGLEWIGYINPS--RGYTNYNQKFK-DKATLTTDKSSSTAYMQLSSLTSEDSAVYYCARYYDDHYELVISCLDYWGQGTTLTVAA' +