Thread safe, inplace prediction. (#5389)

Normal prediction with DMatrix is now made thread safe with locks.  The newly added inplace prediction is lock-free and thread safe.

When data is on device (cupy, cudf), the returned data is also on device.

* Implementation for numpy, csr, cudf and cupy.

* Implementation for dask.

* Remove sync in simple dmatrix.
This commit is contained in:
Jiaming Yuan
2020-03-30 15:35:28 +08:00
committed by GitHub
parent 7f980e9f83
commit 6601a641d7
25 changed files with 1217 additions and 167 deletions

View File

@@ -86,7 +86,7 @@ class CMakeExtension(Extension): # pylint: disable=too-few-public-methods
super().__init__(name=name, sources=[])
class BuildExt(build_ext.build_ext):
class BuildExt(build_ext.build_ext): # pylint: disable=too-many-ancestors
'''Custom build_ext command using CMake.'''
logger = logging.getLogger('XGBoost build_ext')

View File

@@ -207,6 +207,19 @@ def ctypes2numpy(cptr, length, dtype):
return res
def ctypes2cupy(cptr, length, dtype):
    """Convert a ctypes pointer to device memory into a cupy array.

    Parameters
    ----------
    cptr :
        ctypes pointer to a device buffer returned by the native library.
    length :
        ctypes integer (read via ``.value``) holding the element count.
    dtype :
        dtype of the returned array.  The copy size is computed with
        ``sizeof(c_float)``, so callers are expected to pass a 4-byte
        dtype (``np.float32``) — a wider/narrower dtype would mismatch.

    Returns
    -------
    cupy.ndarray holding a device-to-device copy of the buffer.
    """
    import cupy  # pylint: disable=import-error
    # ``empty`` instead of ``zeros``: the buffer is fully overwritten by
    # the memcpy below, so zero-initialisation is wasted work.
    mem = cupy.empty(length.value, dtype=dtype, order='C')
    addr = ctypes.cast(cptr, ctypes.c_void_p).value
    # pylint: disable=c-extension-no-member,no-member
    cupy.cuda.runtime.memcpy(
        mem.__cuda_array_interface__['data'][0], addr,
        length.value * ctypes.sizeof(ctypes.c_float),
        cupy.cuda.runtime.memcpyDeviceToDevice)
    return mem
def ctypes2buffer(cptr, length):
"""Convert ctypes pointer to buffer type."""
if not isinstance(cptr, ctypes.POINTER(ctypes.c_char)):
@@ -474,6 +487,7 @@ class DMatrix(object):
data, feature_names, feature_types = _convert_dataframes(
data, feature_names, feature_types
)
missing = np.nan if missing is None else missing
if isinstance(data, (STRING_TYPES, os_PathLike)):
handle = ctypes.c_void_p()
@@ -1428,12 +1442,17 @@ class Booster(object):
training=False):
"""Predict with data.
.. note:: This function is not thread safe.
.. note:: This function is not thread safe except for ``gbtree``
booster.
For each booster object, predict can only be called from one thread.
If you want to run prediction using multiple thread, call
``bst.copy()`` to make copies of model object and then call
``predict()``.
For ``gbtree`` booster, the thread safety is guaranteed by locks.
For lock free prediction use ``inplace_predict`` instead. Also, the
safety does not hold when used in conjunction with other methods.
When using booster other than ``gbtree``, predict can only be called
from one thread. If you want to run prediction using multiple
thread, call ``bst.copy()`` to make copies of model object and then
call ``predict()``.
Parameters
----------
@@ -1547,6 +1566,146 @@ class Booster(object):
preds = preds.reshape(nrow, chunk_size)
return preds
def inplace_predict(self, data, iteration_range=(0, 0),
                    predict_type='value', missing=np.nan):
    '''Run prediction in-place.  Unlike the ``predict`` method, inplace
    prediction does not cache the prediction result.

    Calling only ``inplace_predict`` in multiple threads is safe and lock
    free.  But the safety does not hold when used in conjunction with other
    methods.  E.g. you can't train the booster in one thread and perform
    prediction in the other.

    .. code-block:: python

        booster.set_param({'predictor': 'gpu_predictor'})
        booster.inplace_predict(cupy_array)
        booster.set_param({'predictor': 'cpu_predictor'})
        booster.inplace_predict(numpy_array)

    Parameters
    ----------
    data : numpy.ndarray/scipy.sparse.csr_matrix/cupy.ndarray/
           cudf.DataFrame/pd.DataFrame
        The input data, must not be a view for numpy array.  Set
        ``predictor`` to ``gpu_predictor`` for running prediction on CuPy
        array or CuDF DataFrame.
    iteration_range : tuple
        Specifies which layer of trees are used in prediction.  For
        example, if a random forest is trained with 100 rounds.  Specifying
        `iteration_range=(10, 20)`, then only the forests built during [10,
        20) (open set) rounds are used in this prediction.
    predict_type : str
        * `value` Output model prediction values.
        * `margin` Output the raw untransformed margin value.
    missing : float
        Value in the input data which needs to be treated as a missing
        value.

    Returns
    -------
    prediction : numpy.ndarray/cupy.ndarray
        The prediction result.  When input data is on GPU, prediction
        result is stored in a cupy array.
    '''
    def reshape_output(predt, rows):
        '''Reshape a flat prediction buffer for multi-output prediction.'''
        # A size that is a proper multiple of ``rows`` means the model
        # produced several outputs per row (e.g. multi-class margins).
        if predt.size != rows and predt.size % rows == 0:
            cols = int(predt.size / rows)
            predt = predt.reshape(rows, cols)
            return predt
        return predt

    length = c_bst_ulong()
    preds = ctypes.POINTER(ctypes.c_float)()
    iteration_range = (ctypes.c_uint(iteration_range[0]),
                       ctypes.c_uint(iteration_range[1]))

    # once caching is supported, we can pass id(data) as cache id.
    if isinstance(data, DataFrame):
        data = data.values
    if isinstance(data, np.ndarray):
        # Views may have non-contiguous memory; the C API reads the raw
        # buffer directly, so require C-contiguity.
        assert data.flags.c_contiguous
        arr = np.array(data.reshape(data.size), copy=False,
                       dtype=np.float32)
        _check_call(_LIB.XGBoosterPredictFromDense(
            self.handle,
            arr.ctypes.data_as(ctypes.POINTER(ctypes.c_float)),
            c_bst_ulong(data.shape[0]),
            c_bst_ulong(data.shape[1]),
            ctypes.c_float(missing),
            iteration_range[0],
            iteration_range[1],
            c_str(predict_type),
            c_bst_ulong(0),
            ctypes.byref(length),
            ctypes.byref(preds)
        ))
        preds = ctypes2numpy(preds, length.value, np.float32)
        rows = data.shape[0]
        return reshape_output(preds, rows)
    if isinstance(data, scipy.sparse.csr_matrix):
        csr = data
        _check_call(_LIB.XGBoosterPredictFromCSR(
            self.handle,
            c_array(ctypes.c_size_t, csr.indptr),
            c_array(ctypes.c_uint, csr.indices),
            c_array(ctypes.c_float, csr.data),
            ctypes.c_size_t(len(csr.indptr)),
            ctypes.c_size_t(len(csr.data)),
            ctypes.c_size_t(csr.shape[1]),
            ctypes.c_float(missing),
            iteration_range[0],
            iteration_range[1],
            c_str(predict_type),
            c_bst_ulong(0),
            ctypes.byref(length),
            ctypes.byref(preds)))
        preds = ctypes2numpy(preds, length.value, np.float32)
        rows = data.shape[0]
        return reshape_output(preds, rows)
    if lazy_isinstance(data, 'cupy.core.core', 'ndarray'):
        assert data.flags.c_contiguous
        interface = data.__cuda_array_interface__
        # A mask (from masked/nullable arrays) must itself be exported as
        # an array interface for JSON serialisation.
        if 'mask' in interface:
            interface['mask'] = interface['mask'].__cuda_array_interface__
        interface_str = bytes(json.dumps(interface, indent=2), 'utf-8')
        _check_call(_LIB.XGBoosterPredictFromArrayInterface(
            self.handle,
            interface_str,
            ctypes.c_float(missing),
            iteration_range[0],
            iteration_range[1],
            c_str(predict_type),
            c_bst_ulong(0),
            ctypes.byref(length),
            ctypes.byref(preds)))
        # Result stays on device; copy it into a cupy array.
        mem = ctypes2cupy(preds, length, np.float32)
        rows = data.shape[0]
        return reshape_output(mem, rows)
    if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'):
        interfaces_str = _cudf_array_interfaces(data)
        _check_call(_LIB.XGBoosterPredictFromArrayInterfaceColumns(
            self.handle,
            interfaces_str,
            ctypes.c_float(missing),
            iteration_range[0],
            iteration_range[1],
            c_str(predict_type),
            c_bst_ulong(0),
            ctypes.byref(length),
            ctypes.byref(preds)))
        mem = ctypes2cupy(preds, length, np.float32)
        rows = data.shape[0]
        predt = reshape_output(mem, rows)
        return predt

    raise TypeError('Data type:' + str(type(data)) +
                    ' not supported by inplace prediction.')
def save_model(self, fname):
"""Save the model to a file.

