
Commit 89d6354

GH-40384: [Python] Expand the C Device Interface bindings to support import on CUDA device (apache#40385)

### Rationale for this change

Follow-up on apache#39979, which added the `_export_to_c_device`/`_import_from_c_device` methods, but for now only for CPU devices.

### What changes are included in this PR?

* Ensure `pyarrow.cuda` is imported before importing data through the C Device Interface, so that the CUDA device is registered
* Add tests for exporting/importing with the device interface on CUDA

### Are these changes tested?

Yes, added tests for CUDA.

* GitHub Issue: apache#40384

Authored-by: Joris Van den Bossche <[email protected]>
Signed-off-by: Joris Van den Bossche <[email protected]>
1 parent 5252090 commit 89d6354
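
For orientation, here is a minimal sketch of the roundtrip these bindings cover, adapted from the tests added in this PR. It uses CPU data so it runs without a CUDA build; for data on a CUDA device, the import path added here first checks that `pyarrow.cuda` can be loaded:

```python
import pyarrow as pa
from pyarrow.cffi import ffi  # cffi declarations shipped with pyarrow

# Allocate an ArrowDeviceArray struct and export an array into it
c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

arr = pa.array([1, 2, 3], type=pa.int64())
arr._export_to_c_device(ptr_array)

# Import it back through the C Device Interface; for CUDA-device data this
# now ensures pyarrow.cuda is imported (registering the CUDA device) first
arr_new = pa.Array._import_from_c_device(ptr_array, arr.type)
assert arr_new.equals(arr)
```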

6 files changed, +210 −8 lines changed

python/pyarrow/array.pxi (+6 −4)

@@ -1825,23 +1825,25 @@ cdef class Array(_PandasConvertible):
         This is a low-level function intended for expert users.
         """
         cdef:
-            void* c_ptr = _as_c_pointer(in_ptr)
+            ArrowDeviceArray* c_device_array = <ArrowDeviceArray*>_as_c_pointer(in_ptr)
             void* c_type_ptr
             shared_ptr[CArray] c_array

+        if c_device_array.device_type == ARROW_DEVICE_CUDA:
+            _ensure_cuda_loaded()
+
         c_type = pyarrow_unwrap_data_type(type)
         if c_type == nullptr:
             # Not a DataType object, perhaps a raw ArrowSchema pointer
             c_type_ptr = _as_c_pointer(type)
             with nogil:
                 c_array = GetResultValue(
-                    ImportDeviceArray(<ArrowDeviceArray*> c_ptr,
-                                      <ArrowSchema*> c_type_ptr)
+                    ImportDeviceArray(c_device_array, <ArrowSchema*> c_type_ptr)
                 )
         else:
             with nogil:
                 c_array = GetResultValue(
-                    ImportDeviceArray(<ArrowDeviceArray*> c_ptr, c_type)
+                    ImportDeviceArray(c_device_array, c_type)
                 )
         return pyarrow_wrap_array(c_array)

python/pyarrow/includes/libarrow.pxd (+6 −1)

@@ -2964,8 +2964,13 @@ cdef extern from "arrow/c/abi.h":
     cdef struct ArrowArrayStream:
         void (*release)(ArrowArrayStream*) noexcept nogil

+    ctypedef int32_t ArrowDeviceType
+    cdef ArrowDeviceType ARROW_DEVICE_CUDA
+
     cdef struct ArrowDeviceArray:
-        pass
+        ArrowArray array
+        int64_t device_id
+        int32_t device_type


 cdef extern from "arrow/c/bridge.h" namespace "arrow" nogil:
     CStatus ExportType(CDataType&, ArrowSchema* out)
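
The fields declared above make the device metadata visible to Cython. From pure Python the same fields can already be inspected through the `pyarrow.cffi` struct, as the tests below do; a small illustrative sketch with CPU data (for which the exported device type is `ARROW_DEVICE_CPU`, i.e. 1, rather than `ARROW_DEVICE_CUDA`, i.e. 2):

```python
import pyarrow as pa
from pyarrow.cffi import ffi

c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))
pa.array([1, 2, 3])._export_to_c_device(ptr_array)

# The same three members that the declaration above exposes to Cython
print(c_array.device_type)   # 1 (ARROW_DEVICE_CPU) for host memory
print(c_array.device_id)
print(c_array.array.length)  # 3
```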

python/pyarrow/lib.pyx (+19)

@@ -125,6 +125,7 @@ UnionMode_DENSE = _UnionMode_DENSE

 __pc = None
 __pac = None
+__cuda_loaded = None


 def _pc():
@@ -143,6 +144,24 @@ def _pac():
     return __pac


+def _ensure_cuda_loaded():
+    # Try importing the cuda module to ensure libarrow_cuda gets loaded
+    # to register the CUDA device for the C Data Interface import
+    global __cuda_loaded
+    if __cuda_loaded is None:
+        try:
+            import pyarrow.cuda  # no-cython-lint
+            __cuda_loaded = True
+        except ImportError as exc:
+            __cuda_loaded = str(exc)
+
+    if __cuda_loaded is not True:
+        raise ImportError(
+            "Trying to import data on a CUDA device, but PyArrow is not built with "
+            f"CUDA support.\n(importing 'pyarrow.cuda' resulted in \"{__cuda_loaded}\")."
+        )
+
+
 def _gdb_test_session():
     GdbTestSession()
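
A note on the design, based on the code above: the outcome of the first `import pyarrow.cuda` attempt is cached in `__cuda_loaded`, either `True` or the stringified `ImportError`, so repeated imports of CUDA data neither retry the import nor lose the original failure message. Assuming the helper is reachable as `pyarrow.lib._ensure_cuda_loaded` (it is a module-level `def`), its behaviour can be sketched as:

```python
import pyarrow.lib

try:
    # On a CUDA-enabled build this imports pyarrow.cuda (registering the
    # CUDA device) and then becomes a cheap no-op; otherwise it raises.
    pyarrow.lib._ensure_cuda_loaded()
    print("CUDA device registered")
except ImportError as exc:
    print(exc)  # "Trying to import data on a CUDA device, but PyArrow is not built with CUDA support. ..."
```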

python/pyarrow/table.pxi (+6 −3)

@@ -3752,21 +3752,24 @@ cdef class RecordBatch(_Tabular):
         This is a low-level function intended for expert users.
         """
         cdef:
-            void* c_ptr = _as_c_pointer(in_ptr)
+            ArrowDeviceArray* c_device_array = <ArrowDeviceArray*>_as_c_pointer(in_ptr)
             void* c_schema_ptr
             shared_ptr[CRecordBatch] c_batch

+        if c_device_array.device_type == ARROW_DEVICE_CUDA:
+            _ensure_cuda_loaded()
+
         c_schema = pyarrow_unwrap_schema(schema)
         if c_schema == nullptr:
             # Not a Schema object, perhaps a raw ArrowSchema pointer
             c_schema_ptr = _as_c_pointer(schema, allow_null=True)
             with nogil:
                 c_batch = GetResultValue(ImportDeviceRecordBatch(
-                    <ArrowDeviceArray*> c_ptr, <ArrowSchema*> c_schema_ptr))
+                    c_device_array, <ArrowSchema*> c_schema_ptr))
         else:
             with nogil:
                 c_batch = GetResultValue(ImportDeviceRecordBatch(
-                    <ArrowDeviceArray*> c_ptr, c_schema))
+                    c_device_array, c_schema))
         return pyarrow_wrap_batch(c_batch)
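
The record batch path mirrors the array path above. A hedged counterpart to the earlier array sketch, again with CPU data so no CUDA build is needed:

```python
import pyarrow as pa
from pyarrow.cffi import ffi

c_array = ffi.new("struct ArrowDeviceArray*")
ptr_array = int(ffi.cast("uintptr_t", c_array))

batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])
batch._export_to_c_device(ptr_array)

# Goes through ImportDeviceRecordBatch; for CUDA-device data it would first
# call _ensure_cuda_loaded(), exactly as in the hunk above
batch_new = pa.RecordBatch._import_from_c_device(ptr_array, batch.schema)
assert batch_new.equals(batch)
```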

python/pyarrow/tests/test_cffi.py (+21)

@@ -705,3 +705,24 @@ def test_roundtrip_chunked_array_capsule_requested_schema():
         ValueError, match="Could not cast string to requested type int64"
     ):
         chunked.__arrow_c_stream__(requested_capsule)
+
+
+def test_import_device_no_cuda():
+    try:
+        import pyarrow.cuda  # noqa
+    except ImportError:
+        pass
+    else:
+        pytest.skip("pyarrow.cuda is available")
+
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+    arr = pa.array([1, 2, 3], type=pa.int64())
+    arr._export_to_c_device(ptr_array)
+
+    # patch the device type of the struct, this results in an invalid ArrowDeviceArray
+    # but this is just to test we raise an error before actually importing buffers
+    c_array.device_type = 2  # ARROW_DEVICE_CUDA
+
+    with pytest.raises(ImportError, match="Trying to import data on a CUDA device"):
+        pa.Array._import_from_c_device(ptr_array, arr.type)

python/pyarrow/tests/test_cuda.py (+152)

@@ -792,3 +792,155 @@ def test_IPC(size):
     p.start()
     p.join()
     assert p.exitcode == 0
+
+
+def _arr_copy_to_host(carr):
+    # TODO replace below with copy to device when exposed in python
+    buffers = []
+    for cbuf in carr.buffers():
+        if cbuf is None:
+            buffers.append(None)
+        else:
+            buf = global_context.foreign_buffer(
+                cbuf.address, cbuf.size, cbuf
+            ).copy_to_host()
+            buffers.append(buf)
+
+    child = pa.Array.from_buffers(carr.type.value_type, 3, buffers[2:])
+    new = pa.Array.from_buffers(carr.type, 2, buffers[:2], children=[child])
+    return new
+
+
+def test_device_interface_array():
+    cffi = pytest.importorskip("pyarrow.cffi")
+    ffi = cffi.ffi
+
+    c_schema = ffi.new("struct ArrowSchema*")
+    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+
+    typ = pa.list_(pa.int32())
+    arr = pa.array([[1], [2, 42]], type=typ)
+
+    # TODO replace below with copy to device when exposed in python
+    cbuffers = []
+    for buf in arr.buffers():
+        if buf is None:
+            cbuffers.append(None)
+        else:
+            cbuf = global_context.new_buffer(buf.size)
+            cbuf.copy_from_host(buf, position=0, nbytes=buf.size)
+            cbuffers.append(cbuf)
+
+    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
+        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
+    ])
+
+    # Type is known up front
+    carr._export_to_c_device(ptr_array)
+
+    # verify exported struct
+    assert c_array.device_type == 2  # ARROW_DEVICE_CUDA
+    assert c_array.device_id == global_context.device_number
+    assert c_array.array.length == 2
+
+    # Delete and recreate C++ object from exported pointer
+    del carr
+    carr_new = pa.Array._import_from_c_device(ptr_array, typ)
+    assert carr_new.type == pa.list_(pa.int32())
+    arr_new = _arr_copy_to_host(carr_new)
+    assert arr_new.equals(arr)
+
+    del carr_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
+        pa.Array._import_from_c_device(ptr_array, typ)
+
+    # Schema is exported and imported at the same time
+    carr = pa.Array.from_buffers(typ, 2, cbuffers[:2], children=[
+        pa.Array.from_buffers(typ.value_type, 3, cbuffers[2:])
+    ])
+    carr._export_to_c_device(ptr_array, ptr_schema)
+    # Delete and recreate C++ objects from exported pointers
+    del carr
+    carr_new = pa.Array._import_from_c_device(ptr_array, ptr_schema)
+    assert carr_new.type == pa.list_(pa.int32())
+    arr_new = _arr_copy_to_host(carr_new)
+    assert arr_new.equals(arr)
+
+    del carr_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
+        pa.Array._import_from_c_device(ptr_array, ptr_schema)
+
+
+def _batch_copy_to_host(cbatch):
+    # TODO replace below with copy to device when exposed in python
+    arrs = []
+    for col in cbatch.columns:
+        buffers = [
+            global_context.foreign_buffer(buf.address, buf.size, buf).copy_to_host()
+            if buf is not None else None
+            for buf in col.buffers()
+        ]
+        new = pa.Array.from_buffers(col.type, len(col), buffers)
+        arrs.append(new)
+
+    return pa.RecordBatch.from_arrays(arrs, schema=cbatch.schema)
+
+
+def test_device_interface_batch_array():
+    cffi = pytest.importorskip("pyarrow.cffi")
+    ffi = cffi.ffi
+
+    c_schema = ffi.new("struct ArrowSchema*")
+    ptr_schema = int(ffi.cast("uintptr_t", c_schema))
+    c_array = ffi.new("struct ArrowDeviceArray*")
+    ptr_array = int(ffi.cast("uintptr_t", c_array))
+
+    batch = make_recordbatch(10)
+    schema = batch.schema
+    cbuf = cuda.serialize_record_batch(batch, global_context)
+    cbatch = cuda.read_record_batch(cbuf, schema)
+
+    # Schema is known up front
+    cbatch._export_to_c_device(ptr_array)
+
+    # verify exported struct
+    assert c_array.device_type == 2  # ARROW_DEVICE_CUDA
+    assert c_array.device_id == global_context.device_number
+    assert c_array.array.length == 10
+
+    # Delete and recreate C++ object from exported pointer
+    del cbatch
+    cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, schema)
+    assert cbatch_new.schema == schema
+    batch_new = _batch_copy_to_host(cbatch_new)
+    assert batch_new.equals(batch)
+
+    del cbatch_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowArray"):
+        pa.RecordBatch._import_from_c_device(ptr_array, schema)
+
+    # Schema is exported and imported at the same time
+    cbatch = cuda.read_record_batch(cbuf, schema)
+    cbatch._export_to_c_device(ptr_array, ptr_schema)
+    # Delete and recreate C++ objects from exported pointers
+    del cbatch
+    cbatch_new = pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
+    assert cbatch_new.schema == schema
+    batch_new = _batch_copy_to_host(cbatch_new)
+    assert batch_new.equals(batch)
+
+    del cbatch_new
+    # Now released
+    with pytest.raises(ValueError, match="Cannot import released ArrowSchema"):
+        pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
+
+    # Not a struct type
+    pa.int32()._export_to_c(ptr_schema)
+    with pytest.raises(ValueError,
+                       match="ArrowSchema describes non-struct type"):
+        pa.RecordBatch._import_from_c_device(ptr_array, ptr_schema)
