Smarter choice of histogram construction for distributed gpu_hist (#4519)

* Smarter choice of histogram construction for distributed gpu_hist

* Limit omp team size in ExecuteShards
This commit is contained in:
Rory Mitchell
2019-05-31 14:11:34 +12:00
committed by GitHub
parent dd60fc23e6
commit fbbae3386a
3 changed files with 67 additions and 10 deletions

View File

@@ -7,6 +7,7 @@
#include <thrust/system/cuda/error.h>
#include <thrust/system_error.h>
#include <xgboost/logging.h>
#include <rabit/rabit.h>
#include "common.h"
#include "span.h"
@@ -784,6 +785,7 @@ class AllReducer {
bool initialised_;
size_t allreduce_bytes_; // Keep statistics of the number of bytes communicated
size_t allreduce_calls_; // Keep statistics of the number of reduce calls
// Host-side scratch buffer for HostMaxAllReduce. It is shared by every OpenMP
// thread calling into this object, so it must only be touched under the
// synchronisation that HostMaxAllReduce provides.
std::vector<size_t> host_data; // Used for all reduce on host
#ifdef XGBOOST_USE_NCCL
std::vector<ncclComm_t> comms;
std::vector<cudaStream_t> streams;
@@ -1024,6 +1026,42 @@ class AllReducer {
return id;
}
#endif
/**
 * \brief Element-wise max all reduce of a host vector.
 *
 * The reduction happens in two stages: first across the OpenMP threads of the
 * calling team (through the shared member buffer host_data), then across
 * distributed nodes using rabit. rabit is not thread safe, so only the master
 * thread talks to it. The local stage is a naive serialised merge; don't
 * expect it to scale with the number of threads.
 *
 * This function contains OpenMP barriers, so every thread of the enclosing
 * team must call it. The shared buffer is sized from a single thread's
 * vector, hence all threads must pass vectors of the same length.
 *
 * \param p_data In/out. On entry this thread's local values; on return the
 *               element-wise maximum over all threads and all nodes.
 */
void HostMaxAllReduce(std::vector<size_t> *p_data) {
  auto &data = *p_data;
  // Wait in case some other thread is still reading host_data from a
  // previous call.
#pragma omp barrier
  // Exactly one thread resets the shared buffer. `single` carries an implicit
  // barrier at its end, so nobody merges before the reset has completed.
#pragma omp single
  {
    host_data.assign(data.size(), size_t(0));
  }
  // Merge this thread's values into the shared buffer. The critical section
  // wraps the whole loop so that the lock is taken once per thread instead of
  // once per element; the merged result is identical.
#pragma omp critical
  {
    for (size_t i = 0; i < data.size(); ++i) {
      host_data[i] = std::max(host_data[i], data[i]);
    }
  }
  // Wait until every thread has contributed its values.
#pragma omp barrier
  // One thread performs the all reduce across distributed nodes.
#pragma omp master
  {
    rabit::Allreduce<rabit::op::Max, size_t>(host_data.data(),
                                             host_data.size());
  }
  // `master` has no implied barrier, so synchronise explicitly before any
  // thread reads the reduced values back.
#pragma omp barrier
  std::copy_n(host_data.begin(), data.size(), data.begin());
}
};
/**
@@ -1044,7 +1082,7 @@ void ExecuteIndexShards(std::vector<T> *shards, FunctionT f) {
bool dynamic = omp_get_dynamic();
omp_set_dynamic(false);
const long shards_size = static_cast<long>(shards->size());
#pragma omp parallel for schedule(static, 1) if (shards_size > 1)
#pragma omp parallel for schedule(static, 1) if (shards_size > 1) num_threads(shards_size)
for (long shard = 0; shard < shards_size; ++shard) {
f(shard, shards->at(shard));
}