Skip to content

Commit

Permalink
OGRLayer::GetArrowStream(): add a DATETIME_AS_STRING=YES/NO option
Browse files Browse the repository at this point in the history
DATETIME_AS_STRING=YES/NO. Defaults to NO. Added in GDAL 3.11.
Whether DateTime fields should be returned as a (normally ISO-8601
formatted) string by drivers. The aim is to be able to handle mixed
timezones (or timezone naive values) in the same column.
All drivers must honour that option, potentially falling back to the
generic OGRLayer implementation if they cannot handle it natively (which
is the case for the Arrow, Parquet and ADBC drivers).
When DATETIME_AS_STRING=YES, the TIMEZONE option is ignored.

Fixes geopandas/pyogrio#487
  • Loading branch information
rouault committed Nov 5, 2024
1 parent c578e9d commit bc71db1
Show file tree
Hide file tree
Showing 13 changed files with 456 additions and 73 deletions.
24 changes: 24 additions & 0 deletions autotest/ogr/ogr_adbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,30 @@ def test_ogr_adbc_test_ogrsf_parquet_filename_with_glob():
assert "ERROR" not in ret


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_adbc_arrow_stream_numpy_datetime_as_string(tmp_vsimem):
    """Check that DATETIME_AS_STRING=YES makes the ADBC driver return DateTime
    fields as ISO-8601-like strings through GetArrowStreamAsNumPy()."""
    pytest.importorskip("osgeo.gdal_array")
    pytest.importorskip("numpy")

    if not _has_libduckdb():
        pytest.skip("libduckdb.so missing")

    with gdal.OpenEx(
        "data/parquet/test.parquet", gdal.OF_VECTOR, allowed_drivers=["ADBC"]
    ) as ds:
        layer = ds.GetLayer(0)
        arrow_stream = layer.GetArrowStreamAsNumPy(
            options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
        )
        first_batch = list(arrow_stream)[0]
        # Should be "2019-01-01T14:00:00.500-02:15" but DuckDB returns in UTC
        # without the timezone
        assert (
            first_batch["timestamp_ms_gmt_minus_0215"][0]
            == b"2019-01-01T16:15:00.500"
        )


###############################################################################
# Run test_ogrsf on a DuckDB dataset

Expand Down
49 changes: 49 additions & 0 deletions autotest/ogr/ogr_flatgeobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,3 +1564,52 @@ def test_ogr_flatgeobuf_sql_arrow(tmp_vsimem):
assert f["bar"] == "baz"
assert f.GetGeometryRef().ExportToWkt() == "POINT (1 2)"
f = tmp_lyr.GetNextFeature()


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_flatgeobuf_arrow_stream_numpy_datetime_as_string(tmp_vsimem):
    """Check that DATETIME_AS_STRING=YES makes the FlatGeoBuf driver return
    DateTime fields verbatim as strings through GetArrowStreamAsNumPy()."""
    pytest.importorskip("osgeo.gdal_array")
    pytest.importorskip("numpy")

    filename = str(tmp_vsimem / "datetime_as_string.fgb")
    # None means "field left unset"; it must come back as an empty string
    datetime_values = [
        None,
        "2022-05-31T12:34:56.789Z",
        "2022-05-31T12:34:56",
        "2022-05-31T12:34:56+12:30",
    ]

    with ogr.GetDriverByName("FlatGeoBuf").CreateDataSource(filename) as ds:
        lyr = ds.CreateLayer("test")
        lyr.CreateField(ogr.FieldDefn("datetime", ogr.OFTDateTime))

        for value in datetime_values:
            feat = ogr.Feature(lyr.GetLayerDefn())
            if value is not None:
                feat.SetField("datetime", value)
            feat.SetGeometry(ogr.CreateGeometryFromWkt("POINT (1 2)"))
            lyr.CreateFeature(feat)

    with ogr.Open(filename) as ds:
        lyr = ds.GetLayer(0)
        stream = lyr.GetArrowStreamAsNumPy(
            options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
        )
        batches = list(stream)
        assert len(batches) == 1
        got = batches[0]["datetime"]
        assert len(got) == 4
        assert got[0] == b""
        assert got[1] == b"2022-05-31T12:34:56.789Z"
        assert got[2] == b"2022-05-31T12:34:56"
        assert got[3] == b"2022-05-31T12:34:56+12:30"
