RMM integration plugin (#5873)

* [CI] Add RMM as an optional dependency

* Replace caching allocator with pool allocator from RMM

* Revert "Replace caching allocator with pool allocator from RMM"

This reverts commit e15845d4e72e890c2babe31a988b26503a7d9038.

* Use rmm::mr::get_default_resource()

* Try setting default resource (doesn't work yet)

* Allocate pool_mr in the heap

* Prevent leaking pool_mr handle

* Separate EXPECT_DEATH() in separate test suite suffixed DeathTest

* Turn off death tests for RMM

* Address reviewer's feedback

* Prevent leaking of cuda_mr

* Fix Jenkinsfile syntax

* Remove unnecessary function in Jenkinsfile

* [CI] Install NCCL into RMM container

* Run Python tests

* Try building with RMM, CUDA 10.0

* Do not use RMM for CUDA 10.0 target

* Actually test for test_rmm flag

* Fix TestPythonGPU

* Use CNMeM allocator, since pool allocator doesn't yet support multiGPU

* Use 10.0 container to build RMM-enabled XGBoost

* Revert "Use 10.0 container to build RMM-enabled XGBoost"

This reverts commit 789021fa31112e25b683aef39fff375403060141.

* Fix Jenkinsfile

* [CI] Assign larger /dev/shm to NCCL

* Use 10.2 artifact to run multi-GPU Python tests

* Add CUDA 10.0 -> 11.0 cross-version test; remove CUDA 10.0 target

* Rename Conda env rmm_test -> gpu_test

* Use env var to opt into CNMeM pool for C++ tests

* Use identical CUDA version for RMM builds and tests

* Use Pytest fixtures to enable RMM pool in Python tests

* Move RMM to plugin/CMakeLists.txt; use PLUGIN_RMM

* Use per-device MR; use command arg in gtest

* Set CMake prefix path to use Conda env

* Use 0.15 nightly version of RMM

* Remove unnecessary header

* Fix a unit test when cudf is missing

* Add RMM demos

* Remove print()

* Use HostDeviceVector in GPU predictor

* Simplify pytest setup; use LocalCUDACluster fixture

* Address reviewers' commments

Co-authored-by: Hyunsu Cho <chohyu01@cs.wasshington.edu>
This commit is contained in:
Philip Hyunsu Cho
2020-08-12 01:26:02 -07:00
committed by GitHub
parent c3ea3b7e37
commit 9adb812a0a
26 changed files with 508 additions and 140 deletions

View File

@@ -0,0 +1,45 @@
import sys
import pytest
import logging
sys.path.append("tests/python")
import testing as tm # noqa
def has_rmm():
try:
import rmm
return True
except ImportError:
return False
@pytest.fixture(scope='session', autouse=True)
def setup_rmm_pool(request, pytestconfig):
if pytestconfig.getoption('--use-rmm-pool'):
if not has_rmm():
raise ImportError('The --use-rmm-pool option requires the RMM package')
import rmm
from dask_cuda.utils import get_n_gpus
rmm.reinitialize(pool_allocator=True, initial_pool_size=1024*1024*1024,
devices=list(range(get_n_gpus())))
@pytest.fixture(scope='function')
def local_cuda_cluster(request, pytestconfig):
kwargs = {}
if hasattr(request, 'param'):
kwargs.update(request.param)
if pytestconfig.getoption('--use-rmm-pool'):
if not has_rmm():
raise ImportError('The --use-rmm-pool option requires the RMM package')
import rmm
from dask_cuda.utils import get_n_gpus
rmm.reinitialize()
kwargs['rmm_pool_size'] = '2GB'
if tm.no_dask_cuda()['condition']:
raise ImportError('The local_cuda_cluster fixture requires dask_cuda package')
from dask_cuda import LocalCUDACluster
cluster = LocalCUDACluster(**kwargs)
yield cluster
cluster.close()
def pytest_addoption(parser):
parser.addoption('--use-rmm-pool', action='store_true', default=False, help='Use RMM pool')

View File

@@ -6,7 +6,6 @@ sys.path.append("tests/python")
import testing as tm
import test_demos as td # noqa
@pytest.mark.skipif(**tm.no_cupy())
def test_data_iterator():
script = os.path.join(td.PYTHON_DEMO_DIR, 'data_iterator.py')

View File

@@ -3,7 +3,6 @@ import os
import pytest
import numpy as np
import asyncio
import unittest
import xgboost
import subprocess
from hypothesis import given, strategies, settings, note
@@ -23,7 +22,6 @@ import testing as tm # noqa
try:
import dask.dataframe as dd
from xgboost import dask as dxgb
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
from dask import array as da
import cudf
@@ -151,50 +149,51 @@ def run_gpu_hist(params, num_rounds, dataset, DMatrixT, client):
assert tm.non_increasing(history['train'][dataset.metric])
class TestDistributedGPU(unittest.TestCase):
class TestDistributedGPU:
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_cudf())
@pytest.mark.skipif(**tm.no_dask_cudf())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.mgpu
def test_dask_dataframe(self):
with LocalCUDACluster() as cluster:
with Client(cluster) as client:
run_with_dask_dataframe(dxgb.DaskDMatrix, client)
run_with_dask_dataframe(dxgb.DaskDeviceQuantileDMatrix, client)
def test_dask_dataframe(self, local_cuda_cluster):
with Client(local_cuda_cluster) as client:
run_with_dask_dataframe(dxgb.DaskDMatrix, client)
run_with_dask_dataframe(dxgb.DaskDeviceQuantileDMatrix, client)
@given(parameter_strategy, strategies.integers(1, 20),
tm.dataset_strategy)
@given(params=parameter_strategy, num_rounds=strategies.integers(1, 20),
dataset=tm.dataset_strategy)
@settings(deadline=duration(seconds=120))
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.parametrize('local_cuda_cluster', [{'n_workers': 2}], indirect=['local_cuda_cluster'])
@pytest.mark.mgpu
def test_gpu_hist(self, params, num_rounds, dataset):
with LocalCUDACluster(n_workers=2) as cluster:
with Client(cluster) as client:
run_gpu_hist(params, num_rounds, dataset, dxgb.DaskDMatrix,
client)
run_gpu_hist(params, num_rounds, dataset,
dxgb.DaskDeviceQuantileDMatrix, client)
def test_gpu_hist(self, params, num_rounds, dataset, local_cuda_cluster):
with Client(local_cuda_cluster) as client:
run_gpu_hist(params, num_rounds, dataset, dxgb.DaskDMatrix,
client)
run_gpu_hist(params, num_rounds, dataset,
dxgb.DaskDeviceQuantileDMatrix, client)
@pytest.mark.skipif(**tm.no_cupy())
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.mgpu
def test_dask_array(self):
with LocalCUDACluster() as cluster:
with Client(cluster) as client:
run_with_dask_array(dxgb.DaskDMatrix, client)
run_with_dask_array(dxgb.DaskDeviceQuantileDMatrix, client)
def test_dask_array(self, local_cuda_cluster):
with Client(local_cuda_cluster) as client:
run_with_dask_array(dxgb.DaskDMatrix, client)
run_with_dask_array(dxgb.DaskDeviceQuantileDMatrix, client)
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.mgpu
def test_empty_dmatrix(self):
with LocalCUDACluster() as cluster:
with Client(cluster) as client:
parameters = {'tree_method': 'gpu_hist',
'debug_synchronize': True}
run_empty_dmatrix_reg(client, parameters)
run_empty_dmatrix_cls(client, parameters)
def test_empty_dmatrix(self, local_cuda_cluster):
with Client(local_cuda_cluster) as client:
parameters = {'tree_method': 'gpu_hist',
'debug_synchronize': True}
run_empty_dmatrix_reg(client, parameters)
run_empty_dmatrix_cls(client, parameters)
def run_quantile(self, name):
def run_quantile(self, name, local_cuda_cluster):
if sys.platform.startswith("win"):
pytest.skip("Skipping dask tests on Windows")
@@ -217,34 +216,33 @@ class TestDistributedGPU(unittest.TestCase):
env[port[0]] = port[1]
return subprocess.run([exe, test], env=env, stdout=subprocess.PIPE)
with LocalCUDACluster() as cluster:
with Client(cluster) as client:
workers = list(dxgb._get_client_workers(client).keys())
rabit_args = client.sync(dxgb._get_rabit_args, workers, client)
futures = client.map(runit,
workers,
pure=False,
workers=workers,
rabit_args=rabit_args)
results = client.gather(futures)
for ret in results:
msg = ret.stdout.decode('utf-8')
assert msg.find('1 test from GPUQuantile') != -1, msg
assert ret.returncode == 0, msg
with Client(local_cuda_cluster) as client:
workers = list(dxgb._get_client_workers(client).keys())
rabit_args = client.sync(dxgb._get_rabit_args, workers, client)
futures = client.map(runit,
workers,
pure=False,
workers=workers,
rabit_args=rabit_args)
results = client.gather(futures)
for ret in results:
msg = ret.stdout.decode('utf-8')
assert msg.find('1 test from GPUQuantile') != -1, msg
assert ret.returncode == 0, msg
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.mgpu
@pytest.mark.gtest
def test_quantile_basic(self):
self.run_quantile('AllReduceBasic')
def test_quantile_basic(self, local_cuda_cluster):
self.run_quantile('AllReduceBasic', local_cuda_cluster)
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.mgpu
@pytest.mark.gtest
def test_quantile_same_on_all_workers(self):
self.run_quantile('SameOnAllWorkers')
def test_quantile_same_on_all_workers(self, local_cuda_cluster):
self.run_quantile('SameOnAllWorkers', local_cuda_cluster)
async def run_from_dask_array_asyncio(scheduler_address):
@@ -275,11 +273,11 @@ async def run_from_dask_array_asyncio(scheduler_address):
@pytest.mark.skipif(**tm.no_dask())
@pytest.mark.skipif(**tm.no_dask_cuda())
@pytest.mark.mgpu
def test_with_asyncio():
with LocalCUDACluster() as cluster:
with Client(cluster) as client:
address = client.scheduler.address
output = asyncio.run(run_from_dask_array_asyncio(address))
assert isinstance(output['booster'], xgboost.Booster)
assert isinstance(output['history'], dict)
def test_with_asyncio(local_cuda_cluster):
with Client(local_cuda_cluster) as client:
address = client.scheduler.address
output = asyncio.run(run_from_dask_array_asyncio(address))
assert isinstance(output['booster'], xgboost.Booster)
assert isinstance(output['history'], dict)