Skip to content

Commit 5bb6deb

Browse files
Merge pull request #80 from open-sciencelab/feature/uniprot_search
feat: add get_by_fasta in UniProtSearch
2 parents 96a5799 + 2267de9 commit 5bb6deb

File tree

4 files changed

+106
-46
lines changed

4 files changed

+106
-46
lines changed
Lines changed: 103 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,118 @@
1-
import requests
2-
from fastapi import HTTPException
1+
from io import StringIO
2+
from typing import Dict, Optional
33

4-
from graphgen.utils import logger
4+
from Bio import ExPASy, SeqIO, SwissProt, UniProt
5+
from Bio.Blast import NCBIWWW, NCBIXML
56

6-
UNIPROT_BASE = "https://rest.uniprot.org/uniprotkb/search"
7+
from graphgen.utils import logger
78

89

910
class UniProtSearch:
    """
    UniProt Search client to search with UniProt.
    1) Get the protein by accession number.
    2) Search with keywords or protein names (fuzzy search).
    3) Search with FASTA sequence (BLAST search).

    Every public lookup returns a plain dictionary built by
    ``_swissprot_to_dict`` or ``None``. Remote or parsing failures are
    logged and reported as ``None`` instead of being raised.
    """

    def get_by_accession(self, accession: str) -> Optional[dict]:
        """
        Get the UniProt entry by accession number (e.g., P04637).

        :param accession: The accession passed to ``ExPASy.get_sprot_raw``.
        :return: A dictionary describing the entry, or None if the entry
            could not be fetched or parsed.
        """
        try:
            handle = ExPASy.get_sprot_raw(accession)
            try:
                record = SwissProt.read(handle)
            finally:
                # Release the handle even when parsing fails.
                handle.close()
            return self._swissprot_to_dict(record)
        except Exception as exc:  # pylint: disable=broad-except
            logger.error("Accession %s not found: %s", accession, exc)
            return None

    @staticmethod
    def _swissprot_to_dict(record: "SwissProt.Record") -> dict:
        """
        Convert a SwissProt.Record to a dictionary.

        :param record: The parsed record; ``comments``, ``accessions``,
            ``entry_name``, ``gene_name``, ``description``, ``organism``
            and ``sequence`` are read from it.
        :return: A dictionary with the keys ``molecule_type``, ``database``,
            ``id``, ``entry_name``, ``gene_names``, ``protein_name``,
            ``organism``, ``sequence``, ``function`` and ``url``.
        """
        prefix = "FUNCTION:"
        # Keep only the FUNCTION comments, without their prefix.
        functions = [
            line[len(prefix):].strip()
            for line in record.comments
            if line.startswith(prefix)
        ]
        accession = record.accessions[0]

        return {
            "molecule_type": "protein",
            "database": "UniProt",
            "id": accession,
            "entry_name": record.entry_name,
            "gene_names": record.gene_name,
            # First ";"-separated field of the description, text after the last "=".
            "protein_name": record.description.split(";")[0].split("=")[-1],
            # Drop anything from the first " (" onwards.
            "organism": record.organism.split(" (")[0],
            "sequence": str(record.sequence),
            "function": functions,
            "url": f"https://www.uniprot.org/uniprot/{accession}",
        }

    def get_best_hit(self, keyword: str) -> Optional[Dict]:
        """
        Search UniProt with a keyword and return the best hit.

        :param keyword: The search keyword.
        :return: A dictionary containing the best hit information or None if not found.
        """
        # None, empty and whitespace-only keywords cannot match anything.
        if not keyword or not keyword.strip():
            return None

        try:
            results = UniProt.search(keyword, fields=None, batch_size=1)
            hit = next(iter(results), None)
            if hit is None:
                return None
            return self.get_by_accession(hit["primaryAccession"])

        except Exception as e:  # pylint: disable=broad-except
            logger.error("Keyword %s not found: %s", keyword, e)
            return None

    @staticmethod
    def _extract_sequence(fasta_sequence: str) -> str:
        """
        Return the bare residue string from FASTA text or a raw sequence.

        :param fasta_sequence: Either FASTA text starting with ``>`` (only the
            first record is used) or a raw sequence, possibly wrapped over
            several lines.
        :return: The sequence without header or whitespace; an empty string
            when no sequence is present.
        """
        text = fasta_sequence.strip()
        if text.startswith(">"):
            first_record = next(SeqIO.parse(StringIO(text), "fasta"), None)
            return "" if first_record is None else str(first_record.seq)
        # Raw input: remove line breaks and any other embedded whitespace.
        return "".join(text.split())

    @staticmethod
    def _accession_from_hit_id(hit_id: str) -> str:
        """
        Extract the accession from a BLAST hit id like ``sp|P01308.1|INS_HUMAN``.

        :param hit_id: The hit id reported for an alignment.
        :return: The accession without its version suffix; ``hit_id`` itself
            when it contains no ``|`` separator.
        """
        fields = hit_id.split("|")
        if len(fields) < 2:
            return hit_id
        # Prefer the field right after the "sp" tag, so ids that carry extra
        # leading fields still resolve; otherwise fall back to the second field.
        if "sp" in fields[:-1]:
            token = fields[fields.index("sp") + 1]
        else:
            token = fields[1]
        return token.split(".")[0]

    def get_by_fasta(self, fasta_sequence: str, threshold: float) -> Optional[Dict]:
        """
        Search UniProt with a FASTA sequence and return the best hit.

        :param fasta_sequence: The FASTA sequence (with or without a header line).
        :param threshold: E-value threshold for BLAST search.
        :return: A dictionary containing the best hit information or None if not found.
        """
        try:
            seq = self._extract_sequence(fasta_sequence)
        except Exception as e:  # pylint: disable=broad-except
            logger.error("Invalid FASTA sequence: %s", e)
            return None

        if not seq:
            logger.error("Empty FASTA sequence provided.")
            return None

        # UniProtKB/Swiss-Prot BLAST API
        try:
            result_handle = NCBIWWW.qblast(
                program="blastp",
                database="swissprot",
                sequence=seq,
                hitlist_size=1,
                expect=threshold,
            )
            try:
                blast_record = NCBIXML.read(result_handle)
            finally:
                # Release the result handle even when parsing fails.
                result_handle.close()
        except Exception as e:  # pylint: disable=broad-except
            logger.error("BLAST search failed: %s", e)
            return None

        if not blast_record.alignments:
            logger.info("No BLAST hits found for the given sequence.")
            return None

        best_alignment = blast_record.alignments[0]
        if not best_alignment.hsps:
            # An alignment without HSPs carries no E-value to compare.
            logger.info("No BLAST hits found for the given sequence.")
            return None

        best_hsp = best_alignment.hsps[0]
        if best_hsp.expect > threshold:
            logger.info("No BLAST hits below the threshold E-value.")
            return None

        accession = self._accession_from_hit_id(best_alignment.hit_id)
        return self.get_by_accession(accession)

graphgen/operators/search/db/__init__.py

Whitespace-only changes.

graphgen/operators/search/db/search_uniprot.py

Whitespace-only changes.

requirements.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,8 @@ leidenalg
2424
igraph
2525
python-louvain
2626

27+
# Bioinformatics
28+
biopython
29+
2730
# For visualization
2831
matplotlib

0 commit comments

Comments
 (0)