57 changes: 57 additions & 0 deletions autotest/ogr/ogr_gpkg.py
Original file line number Diff line number Diff line change
Expand Up @@ -10766,3 +10766,60 @@ def test_gpkg_secure_delete(tmp_vsimem):
with ds.ExecuteSQL("PRAGMA secure_delete") as sql_lyr:
f = sql_lyr.GetNextFeature()
assert f.GetField(0) == 0


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_gpkg_arrow_stream_numpy_datetime_as_string(tmp_vsimem):
    """Test the DATETIME_AS_STRING=YES GetArrowStream() option on a GPKG
    layer, both on a regular layer and on an SQL result layer.

    Fixes two issues with the previous version: the dataset was never
    closed (leaving the /vsimem file open past the test), and the
    assertion block was duplicated verbatim for the two layer kinds.
    """
    pytest.importorskip("osgeo.gdal_array")
    pytest.importorskip("numpy")

    def _check_datetime_as_string(layer):
        # A single batch is expected; an unset field maps to an empty string,
        # and set values are returned verbatim (milliseconds/offset preserved).
        stream = layer.GetArrowStreamAsNumPy(
            options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
        )
        batches = [batch for batch in stream]
        assert len(batches) == 1
        batch = batches[0]
        assert len(batch["datetime"]) == 4
        assert batch["datetime"][0] == b""
        assert batch["datetime"][1] == b"2022-05-31T12:34:56.789Z"
        assert batch["datetime"][2] == b"2022-05-31T12:34:56.000"
        assert batch["datetime"][3] == b"2022-05-31T12:34:56.000+12:30"

    filename = str(tmp_vsimem / "datetime_as_string.gpkg")
    ds = ogr.GetDriverByName("GPKG").CreateDataSource(filename)
    try:
        lyr = ds.CreateLayer("test")
        lyr.CreateField(ogr.FieldDefn("datetime", ogr.OFTDateTime))

        # None means "field left unset"
        for value in (
            None,
            "2022-05-31T12:34:56.789Z",
            "2022-05-31T12:34:56.000",
            "2022-05-31T12:34:56.000+12:30",
        ):
            f = ogr.Feature(lyr.GetLayerDefn())
            if value is not None:
                f.SetField("datetime", value)
            lyr.CreateFeature(f)

        # Test DATETIME_AS_STRING=YES on the regular layer
        _check_datetime_as_string(lyr)

        # ... and on an SQL result layer
        with ds.ExecuteSQL("SELECT * FROM test") as sql_lyr:
            _check_datetime_as_string(sql_lyr)
    finally:
        # Drop the last reference so the dataset is closed and the
        # /vsimem file can be cleaned up even if an assertion failed.
        ds = None
43 changes: 43 additions & 0 deletions autotest/ogr/ogr_mem.py
Original file line number Diff line number Diff line change
Expand Up @@ -979,6 +979,49 @@ def test_ogr_mem_arrow_stream_numpy():
assert len(batches) == 0


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_mem_arrow_stream_numpy_datetime_as_string():
    """Check that DATETIME_AS_STRING=YES makes the Memory driver return
    DateTime fields verbatim as strings through GetArrowStreamAsNumPy()."""
    pytest.importorskip("osgeo.gdal_array")
    pytest.importorskip("numpy")

    ds = ogr.GetDriverByName("Memory").CreateDataSource("")
    lyr = ds.CreateLayer("foo")
    lyr.CreateField(ogr.FieldDefn("datetime", ogr.OFTDateTime))

    # None means "field left unset"; it must come back as an empty string
    for value in (
        None,
        "2022-05-31T12:34:56.789Z",
        "2022-05-31T12:34:56",
        "2022-05-31T12:34:56+12:30",
    ):
        feat = ogr.Feature(lyr.GetLayerDefn())
        if value is not None:
            feat.SetField("datetime", value)
        lyr.CreateFeature(feat)

    # Test DATETIME_AS_STRING=YES
    stream = lyr.GetArrowStreamAsNumPy(
        options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
    )
    batches = list(stream)
    assert len(batches) == 1
    datetimes = batches[0]["datetime"]
    assert len(datetimes) == 4
    assert datetimes[0] == b""
    assert datetimes[1] == b"2022-05-31T12:34:56.789Z"
    assert datetimes[2] == b"2022-05-31T12:34:56"
    assert datetimes[3] == b"2022-05-31T12:34:56+12:30"


###############################################################################