View File

@@ -26,6 +26,7 @@ from .compat import da, dd, delayed, get_client
from .compat import sparse, scipy_sparse
from .compat import PANDAS_INSTALLED, DataFrame, Series, pandas_concat
from .compat import CUDF_INSTALLED, CUDF_DataFrame, CUDF_Series, CUDF_concat
from .compat import lazy_isinstance
from .core import DMatrix, Booster, _expect
from .training import train as worker_train
@@ -86,7 +87,7 @@ class RabitContext:
LOGGER.debug('--------------- rabit say bye ------------------')
def concat(value):
def concat(value): # pylint: disable=too-many-return-statements
'''To be replaced with dask builtin.'''
if isinstance(value[0], numpy.ndarray):
return numpy.concatenate(value, axis=0)
@@ -98,6 +99,9 @@ def concat(value):
return pandas_concat(value, axis=0)
if CUDF_INSTALLED and isinstance(value[0], (CUDF_DataFrame, CUDF_Series)):
return CUDF_concat(value, axis=0)
if lazy_isinstance(value[0], 'cupy.core.core', 'ndarray'):
import cupy # pylint: disable=import-error
return cupy.concatenate(value, axis=0)
return dd.multi.concat(list(value), axis=0)
@@ -370,8 +374,9 @@ def train(client, params, dtrain, *args, evals=(), **kwargs):
Specify the dask client used for training. Use default client
returned from dask if it's set to None.
\\*\\*kwargs:
Other parameters are the same as `xgboost.train` except for `evals_result`,
which is returned as part of function return value instead of argument.
Other parameters are the same as `xgboost.train` except for
`evals_result`, which is returned as part of function return value
instead of argument.
Returns
-------
@@ -500,11 +505,10 @@ def predict(client, model, data, *args, missing=numpy.nan):
).result()
return predictions
if isinstance(data, dd.DataFrame):
import dask
predictions = client.submit(
dd.map_partitions,
mapped_predict, data, True,
meta=dask.dataframe.utils.make_meta({'prediction': 'f4'})
meta=dd.utils.make_meta({'prediction': 'f4'})
).result()
return predictions.iloc[:, 0]
@@ -572,6 +576,79 @@ def predict(client, model, data, *args, missing=numpy.nan):
return predictions
def inplace_predict(client, model, data,
                    iteration_range=(0, 0),
                    predict_type='value',
                    missing=numpy.nan):
    '''Inplace prediction, distributed over the partitions/blocks of
    ``data`` via dask.

    Parameters
    ----------
    client: dask.distributed.Client
        Specify the dask client used for training.  Use default client
        returned from dask if it's set to None.
    model: Booster/dict
        The trained model.
    data: dask.array.Array/dask.dataframe.DataFrame
        The input data used for prediction.
    iteration_range: tuple
        Specify the range of trees used for prediction.
    predict_type: str
        * 'value': Normal prediction result.
        * 'margin': Output the raw untransformed margin value.
    missing: float
        Value in the input data which needs to be treated as a missing
        value.  If None, defaults to np.nan.

    Returns
    -------
    prediction: dask.array.Array
    '''
    _assert_dask_support()
    client = _xgb_get_client(client)
    # Accept either a bare Booster or the dict returned by ``train``.
    if isinstance(model, Booster):
        booster = model
    elif isinstance(model, dict):
        booster = model['booster']
    else:
        raise TypeError(_expect([Booster, dict], type(model)))
    if not isinstance(data, (da.Array, dd.DataFrame)):
        raise TypeError(_expect([da.Array, dd.DataFrame], type(data)))

    def mapped_predict(data, is_df):
        '''Run inplace prediction on a single partition/block.'''
        worker = distributed_get_worker()
        booster.set_param({'nthread': worker.nthreads})
        prediction = booster.inplace_predict(
            data,
            iteration_range=iteration_range,
            predict_type=predict_type,
            missing=missing)
        if is_df:
            if lazy_isinstance(data, 'cudf.core.dataframe', 'DataFrame'):
                import cudf  # pylint: disable=import-error
                # There's an error with cudf saying `concat_cudf` got an
                # unexpected keyword argument `ignore_index`.  So this is
                # not yet working.
                prediction = cudf.DataFrame({'prediction': prediction},
                                            dtype=numpy.float32)
            else:
                # If it's from pandas, the partition is a numpy array
                prediction = DataFrame(prediction, columns=['prediction'],
                                       dtype=numpy.float32)
        return prediction

    if isinstance(data, da.Array):
        # drop_axis=1: each block's 2-d input collapses to a 1-d output.
        predictions = client.submit(
            da.map_blocks,
            mapped_predict, data, False, drop_axis=1,
            dtype=numpy.float32
        ).result()
        return predictions
    if isinstance(data, dd.DataFrame):
        predictions = client.submit(
            dd.map_partitions,
            mapped_predict, data, True,
            meta=dd.utils.make_meta({'prediction': 'f4'})
        ).result()
        return predictions.iloc[:, 0]
def _evaluation_matrices(client, validation_set, sample_weights, missing):
'''
Parameters