Skip to content

Commit 3310980

Browse files
authored
Integrate fsspec to enable accessing WFDB files from cloud URIs (#523)
As mentioned in #517, we want to be able to read WFDB files from within cloud environments using WFDB-Python. This PR enables using the `fsspec` library ( https://filesystem-spec.readthedocs.io/en/latest/ ) to read WFDB files from cloud URIs. It replaces the standard Python `open` with `fsspec.open` . Also, it adds logic to differentiate between loading a file from a cloud URI or from a PhysioNet Database. In the initial commit, access has only been added for `rdheader`. We can expand this across all relevant WFDB functions once the approach has been agreed upon. I've tested this with a local `.hea` file, a file read from a PhysioNet Database (using `pn_dir`), and a file from a Datastore in the Azure AI / ML Studio.
2 parents 0d45b74 + 6089a88 commit 3310980

File tree

7 files changed

+175
-55
lines changed

7 files changed

+175
-55
lines changed

.github/workflows/test.yml

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,18 +38,20 @@ jobs:
3838
- name: Check style
3939
run: uv run --extra dev black --check --diff .
4040

41-
test-deb10-i386:
42-
name: Python 3.7 on Debian 10 i386
41+
test-deb11-i386:
42+
name: Python 3.7 on Debian 11 i386
4343
runs-on: ubuntu-latest
44-
container: i386/debian:10
44+
container: i386/debian:11
4545
steps:
4646
- name: Install dependencies
4747
run: |
4848
apt-get update
4949
apt-get install -y --no-install-recommends \
50+
python3-fsspec \
5051
python3-matplotlib \
5152
python3-numpy \
5253
python3-pandas \
54+
python3-pip \
5355
python3-requests \
5456
python3-scipy \
5557
python3-soundfile \

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ dependencies = [
1616
"soundfile >= 0.10.0",
1717
"matplotlib >= 3.2.2",
1818
"requests >= 2.8.1",
19+
"fsspec >= 2023.10.0",
20+
"aiohttp >= 3.10.11",
1921
]
2022
dynamic = ["version"]
2123

wfdb/io/_coreio.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
11
import posixpath
22

3+
import fsspec
4+
35
from wfdb.io import _url
46
from wfdb.io.download import config
57

68

9+
# Cloud protocols
10+
CLOUD_PROTOCOLS = ["az://", "azureml://", "s3://", "gs://"]
11+
12+
713
def _open_file(
814
pn_dir,
915
file_name,
@@ -26,10 +32,11 @@ def _open_file(
2632
----------
2733
pn_dir : str or None
2834
The PhysioNet database directory where the file is stored, or None
29-
if file_name is a local path.
35+
if file_name is a local or cloud path.
3036
file_name : str
31-
The name of the file, either as a local filesystem path (if
32-
`pn_dir` is None) or a URL path (if `pn_dir` is a string.)
37+
The name of the file, either as a local filesystem path or cloud
38+
URL (if `pn_dir` is None) or a PhysioNet URL path
39+
(if `pn_dir` is a string.)
3340
mode : str, optional
3441
The standard I/O mode for the file ("r" by default). If `pn_dir`
3542
is not None, this must be "r", "rt", or "rb".
@@ -47,7 +54,7 @@ def _open_file(
4754
4855
"""
4956
if pn_dir is None:
50-
return open(
57+
return fsspec.open(
5158
file_name,
5259
mode,
5360
buffering=buffering,
@@ -56,6 +63,12 @@ def _open_file(
5663
newline=newline,
5764
)
5865
else:
66+
# check to make sure a cloud path isn't being passed under pn_dir
67+
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
68+
raise ValueError(
69+
"Cloud paths should be passed under record_name, not under pn_dir"
70+
)
71+
5972
url = posixpath.join(config.db_index_url, pn_dir, file_name)
6073
return _url.openurl(
6174
url,

wfdb/io/_signal.py

Lines changed: 40 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import math
22
import os
3+
import posixpath
34
import sys
45

6+
import fsspec
57
import numpy as np
68

79
from wfdb.io import download, _coreio, util
8-
10+
from wfdb.io._coreio import CLOUD_PROTOCOLS
911

1012
MAX_I32 = 2147483647
1113
MIN_I32 = -2147483648
@@ -1643,10 +1645,10 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
16431645
The name of the dat file.
16441646
dir_name : str
16451647
The full directory where the dat file(s) are located, if the dat
1646-
file(s) are local.
1648+
file(s) are local or in the cloud.
16471649
pn_dir : str
16481650
The PhysioNet directory where the dat file(s) are located, if
1649-
the dat file(s) are remote.
1651+
the dat file(s) are on a PhysioNet server.
16501652
fmt : str
16511653
The format of the dat file.
16521654
start_byte : int
@@ -1686,15 +1688,22 @@ def _rd_dat_file(file_name, dir_name, pn_dir, fmt, start_byte, n_samp):
16861688
element_count = n_samp
16871689
byte_count = n_samp * BYTES_PER_SAMPLE[fmt]
16881690

1689-
# Local dat file
1691+
# Local or cloud dat file
16901692
if pn_dir is None:
1691-
with open(os.path.join(dir_name, file_name), "rb") as fp:
1693+
with fsspec.open(os.path.join(dir_name, file_name), "rb") as fp:
16921694
fp.seek(start_byte)
1693-
sig_data = np.fromfile(
1695+
sig_data = util.fromfile(
16941696
fp, dtype=np.dtype(DATA_LOAD_TYPES[fmt]), count=element_count
16951697
)
1696-
# Stream dat file from Physionet
1698+
1699+
# Stream dat file from PhysioNet
16971700
else:
1701+
# check to make sure a cloud path isn't being passed under pn_dir
1702+
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
1703+
raise ValueError(
1704+
"Cloud paths should be passed under record_name, not under pn_dir"
1705+
)
1706+
16981707
dtype_in = np.dtype(DATA_LOAD_TYPES[fmt])
16991708
sig_data = download._stream_dat(
17001709
file_name, pn_dir, byte_count, start_byte, dtype_in
@@ -1840,8 +1849,9 @@ def _rd_compressed_file(
18401849
file_name : str
18411850
The name of the signal file.
18421851
dir_name : str
1843-
The full directory where the signal file is located, if local.
1844-
This argument is ignored if `pn_dir` is not None.
1852+
The full directory where the signal file is located, if this
1853+
is a local or cloud path. This argument is ignored if `pn_dir`
1854+
is not None.
18451855
pn_dir : str or None
18461856
The PhysioNet database directory where the signal file is located.
18471857
fmt : str
@@ -2585,10 +2595,10 @@ def _infer_sig_len(
25852595
The byte offset of the dat file. None is equivalent to zero.
25862596
dir_name : str
25872597
The full directory where the dat file(s) are located, if the dat
2588-
file(s) are local.
2598+
file(s) are local or on the cloud.
25892599
pn_dir : str, optional
25902600
The PhysioNet directory where the dat file(s) are located, if
2591-
the dat file(s) are remote.
2601+
the dat file(s) are on a PhysioNet server.
25922602
25932603
Returns
25942604
-------
@@ -2600,13 +2610,29 @@ def _infer_sig_len(
26002610
sig_len * tsamps_per_frame * bytes_per_sample == file_size
26012611
26022612
"""
2603-
if pn_dir is None:
2604-
file_size = os.path.getsize(os.path.join(dir_name, file_name))
2605-
else:
2613+
from wfdb.io.record import CLOUD_PROTOCOLS
2614+
2615+
# If this is a cloud path, use posixpath to construct the path and fsspec to open file
2616+
if any(dir_name.startswith(proto) for proto in CLOUD_PROTOCOLS):
2617+
with fsspec.open(posixpath.join(dir_name, file_name), mode="rb") as f:
2618+
file_size = f.seek(0, os.SEEK_END)
2619+
2620+
# If the PhysioNet database path is provided, construct the download path using the database version
2621+
elif pn_dir is not None:
2622+
# check to make sure a cloud path isn't being passed under pn_dir
2623+
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
2624+
raise ValueError(
2625+
"Cloud paths should be passed under record_name, not under pn_dir"
2626+
)
2627+
26062628
file_size = download._remote_file_size(
26072629
file_name=file_name, pn_dir=pn_dir
26082630
)
26092631

2632+
# If it isn't a cloud path or a PhysioNet path, we treat as a local file
2633+
else:
2634+
file_size = os.path.getsize(os.path.join(dir_name, file_name))
2635+
26102636
if byte_offset is None:
26112637
byte_offset = 0
26122638
data_size = file_size - byte_offset

wfdb/io/annotation.py

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import copy
2+
import fsspec
23
import numpy as np
34
import os
45
import pandas as pd
@@ -9,6 +10,8 @@
910
from wfdb.io import download
1011
from wfdb.io import _header
1112
from wfdb.io import record
13+
from wfdb.io import util
14+
from wfdb.io._coreio import CLOUD_PROTOCOLS
1215

1316

1417
class Annotation(object):
@@ -1892,7 +1895,7 @@ def rdann(
18921895
----------
18931896
record_name : str
18941897
The record name of the WFDB annotation file. ie. for file '100.atr',
1895-
record_name='100'.
1898+
record_name='100'. The path to the file can be a cloud URL.
18961899
extension : str
18971900
The annotatator extension of the annotation file. ie. for file
18981901
'100.atr', extension='atr'.
@@ -1936,11 +1939,17 @@ def rdann(
19361939
>>> ann = wfdb.rdann('sample-data/100', 'atr', sampto=300000)
19371940
19381941
"""
1939-
if (pn_dir is not None) and ("." not in pn_dir):
1940-
dir_list = pn_dir.split("/")
1941-
pn_dir = posixpath.join(
1942-
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
1943-
)
1942+
if pn_dir is not None:
1943+
# check to make sure a cloud path isn't being passed under pn_dir
1944+
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
1945+
raise ValueError(
1946+
"Cloud paths should be passed under record_name, not under pn_dir"
1947+
)
1948+
if "." not in pn_dir:
1949+
dir_list = pn_dir.split("/")
1950+
pn_dir = posixpath.join(
1951+
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
1952+
)
19441953

19451954
return_label_elements = check_read_inputs(
19461955
sampfrom, sampto, return_label_elements
@@ -2071,7 +2080,7 @@ def load_byte_pairs(record_name, extension, pn_dir):
20712080
----------
20722081
record_name : str
20732082
The record name of the WFDB annotation file. ie. for file '100.atr',
2074-
record_name='100'.
2083+
record_name='100'. The path to the file can be a cloud URL.
20752084
extension : str
20762085
The annotatator extension of the annotation file. ie. for file
20772086
'100.atr', extension='atr'.
@@ -2086,10 +2095,11 @@ def load_byte_pairs(record_name, extension, pn_dir):
20862095
The input filestream converted to an Nx2 array of unsigned bytes.
20872096
20882097
"""
2089-
# local file
2098+
# local or cloud file
20902099
if pn_dir is None:
2091-
with open(record_name + "." + extension, "rb") as f:
2092-
filebytes = np.fromfile(f, "<u1").reshape([-1, 2])
2100+
with fsspec.open(record_name + "." + extension, "rb") as f:
2101+
filebytes = util.fromfile(f, "<u1").reshape([-1, 2])
2102+
20932103
# PhysioNet file
20942104
else:
20952105
filebytes = download._stream_annotation(

wfdb/io/record.py

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import re
66

7+
import fsspec
78
import numpy as np
89
import pandas as pd
910

@@ -13,6 +14,7 @@
1314
from wfdb.io import download
1415
from wfdb.io import header
1516
from wfdb.io import util
17+
from wfdb.io._coreio import CLOUD_PROTOCOLS
1618

1719

1820
# -------------- WFDB Signal Calibration and Classification ---------- #
@@ -1824,27 +1826,39 @@ def rdheader(record_name, pn_dir=None, rd_segments=False):
18241826
18251827
"""
18261828
dir_name, base_record_name = os.path.split(record_name)
1827-
dir_name = os.path.abspath(dir_name)
1829+
file_name = f"{base_record_name}.hea"
18281830

1829-
# Construct the download path using the database version
1830-
if (pn_dir is not None) and ("." not in pn_dir):
1831-
dir_list = pn_dir.split("/")
1832-
pn_dir = posixpath.join(
1833-
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
1834-
)
1831+
# If this is a cloud path, use posixpath to construct the path and fsspec to open file
1832+
if any(dir_name.startswith(proto) for proto in CLOUD_PROTOCOLS):
1833+
with fsspec.open(posixpath.join(dir_name, file_name), mode="r") as f:
1834+
header_content = f.read()
18351835

1836-
# Read the local or remote header file.
1837-
file_name = f"{base_record_name}.hea"
1838-
if pn_dir is None:
1839-
with open(
1836+
# If the PhysioNet database path is provided, construct the download path using the database version
1837+
elif pn_dir is not None:
1838+
# check to make sure a cloud path isn't being passed under pn_dir
1839+
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
1840+
raise ValueError(
1841+
"Cloud paths should be passed under record_name, not under pn_dir"
1842+
)
1843+
1844+
if "." not in pn_dir:
1845+
dir_list = pn_dir.split("/")
1846+
pn_dir = posixpath.join(
1847+
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
1848+
)
1849+
1850+
header_content = download._stream_header(file_name, pn_dir)
1851+
1852+
# If it isn't a cloud path or a PhysioNet path, we treat as a local file
1853+
else:
1854+
dir_name = os.path.abspath(dir_name)
1855+
with fsspec.open(
18401856
os.path.join(dir_name, file_name),
18411857
"r",
18421858
encoding="ascii",
18431859
errors="ignore",
18441860
) as f:
18451861
header_content = f.read()
1846-
else:
1847-
header_content = download._stream_header(file_name, pn_dir)
18481862

18491863
# Separate comment and non-comment lines
18501864
header_lines, comment_lines = header.parse_header_content(header_content)
@@ -2017,14 +2031,22 @@ def rdrecord(
20172031
20182032
"""
20192033
dir_name, base_record_name = os.path.split(record_name)
2020-
dir_name = os.path.abspath(dir_name)
2034+
# Update the dir_name using abspath unless it is a cloud path
2035+
if not any(dir_name.startswith(proto) for proto in CLOUD_PROTOCOLS):
2036+
dir_name = os.path.abspath(dir_name)
20212037

20222038
# Read the header fields
2023-
if (pn_dir is not None) and ("." not in pn_dir):
2024-
dir_list = pn_dir.split("/")
2025-
pn_dir = posixpath.join(
2026-
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
2027-
)
2039+
if pn_dir is not None:
2040+
# check to make sure a cloud path isn't being passed under pn_dir
2041+
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
2042+
raise ValueError(
2043+
"Cloud paths should be passed under record_name, not under pn_dir"
2044+
)
2045+
if "." not in pn_dir:
2046+
dir_list = pn_dir.split("/")
2047+
pn_dir = posixpath.join(
2048+
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
2049+
)
20282050

20292051
record = rdheader(record_name, pn_dir=pn_dir, rd_segments=False)
20302052

@@ -2308,11 +2330,17 @@ def rdsamp(
23082330
channels=[1,3])
23092331
23102332
"""
2311-
if (pn_dir is not None) and ("." not in pn_dir):
2312-
dir_list = pn_dir.split("/")
2313-
pn_dir = posixpath.join(
2314-
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
2315-
)
2333+
if pn_dir is not None:
2334+
# check to make sure a cloud path isn't being passed under pn_dir
2335+
if any(pn_dir.startswith(proto) for proto in CLOUD_PROTOCOLS):
2336+
raise ValueError(
2337+
"Cloud paths should be passed under record_name, not under pn_dir"
2338+
)
2339+
if "." not in pn_dir:
2340+
dir_list = pn_dir.split("/")
2341+
pn_dir = posixpath.join(
2342+
dir_list[0], download.get_version(dir_list[0]), *dir_list[1:]
2343+
)
23162344

23172345
record = rdrecord(
23182346
record_name=record_name,

0 commit comments

Comments
 (0)