2121:meth:`GeneFeature.to_gene_model()`. This function can be over-ridden to provide custom parsing implementations.
2222"""
2323import itertools
24- import logging
2524import pathlib
25+ import warnings
2626from abc import ABC
2727from collections import Counter
2828from copy import deepcopy
3131from Bio import SeqIO
3232from Bio .SeqFeature import SeqFeature
3333from Bio .SeqRecord import SeqRecord
34-
3534from inscripta .biocantor .gene import Biotype , CDSInterval , CDSFrame
35+ from inscripta .biocantor .io .exc import StrandViolationWarning
3636from inscripta .biocantor .io .features import extract_feature_types , extract_feature_name_id , merge_qualifiers
3737from inscripta .biocantor .io .genbank .constants import (
3838 GeneFeatures ,
4848 EmptyGenBankError ,
4949 GenBankLocusTagError ,
5050 GenBankLocationException ,
51+ GenBankNullStrandException ,
5152)
5253from inscripta .biocantor .io .models import (
5354 GeneIntervalModel ,
6364 EmptyLocation ,
6465)
6566
66- logger = logging .getLogger (__name__ )
67-
6867
6968class Feature (ABC ):
7069 """Generic feature."""
@@ -77,7 +76,7 @@ def __init__(self, feature: SeqFeature, record: SeqRecord):
7776 if feature .location is None :
7877 raise GenBankLocationException (f"Feature { feature } did not have parseable coordinates." )
7978 if not feature .strand :
80- raise GenBankParserError (f"Feature { feature } is unstranded or has multiple strands." )
79+ raise GenBankNullStrandException (f"Feature { feature } is unstranded or has multiple strands." )
8180 self .feature = feature
8281 self .record = record
8382 self .children = []
@@ -220,7 +219,7 @@ def from_transcript_or_cds_feature(feature: SeqFeature, seqrecord: SeqRecord) ->
220219
221220 Construct a GeneFeature from such records."""
222221 old_type = feature .type
223- feature .type = "gene"
222+ feature .type = GeneFeatures . GENE . value
224223 gene = GeneFeature (feature , seqrecord )
225224 feature .type = old_type
226225 gene .add_child (feature )
@@ -232,7 +231,6 @@ def add_child(self, feature: SeqFeature):
232231 self .children .append (TranscriptFeature (feature , self .record ))
233232 elif feature .type in IntervalFeature .types :
234233 # infer a transcript
235- logger .debug (f"Inferring a transcript for { feature } " )
236234 tx_feature = deepcopy (feature )
237235 if tx_feature .type == GeneIntervalFeatures .CDS .value :
238236 tx_feature .type = TranscriptFeatures .CODING_TRANSCRIPT .value
@@ -307,10 +305,10 @@ def to_gene_model(cls: "GeneFeature") -> Dict[str, Any]:
307305 cds_frames = cds_frames ,
308306 qualifiers = tx .merge_cds_qualifiers_to_transcript (),
309307 is_primary_tx = False ,
310- transcript_id = tx .get_qualifier_from_tx_or_cds_features ("transcript_id" ),
311- protein_id = tx .get_qualifier_from_tx_or_cds_features ("protein_id" ),
312- product = tx .get_qualifier_from_tx_or_cds_features ("product" ),
313- transcript_symbol = tx .get_qualifier_from_tx_or_cds_features ("gene" ),
308+ transcript_id = tx .get_qualifier_from_tx_or_cds_features (KnownQualifiers . TRANSCRIPT_ID . value ),
309+ protein_id = tx .get_qualifier_from_tx_or_cds_features (KnownQualifiers . PROTEIN_ID . value ),
310+ product = tx .get_qualifier_from_tx_or_cds_features (KnownQualifiers . PRODUCT . value ),
311+ transcript_symbol = tx .get_qualifier_from_tx_or_cds_features (KnownQualifiers . GENE . value ),
314312 transcript_type = transcript_biotype .name ,
315313 sequence_name = tx .record .id ,
316314 )
@@ -321,9 +319,9 @@ def to_gene_model(cls: "GeneFeature") -> Dict[str, Any]:
321319 gene = GeneIntervalModel .Schema ().load (
322320 dict (
323321 transcripts = transcripts ,
324- gene_id = cls .feature .qualifiers .get ("gene_id" , [None ])[0 ],
325- gene_symbol = cls .feature .qualifiers .get ("gene" , [None ])[0 ],
326- locus_tag = cls .feature .qualifiers .get ("locus_tag" , [None ])[0 ],
322+ gene_id = cls .feature .qualifiers .get (KnownQualifiers . GENE_ID . value , [None ])[0 ],
323+ gene_symbol = cls .feature .qualifiers .get (KnownQualifiers . GENE . value , [None ])[0 ],
324+ locus_tag = cls .feature .qualifiers .get (KnownQualifiers . LOCUS_TAG . value , [None ])[0 ],
327325 gene_type = gene_biotype .name ,
328326 qualifiers = cls .feature .qualifiers ,
329327 sequence_name = cls .record .id ,
@@ -358,7 +356,7 @@ def infer_exon_features(self):
358356 def construct_frames (self , cds_interval : Location ) -> List [str ]:
359357 """We need to build frames. Since GenBank lacks this info, do our best"""
360358 # make 0 based offset, if possible, otherwise assume always in frame
361- frame = int (self .children [0 ].feature .qualifiers .get ("codon_start" , [1 ])[0 ]) - 1
359+ frame = int (self .children [0 ].feature .qualifiers .get (KnownQualifiers . CODON_START . value , [1 ])[0 ]) - 1
362360 frame = CDSFrame .from_int (frame )
363361 frames = CDSInterval .construct_frames_from_location (cds_interval , frame )
364362 return [x .name for x in frames ]
@@ -396,7 +394,7 @@ def find_cds_interval(self, exon_interval: Location) -> Location:
396394 cds_i = SingleInterval (
397395 cds [0 ].feature .location .nofuzzy_start ,
398396 cds [- 1 ].feature .location .nofuzzy_end ,
399- Strand .from_int (self .feature . strand ),
397+ Strand .from_int (self .strand ),
400398 )
401399 return exon_interval .intersection (cds_i )
402400
@@ -425,6 +423,47 @@ def __str__(self):
425423 return f"----> { self .feature .__repr__ ()} "
426424
427425
426+ def _construct_gene_from_feature (
427+ feature : SeqFeature ,
428+ seqrecord : SeqRecord ,
429+ cls_or_fn : Callable [[SeqFeature , SeqRecord ], GeneFeature ],
430+ ) -> Optional [GeneFeature ]:
431+ """
432+ Convenience function for producing :class:`GeneFeature` from a `SeqFeature`, handling both
433+ possible constructor routes (construction from a ``gene`` feature as found in the GenBank,
434+ or inference from a transcript/interval level feature in case no ``gene`` level feature was found).
435+
436+ This wrapper function catches exceptions raised for common errors, and converts them to warnings
437+ as appropriate.
438+ """
439+ try :
440+ return cls_or_fn (feature , seqrecord )
441+ except GenBankNullStrandException :
442+ warnings .warn (
443+ StrandViolationWarning (f"Found multiple strands for feature { feature } . This feature will be skipped." )
444+ )
445+
446+
447+ def _construct_feature_collection_from_features (
448+ features : List [SeqFeature ],
449+ seqrecord : SeqRecord ,
450+ ) -> Optional [FeatureIntervalGenBankCollection ]:
451+ """
452+ Convenience function for producing :class:`FeatureIntervalGenBankCollection` from a `SeqFeature`.
453+
454+ This wrapper function catches exceptions raised for common errors, and converts them to warnings
455+ as appropriate.
456+ """
457+ try :
458+ return FeatureIntervalGenBankCollection (features , seqrecord )
459+ except GenBankNullStrandException :
460+ warnings .warn (
461+ StrandViolationWarning (
462+ f"Found multiple strands for feature group { features } . " f"This feature collection will be skipped."
463+ )
464+ )
465+
466+
428467def parse_genbank (
429468 genbank_handle_or_path : Union [TextIO , str , pathlib .Path ],
430469 parse_func : Optional [Callable [[GeneFeature ], Dict [str , Any ]]] = GeneFeature .to_gene_model ,
@@ -514,15 +553,20 @@ def group_gene_records_from_sorted_genbank(
514553 # base case for start; iterate until we find a gene
515554 elif gene is None :
516555 if feature .type in GeneFeature .types :
517- gene = GeneFeature (feature , seqrecord )
556+ gene = _construct_gene_from_feature (feature , seqrecord , GeneFeature )
557+ # gene is None if it was not parseable
558+ if not gene :
559+ continue
518560 # base case for starting with a isolated ncRNA or CDS feature; immediately add them
519561 # and reset the gene to None
520562 elif feature .type in TranscriptFeature .types or feature .type in IntervalFeature .types :
521- gene = GeneFeature .from_transcript_or_cds_feature (feature , seqrecord )
522- gene .finalize ()
523- gene = parse_func (gene )
524- genes .append (gene )
525- gene = None
563+ gene = _construct_gene_from_feature (feature , seqrecord , GeneFeature .from_transcript_or_cds_feature )
564+ # gene is None if it was not parseable
565+ if gene :
566+ gene .finalize ()
567+ gene = parse_func (gene )
568+ genes .append (gene )
569+ gene = None
526570 # this must be a generic feature
527571 else :
528572 feature_features .append (feature )
@@ -532,15 +576,20 @@ def group_gene_records_from_sorted_genbank(
532576 gene .finalize ()
533577 gene = parse_func (gene )
534578 genes .append (gene )
535- gene = GeneFeature (feature , seqrecord )
579+ gene = _construct_gene_from_feature (feature , seqrecord , GeneFeature )
580+ if not gene :
581+ continue
536582 elif feature .type in TranscriptFeature .types :
537583 # if the current gene is non-empty, and the feature is not a mRNA, then this is a isolated ncRNA
538584 # finish this gene and start a new one
539585 if feature .type != TranscriptFeatures .CODING_TRANSCRIPT and gene .has_children :
540586 gene .finalize ()
541587 gene = parse_func (gene )
542588 genes .append (gene )
543- gene = GeneFeature .from_transcript_or_cds_feature (feature , seqrecord )
589+ gene = _construct_gene_from_feature (feature , seqrecord , GeneFeature .from_transcript_or_cds_feature )
590+ # gene is None if it was not parseable
591+ if not gene :
592+ continue
544593 else :
545594 gene .add_child (feature )
546595 elif feature .type in IntervalFeature .types :
@@ -619,31 +668,42 @@ def group_gene_records_by_locus_tag(
619668 remaining_features = []
620669 source = None
621670 for f in seqrecord .features :
622- if f .type in GENBANK_GENE_FEATURES and "locus_tag" in f .qualifiers :
671+ if f .type in GENBANK_GENE_FEATURES and KnownQualifiers . LOCUS_TAG . value in f .qualifiers :
623672 gene_filtered_features .append (f )
624673 elif f .type == MetadataFeatures .SOURCE .value :
625674 source = f
626675 else :
627676 remaining_features .append (f )
628677
629- sorted_gene_filtered_features = sorted (gene_filtered_features , key = lambda f : f .qualifiers ["locus_tag" ])
678+ sorted_gene_filtered_features = sorted (
679+ gene_filtered_features , key = lambda f : f .qualifiers [KnownQualifiers .LOCUS_TAG .value ]
680+ )
630681
631682 genes = []
632683 for locus_tag , gene_features in itertools .groupby (
633- sorted_gene_filtered_features , key = lambda f : f .qualifiers ["locus_tag" ][0 ]
684+ sorted_gene_filtered_features , key = lambda f : f .qualifiers [KnownQualifiers . LOCUS_TAG . value ][0 ]
634685 ):
635686 # sort the features for this locus tag to bubble the "gene" feature to the top, if it exists
636- gene_features = sorted (gene_features , key = lambda f : f .type != "gene" )
637- gene = gene_features [0 ]
687+ gene_features = sorted (gene_features , key = lambda f : f .type != GeneFeatures .GENE .value )
638688
639- if gene .type not in GeneFeature .types :
640- raise GenBankLocusTagError ("Grouping by locus tag produced a mis-ordered interpretation" )
641- gene = GeneFeature (gene , seqrecord )
689+ # do we have more than one gene with this locus_tag?
690+ if len (gene_features ) > 1 and gene_features [1 ].type == GeneFeatures .GENE .value :
691+ raise GenBankLocusTagError (
692+ f"Grouping by locus tag found multiple gene features with the same locus tag:"
693+ f"\n { gene_features [0 ]} \n { gene_features [1 ]} "
694+ )
695+
696+ gene_feature = gene_features [0 ]
697+ if gene_feature .type == GeneFeatures .GENE .value :
698+ gene = _construct_gene_from_feature (gene_feature , seqrecord , GeneFeature )
699+ else :
700+ gene = _construct_gene_from_feature (gene_feature , seqrecord , GeneFeature .from_transcript_or_cds_feature )
701+ # gene is None if it was not parseable
702+ if not gene :
703+ continue
642704
643705 for feature in gene_features [1 :]:
644- if feature .type in GeneFeature .types :
645- raise GenBankLocusTagError ("Grouping by locus tag found two genes" )
646- elif feature .type in TranscriptFeature .types :
706+ if feature .type in TranscriptFeature .types :
647707 gene .add_child (feature )
648708 elif feature .type in IntervalFeature .types :
649709 if len (gene .children ) == 0 :
@@ -723,19 +783,23 @@ def _extract_generic_features(
723783 """
724784
725785 # sort by locus tag, or null if no locus tag is provided.
726- sorted_filtered_features = sorted (filtered_features , key = lambda f : f .qualifiers .get ("locus_tag" , ["" ])[0 ])
786+ sorted_filtered_features = sorted (
787+ filtered_features , key = lambda f : f .qualifiers .get (KnownQualifiers .LOCUS_TAG .value , ["" ])[0 ]
788+ )
727789
728790 feature_collections = []
729791 for locus_tag , features in itertools .groupby (
730- sorted_filtered_features , key = lambda f : f .qualifiers .get ("locus_tag" , ["" ])[0 ]
792+ sorted_filtered_features , key = lambda f : f .qualifiers .get (KnownQualifiers . LOCUS_TAG . value , ["" ])[0 ]
731793 ):
732794 if not locus_tag :
733795 # we are in the null scenario, meaning that there are no locus tag information and thus no groupings.
734796 for feature in features :
735- feature_collection = FeatureIntervalGenBankCollection ([feature ], seqrecord )
736- feature_collections .append (feature_collection )
797+ feature_collection = _construct_feature_collection_from_features ([feature ], seqrecord )
798+ if feature_collection :
799+ feature_collections .append (feature_collection )
737800 else :
738- feature_collection = FeatureIntervalGenBankCollection (list (features ), seqrecord )
739- feature_collections .append (feature_collection )
801+ feature_collection = _construct_feature_collection_from_features (list (features ), seqrecord )
802+ if feature_collection :
803+ feature_collections .append (feature_collection )
740804
741805 return [feature_parse_func (fc ) for fc in feature_collections ] if feature_collections else None
0 commit comments