Skip to content

Commit 85168ce

Browse files
authored
fix the parquet dataset creation with list_files and map. (#1521)
The change moves the `filename` IsDirectory check from the Python layer to the C++ layer. This allows users to load multiple parquet files into a dataset by combining `tf.data.Dataset.list_files(...)` with `map()`.
1 parent bd78dc8 commit 85168ce

File tree

3 files changed

+45
-5
lines changed

3 files changed

+45
-5
lines changed

tensorflow_io/core/kernels/parquet_kernels.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,12 @@ class ParquetReadableResource : public ResourceBase {
3131

3232
Status Init(const string& input) {
3333
mutex_lock l(mu_);
34+
Status status = env_->IsDirectory(input);
35+
if (status.ok()) {
36+
return errors::InvalidArgument(
37+
"passing a directory path to 'filename' is not supported. ",
38+
"Use 'tf.data.Dataset.list_files()' with a map() operation instead.");
39+
}
3440

3541
file_.reset(new SizedRandomAccessFile(env_, input, nullptr, 0));
3642
TF_RETURN_IF_ERROR(file_->GetFileSize(&file_size_));

tensorflow_io/python/ops/parquet_dataset_ops.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,6 @@ def __init__(self, filename, columns=None, internal=True):
2727
"""ParquetIODataset."""
2828
assert internal
2929
with tf.name_scope("ParquetIODataset"):
30-
if tf.io.gfile.isdir(filename):
31-
raise ValueError(
32-
"passing a directory path to 'filename' is not supported. "
33-
"Use 'tf.data.Dataset.list_files()' with a map() operation instead."
34-
)
3530
components, shapes, dtypes = core_ops.io_parquet_readable_info(
3631
filename, shared=filename, container="ParquetIODataset"
3732
)

tests/test_parquet.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,5 +205,44 @@ def test_parquet_data():
205205
i += 1
206206

207207

208+
def test_parquet_dataset_from_file_pattern():
    """Test the parquet dataset creation process using a file pattern.

    Writes two small parquet files into a private temporary directory so the
    glob cannot match unrelated leftovers from other tests (the original
    globbed ``"*.parquet"`` in the CWD and never cleaned up), then checks that
    every row of every file is actually yielded by the mapped dataset —
    the original only printed the items and could never fail on a wrong count.
    """
    import tempfile  # local: keeps this edit self-contained

    df = pd.DataFrame(
        {"pred_0": 0.1 * np.arange(100), "pred_1": -0.1 * np.arange(100)}
    )
    with tempfile.TemporaryDirectory() as tmp_dir:
        df.to_parquet(os.path.join(tmp_dir, "df_x.parquet"))
        df.to_parquet(os.path.join(tmp_dir, "df_y.parquet"))
        columns = {
            "pred_0": tf.TensorSpec(tf.TensorShape([]), tf.double),
            "pred_1": tf.TensorSpec(tf.TensorShape([]), tf.double),
        }

        def map_fn(file_location):
            return tfio.IODataset.from_parquet(file_location, columns=columns)

        ds = tf.data.Dataset.list_files(os.path.join(tmp_dir, "*.parquet")).map(
            map_fn
        )

        # Two files of 100 rows each must yield 200 items in total.
        total = 0
        for file_ds in ds:  # loop over the files
            for _ in file_ds:  # loop over items
                total += 1
        assert total == 200
226+
227+
228+
def test_parquet_dataset_from_dir_failure():
    """Test the dataset creation failure when a directory is passed
    instead of a filename.

    The bare try/except formulation would silently pass if no exception
    were raised at all; a flag now records that the error actually fired.
    """
    dir_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "test_parquet")
    columns = {
        "pred_0": tf.TensorSpec(tf.TensorShape([]), tf.double),
        "pred_1": tf.TensorSpec(tf.TensorShape([]), tf.double),
    }
    raised = False
    try:
        _ = tfio.IODataset.from_parquet(dir_path, columns=columns)
    except Exception as e:  # op errors surface as generic exceptions
        raised = True
        # Message produced by the C++ IsDirectory check in parquet_kernels.cc.
        assert (
            str(e)
            == "passing a directory path to 'filename' is not supported. "
            + "Use 'tf.data.Dataset.list_files()' with a map() operation instead. "
            + "[Op:IO>ParquetReadableInfo]"
        )
    assert raised, "expected from_parquet to raise for a directory path"
245+
246+
208247
if __name__ == "__main__":
209248
test.main()

0 commit comments

Comments
 (0)