Implement devices to devices reshard. (#3721)

* Force clearing device memory before Reshard.
* Remove calculating row_segments for gpu_hist and gpu_sketch.
* Guard against changing device.
This commit is contained in:
trivialfis
2018-09-28 17:40:23 +12:00
committed by Rory Mitchell
parent 0b7fd74138
commit 5a7f7e7d49
11 changed files with 179 additions and 96 deletions

View File

@@ -110,7 +110,7 @@ inline void CheckComputeCapability() {
std::ostringstream oss;
oss << "CUDA Capability Major/Minor version number: " << prop.major << "."
<< prop.minor << " is insufficient. Need >=3.5";
int failed = prop.major < 3 || prop.major == 3 && prop.minor < 5;
int failed = prop.major < 3 || (prop.major == 3 && prop.minor < 5);
if (failed) LOG(WARNING) << oss.str() << " for device: " << d_idx;
}
}
@@ -129,15 +129,10 @@ DEV_INLINE void AtomicOrByte(unsigned int* __restrict__ buffer, size_t ibyte, un
* than all elements of the array
*/
DEV_INLINE int UpperBound(const float* __restrict__ cuts, int n, float v) {
if (n == 0) {
return 0;
}
if (cuts[n - 1] <= v) {
return n;
}
if (cuts[0] > v) {
return 0;
}
if (n == 0) { return 0; }
if (cuts[n - 1] <= v) { return n; }
if (cuts[0] > v) { return 0; }
int left = 0, right = n - 1;
while (right - left > 1) {
int middle = left + (right - left) / 2;
@@ -145,7 +140,7 @@ DEV_INLINE int UpperBound(const float* __restrict__ cuts, int n, float v) {
right = middle;
} else {
left = middle;
}
}
}
return right;
}
@@ -184,18 +179,6 @@ T1 DivRoundUp(const T1 a, const T2 b) {
return static_cast<T1>(ceil(static_cast<double>(a) / b));
}
inline void RowSegments(size_t n_rows, size_t n_devices, std::vector<size_t>* segments) {
segments->push_back(0);
size_t row_begin = 0;
size_t shard_size = DivRoundUp(n_rows, n_devices);
for (size_t d_idx = 0; d_idx < n_devices; ++d_idx) {
size_t row_end = std::min(row_begin + shard_size, n_rows);
segments->push_back(row_end);
row_begin = row_end;
}
}
template <typename L>
__global__ void LaunchNKernel(size_t begin, size_t end, L lambda) {
for (auto i : GridStrideRange(begin, end)) {
@@ -322,8 +305,8 @@ class DVec {
void copy(IterT begin, IterT end) {
safe_cuda(cudaSetDevice(this->DeviceIdx()));
if (end - begin != Size()) {
throw std::runtime_error(
"Cannot copy assign vector to DVec, sizes are different");
LOG(FATAL) << "Cannot copy assign vector to DVec, sizes are different" <<
" vector::Size(): " << end - begin << " DVec::Size(): " << Size();
}
thrust::copy(begin, end, this->tbegin());
}
@@ -961,6 +944,29 @@ class AllReducer {
}
};
class SaveCudaContext {
private:
int saved_device_;
public:
template <typename Functor>
explicit SaveCudaContext (Functor func) : saved_device_{-1} {
// When compiled with CUDA but running on CPU only device,
// cudaGetDevice will fail.
try {
safe_cuda(cudaGetDevice(&saved_device_));
} catch (thrust::system::system_error & err) {
saved_device_ = -1;
}
func();
}
~SaveCudaContext() {
if (saved_device_ != -1) {
safe_cuda(cudaSetDevice(saved_device_));
}
}
};
/**
* \brief Executes some operation on each element of the input vector, using a
* single controlling thread for each element.
@@ -973,10 +979,13 @@ class AllReducer {
template <typename T, typename FunctionT>
void ExecuteShards(std::vector<T> *shards, FunctionT f) {
SaveCudaContext {
[&](){
#pragma omp parallel for schedule(static, 1) if (shards->size() > 1)
for (int shard = 0; shard < shards->size(); ++shard) {
f(shards->at(shard));
}
for (int shard = 0; shard < shards->size(); ++shard) {
f(shards->at(shard));
}
}};
}
/**
@@ -992,10 +1001,13 @@ void ExecuteShards(std::vector<T> *shards, FunctionT f) {
template <typename T, typename FunctionT>
void ExecuteIndexShards(std::vector<T> *shards, FunctionT f) {
SaveCudaContext {
[&](){
#pragma omp parallel for schedule(static, 1) if (shards->size() > 1)
for (int shard = 0; shard < shards->size(); ++shard) {
f(shard, shards->at(shard));
}
for (int shard = 0; shard < shards->size(); ++shard) {
f(shard, shards->at(shard));
}
}};
}
/**
@@ -1011,13 +1023,16 @@ void ExecuteIndexShards(std::vector<T> *shards, FunctionT f) {
* \return A reduce_t.
*/
template <typename ReduceT,typename T, typename FunctionT>
ReduceT ReduceShards(std::vector<T> *shards, FunctionT f) {
template <typename ReduceT, typename ShardT, typename FunctionT>
ReduceT ReduceShards(std::vector<ShardT> *shards, FunctionT f) {
std::vector<ReduceT> sums(shards->size());
SaveCudaContext {
[&](){
#pragma omp parallel for schedule(static, 1) if (shards->size() > 1)
for (int shard = 0; shard < shards->size(); ++shard) {
sums[shard] = f(shards->at(shard));
}
for (int shard = 0; shard < shards->size(); ++shard) {
sums[shard] = f(shards->at(shard));
}}
};
return std::accumulate(sums.begin(), sums.end(), ReduceT());
}
} // namespace dh