Expand Down
22 changes: 22 additions & 0 deletions autotest/ogr/ogr_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -4150,3 +4150,25 @@ def test_ogr_parquet_IsArrowSchemaSupported_arrow_15_types(
success, error_msg = dst_lyr.IsArrowSchemaSupported(schema)
assert not success
assert error_msg == expected_error_msg


###############################################################################
# Test DATETIME_AS_STRING=YES GetArrowStream() option


def test_ogr_parquet_arrow_stream_numpy_datetime_as_string(tmp_vsimem):
    """Check that DATETIME_AS_STRING=YES makes the Parquet driver return
    DateTime fields as strings, with the original timezone offset kept."""
    pytest.importorskip("osgeo.gdal_array")
    pytest.importorskip("numpy")

    with gdal.OpenEx(
        "data/parquet/test.parquet", gdal.OF_VECTOR, allowed_drivers=["Parquet"]
    ) as ds:
        layer = ds.GetLayer(0)
        arrow_stream = layer.GetArrowStreamAsNumPy(
            options=["USE_MASKED_ARRAYS=NO", "DATETIME_AS_STRING=YES"]
        )
        first_batch = list(arrow_stream)[0]
        assert (
            first_batch["timestamp_ms_gmt_minus_0215"][0]
            == b"2019-01-01T14:00:00.500-02:15"
        )
5 changes: 4 additions & 1 deletion ogr/ogrsf_frmts/adbc/ogradbclayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,11 @@ GDALDataset *OGRADBCLayer::GetDataset()
bool OGRADBCLayer::GetArrowStream(struct ArrowArrayStream *out_stream,
CSLConstList papszOptions)
{
if (m_poFilterGeom || m_poAttrQuery)
if (m_poFilterGeom || m_poAttrQuery ||
CPLFetchBool(papszOptions, "DATETIME_AS_STRING", false))
{
return OGRLayer::GetArrowStream(out_stream, papszOptions);
}

if (m_stream)
{
Expand Down
7 changes: 7 additions & 0 deletions ogr/ogrsf_frmts/arrow_common/ograrrowlayer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -5490,6 +5490,13 @@ inline bool OGRArrowLayer::UseRecordBatchBaseImplementation() const
return true;
}

if (m_aosArrowArrayStreamOptions.FetchBool("DATETIME_AS_STRING", false))
{
CPLDebug("ARROW", "DATETIME_AS_STRING=YES not compatible of fast "
"Arrow implementation");
return true;
}

if (EQUAL(m_aosArrowArrayStreamOptions.FetchNameValueDef(
"GEOMETRY_ENCODING", ""),
"WKB"))
Expand Down
98 changes: 54 additions & 44 deletions ogr/ogrsf_frmts/flatgeobuf/ogrflatgeobuflayer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1481,6 +1481,8 @@ int OGRFlatGeobufLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
}

const GIntBig nFeatureIdxStart = m_featuresPos;
const bool bDateTimeAsString =
m_aosArrowArrayStreamOptions.FetchBool("DATETIME_AS_STRING", false);

const uint32_t nMemLimit = OGRArrowArrayHelper::GetMemLimit();
while (iFeat < sHelper.m_nMaxBatchSize)
Expand Down Expand Up @@ -1851,6 +1853,58 @@ int OGRFlatGeobufLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
offset += sizeof(double);
break;

case ColumnType::DateTime:
{
if (!bDateTimeAsString)
{
if (offset + sizeof(uint32_t) > size)
{
CPLErrorInvalidSize("datetime length ");
goto error;
}
uint32_t len;
memcpy(&len, data + offset, sizeof(int32_t));
CPL_LSBPTR32(&len);
offset += sizeof(uint32_t);
if (len > size - offset || len > 32)
{
CPLErrorInvalidSize("datetime value");
goto error;
}
if (!isIgnored)
{
OGRField ogrField;
if (ParseDateTime(
reinterpret_cast<const char *>(data +
offset),
len, &ogrField))
{
sHelper.SetDateTime(
psArray, iFeat, brokenDown,
sHelper.m_anTZFlags[i], ogrField);
}
else
{
char str[32 + 1];
memcpy(str, data + offset, len);
str[len] = '\0';
if (OGRParseDate(str, &ogrField, 0))
{
sHelper.SetDateTime(
psArray, iFeat, brokenDown,
sHelper.m_anTZFlags[i], ogrField);
}
}
}
offset += len;
break;
}
else
{
[[fallthrough]];
}
}

case ColumnType::String:
case ColumnType::Json:
case ColumnType::Binary:
Expand Down Expand Up @@ -1896,50 +1950,6 @@ int OGRFlatGeobufLayer::GetNextArrowArray(struct ArrowArrayStream *stream,
offset += len;
break;
}

case ColumnType::DateTime:
{
if (offset + sizeof(uint32_t) > size)
{
CPLErrorInvalidSize("datetime length ");
goto error;
}
uint32_t len;
memcpy(&len, data + offset, sizeof(int32_t));
CPL_LSBPTR32(&len);
offset += sizeof(uint32_t);
if (len > size - offset || len > 32)
{
CPLErrorInvalidSize("datetime value");
goto error;
}
if (!isIgnored)
{
OGRField ogrField;
if (ParseDateTime(reinterpret_cast<const char *>(
data + offset),
len, &ogrField))
{
sHelper.SetDateTime(psArray, iFeat, brokenDown,
sHelper.m_anTZFlags[i],
ogrField);
}
else
{
char str[32 + 1];
memcpy(str, data + offset, len);
str[len] = '\0';
if (OGRParseDate(str, &ogrField, 0))
{
sHelper.SetDateTime(
psArray, iFeat, brokenDown,
sHelper.m_anTZFlags[i], ogrField);
}
}
}
offset += len;
break;
}
}
}
}
Expand Down
Loading

0 comments on commit bc71db1

Please sign in to comment.