[dask] dask cudf inplace prediction. (#5512)
* Add inplace prediction for dask-cudf. * Remove Dockerfile.release, since it's not used anywhere * Use Conda exclusively in CUDF and GPU containers * Improve cupy memory copying. * Add skip marks to tests. * Add mgpu-cudf category on the CI to run all distributed tests. Co-authored-by: Hyunsu Cho <chohyu01@cs.washington.edu>
This commit is contained in:
@@ -209,15 +209,32 @@ def ctypes2numpy(cptr, length, dtype):
|
||||
|
||||
def ctypes2cupy(cptr, length, dtype):
|
||||
"""Convert a ctypes pointer array to a cupy array."""
|
||||
import cupy # pylint: disable=import-error
|
||||
mem = cupy.zeros(length.value, dtype=dtype, order='C')
|
||||
# pylint: disable=import-error
|
||||
import cupy
|
||||
from cupy.cuda.memory import MemoryPointer
|
||||
from cupy.cuda.memory import UnownedMemory
|
||||
CUPY_TO_CTYPES_MAPPING = {
|
||||
cupy.float32: ctypes.c_float,
|
||||
cupy.uint32: ctypes.c_uint
|
||||
}
|
||||
if dtype not in CUPY_TO_CTYPES_MAPPING.keys():
|
||||
raise RuntimeError('Supported types: {}'.format(
|
||||
CUPY_TO_CTYPES_MAPPING.keys()
|
||||
))
|
||||
addr = ctypes.cast(cptr, ctypes.c_void_p).value
|
||||
# pylint: disable=c-extension-no-member,no-member
|
||||
cupy.cuda.runtime.memcpy(
|
||||
mem.__cuda_array_interface__['data'][0], addr,
|
||||
length.value * ctypes.sizeof(ctypes.c_float),
|
||||
cupy.cuda.runtime.memcpyDeviceToDevice)
|
||||
return mem
|
||||
device = cupy.cuda.runtime.pointerGetAttributes(addr).device
|
||||
# The owner field is just used to keep the memory alive with ref count. As
|
||||
# unowned's life time is scoped within this function we don't need that.
|
||||
unownd = UnownedMemory(
|
||||
addr, length.value * ctypes.sizeof(CUPY_TO_CTYPES_MAPPING[dtype]),
|
||||
owner=None)
|
||||
memptr = MemoryPointer(unownd, 0)
|
||||
# pylint: disable=unexpected-keyword-arg
|
||||
mem = cupy.ndarray((length.value, ), dtype=dtype, memptr=memptr)
|
||||
assert mem.device.id == device
|
||||
arr = cupy.array(mem, copy=True)
|
||||
return arr
|
||||
|
||||
|
||||
def ctypes2buffer(cptr, length):
|
||||
|
||||
@@ -101,6 +101,11 @@ def concat(value): # pylint: disable=too-many-return-statements
|
||||
return CUDF_concat(value, axis=0)
|
||||
if lazy_isinstance(value[0], 'cupy.core.core', 'ndarray'):
|
||||
import cupy # pylint: disable=import-error
|
||||
# pylint: disable=c-extension-no-member,no-member
|
||||
d = cupy.cuda.runtime.getDevice()
|
||||
for v in value:
|
||||
d_v = v.device.id
|
||||
assert d_v == d, 'Concatenating arrays on different devices.'
|
||||
return cupy.concatenate(value, axis=0)
|
||||
return dd.multi.concat(list(value), axis=0)
|
||||
|
||||
@@ -631,8 +636,6 @@ def inplace_predict(client, model, data,
|
||||
if is_df:
|
||||
if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'):
|
||||
import cudf # pylint: disable=import-error
|
||||
# There's an error with cudf saying `concat_cudf` got an
|
||||
# expected argument `ignore_index`. So this is not yet working.
|
||||
prediction = cudf.DataFrame({'prediction': prediction},
|
||||
dtype=numpy.float32)
|
||||
else:
|
||||
|
||||
Reference in New Issue
Block a user