Optimize DMatrix build time. (#5877)
Co-authored-by: SHVETS, KIRILL <kirill.shvets@intel.com>
parent 29b7fea572
commit 24f2e6c97e
@@ -422,7 +422,7 @@ class DMatrix:  # pylint: disable=too-many-instance-attributes
             raise TypeError('Input data can not be a list.')

         self.missing = missing if missing is not None else np.nan
-        self.nthread = nthread if nthread is not None else 1
+        self.nthread = nthread if nthread is not None else -1
         self.silent = silent

         # force into void_p, mac need to pass things in as void_p
@@ -17,6 +17,7 @@
 #include <cstddef>
 #include <vector>
 #include <algorithm>
+#include <utility>

 #include "xgboost/base.h"

@@ -56,10 +57,10 @@ class ParallelGroupBuilder {
   void InitBudget(std::size_t max_key, int nthread) {
     thread_rptr_.resize(nthread);
     for (std::size_t i = 0; i < thread_rptr_.size(); ++i) {
-      thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key));
-      std::fill(thread_rptr_[i].begin(), thread_rptr_[i].end(), 0);
+      thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key), 0);
     }
   }

   /*!
    * \brief step 2: add budget to each key
    * \param key the key
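For context on the InitBudget change: std::vector::resize(count, value) value-initializes only the elements it appends, so growing an empty per-thread buffer with resize(n, 0) yields the same zeroed contents as resize(n) followed by std::fill while touching each element once. A minimal standalone sketch of that difference (names and data here are illustrative, not from the patch):

// Sketch: resize(n, 0) vs. resize(n) + std::fill on a freshly grown vector.
#include <algorithm>
#include <cassert>
#include <vector>

int main() {
  // Growing an empty vector: both forms produce n zero-valued elements,
  // but resize(n, 0) initializes them in a single pass.
  std::vector<int> a;
  a.resize(4, 0);                      // {0, 0, 0, 0}

  std::vector<int> b;
  b.resize(4);                         // ints are value-initialized to 0
  std::fill(b.begin(), b.end(), 0);
  assert(a == b);

  // Caveat: resize(n, 0) only initializes the *new* tail; pre-existing
  // elements are left untouched, unlike a fill over the whole range.
  std::vector<int> c{7, 7};
  c.resize(4, 0);                      // {7, 7, 0, 0}
  assert(c.front() == 7);
  return 0;
}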
@@ -74,6 +75,7 @@ class ParallelGroupBuilder {
     }
     trptr[offset_key] += nelem;
   }

   /*! \brief step 3: initialize the necessary storage */
   inline void InitStorage() {
     // set rptr to correct size
@@ -101,6 +103,7 @@ class ParallelGroupBuilder {
     }
     data_.resize(rptr_.back());
   }

   /*!
    * \brief step 4: add data to the allocated space,
    *   the calls to this function should be exactly match previous call to AddBudget
@@ -109,10 +112,10 @@ class ParallelGroupBuilder {
    * \param value The value to be pushed to the group.
    * \param threadid the id of thread that calls this function
    */
-  void Push(std::size_t key, ValueType value, int threadid) {
+  void Push(std::size_t key, ValueType&& value, int threadid) {
     size_t offset_key = key - base_row_offset_;
     SizeType &rp = thread_rptr_[threadid][offset_key];
-    data_[rp++] = value;
+    data_[rp++] = std::move(value);
   }

  private:
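For context on the Push signature change: taking the value by rvalue reference and assigning with std::move (declared in <utility>, hence the new include above) lets a temporary be moved into the pre-allocated slot instead of copied. A minimal sketch of the pattern, using a hypothetical GroupBuilderSketch type rather than the real ParallelGroupBuilder:

// Sketch: pushing into pre-allocated storage through an rvalue reference.
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

template <typename ValueType>
struct GroupBuilderSketch {
  std::vector<ValueType> data_;        // pre-allocated storage
  std::vector<std::size_t> rptr_;      // next write position per key

  void Push(std::size_t key, ValueType&& value) {
    std::size_t& rp = rptr_[key];
    data_[rp++] = std::move(value);    // move, not copy
  }
};

int main() {
  GroupBuilderSketch<std::string> builder;
  builder.data_.resize(2);
  builder.rptr_ = {0, 1};
  builder.Push(0, std::string("temporary"));  // the temporary is moved in
  return 0;
}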
@@ -840,10 +840,11 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
   // Set number of threads but keep old value so we can reset it after
   const int nthreadmax = omp_get_max_threads();
   if (nthread <= 0) nthread = nthreadmax;
-  int nthread_original = omp_get_max_threads();
+  const int nthread_original = omp_get_max_threads();
   omp_set_num_threads(nthread);
   auto& offset_vec = offset.HostVector();
   auto& data_vec = data.HostVector();

   size_t builder_base_row_offset = this->Size();
   common::ParallelGroupBuilder<
       Entry, std::remove_reference<decltype(offset_vec)>::type::value_type>
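The nthread handling here, together with the Python-side default changing from 1 to -1, follows the convention that a non-positive thread count means "use every available core". A standalone sketch of that convention and of saving/restoring the global OpenMP thread count, assuming an OpenMP-enabled build (the function name is illustrative):

// Sketch: "nthread <= 0 means all threads" plus save/restore of the
// process-wide OpenMP setting.
#include <omp.h>
#include <cstdio>

void RunWithThreads(int nthread) {
  if (nthread <= 0) nthread = omp_get_max_threads();   // -1/0 -> all cores

  const int nthread_original = omp_get_max_threads();  // remember old value
  omp_set_num_threads(nthread);

#pragma omp parallel
  {
#pragma omp single
    std::printf("running with %d threads\n", omp_get_num_threads());
  }

  omp_set_num_threads(nthread_original);               // restore the setting
}

int main() {
  RunWithThreads(-1);  // same meaning as the new DMatrix default
  return 0;
}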
@@ -858,48 +859,74 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
           last_line.GetElement(last_line.Size() - 1).row_idx - base_rowid;
     }
   }
-  builder.InitBudget(expected_rows, nthread);
-  uint64_t max_columns = 0;
-
-  // First-pass over the batch counting valid elements
   size_t batch_size = batch.Size();
-#pragma omp parallel for schedule(static)
-  for (omp_ulong i = 0; i < static_cast<omp_ulong>(batch_size);
-       ++i) {  // NOLINT(*)
+  const size_t thread_size = batch_size/nthread;
+  builder.InitBudget(expected_rows+1, nthread);
+  uint64_t max_columns = 0;
+  if (batch_size == 0) {
+    omp_set_num_threads(nthread_original);
+    return max_columns;
+  }
+  std::vector<std::vector<uint64_t>> max_columns_vector(nthread);
+  dmlc::OMPException exec;
+  // First-pass over the batch counting valid elements
+#pragma omp parallel num_threads(nthread)
+  {
+    exec.Run([&]() {
       int tid = omp_get_thread_num();
+      size_t begin = tid*thread_size;
+      size_t end = tid != (nthread-1) ? (tid+1)*thread_size : batch_size;
+      max_columns_vector[tid].resize(1, 0);
+      uint64_t& max_columns_local = max_columns_vector[tid][0];
+
+      for (size_t i = begin; i < end; ++i) {
         auto line = batch.GetLine(i);
         for (auto j = 0ull; j < line.Size(); j++) {
-          data::COOTuple element = line.GetElement(j);
-          max_columns =
-              std::max(max_columns, static_cast<uint64_t>(element.column_idx + 1));
+          auto element = line.GetElement(j);
+          const size_t key = element.row_idx - base_rowid;
+          CHECK_GE(key, builder_base_row_offset);
+          max_columns_local =
+              std::max(max_columns_local, static_cast<uint64_t>(element.column_idx + 1));
+
           if (!common::CheckNAN(element.value) && element.value != missing) {
-            size_t key = element.row_idx - base_rowid;
             // Adapter row index is absolute, here we want it relative to
             // current page
-            CHECK_GE(key, builder_base_row_offset);
             builder.AddBudget(key, tid);
           }
         }
       }
+    });
+  }
+  exec.Rethrow();
+  for (const auto & max : max_columns_vector) {
+    max_columns = std::max(max_columns, max[0]);
+  }
+
   builder.InitStorage();

   // Second pass over batch, placing elements in correct position
-#pragma omp parallel for schedule(static)
-  for (omp_ulong i = 0; i < static_cast<omp_ulong>(batch_size);
-       ++i) {  // NOLINT(*)
+#pragma omp parallel num_threads(nthread)
+  {
+    exec.Run([&]() {
       int tid = omp_get_thread_num();
+      size_t begin = tid*thread_size;
+      size_t end = tid != (nthread-1) ? (tid+1)*thread_size : batch_size;
+      for (size_t i = begin; i < end; ++i) {
         auto line = batch.GetLine(i);
         for (auto j = 0ull; j < line.Size(); j++) {
           auto element = line.GetElement(j);
+          const size_t key = (element.row_idx - base_rowid);
           if (!common::CheckNAN(element.value) && element.value != missing) {
-            size_t key = element.row_idx -
-                         base_rowid;  // Adapter row index is absolute, here we want
-                                      // it relative to current page
             builder.Push(key, Entry(element.column_idx, element.value), tid);
           }
         }
       }
+    });
+  }
+  exec.Rethrow();
   omp_set_num_threads(nthread_original);

   return max_columns;
 }
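Summary of the restructuring in the last hunk: the two `omp parallel for` loops become explicit parallel regions in which each thread walks a contiguous [begin, end) slice of the batch, keeps its own running column maximum, and forwards exceptions through dmlc::OMPException; the per-thread maxima are merged after the region. A simplified standalone sketch of the partition-and-merge pattern (made-up data instead of an adapter batch, and without the exception forwarding; the real code also gives each partial maximum its own small vector):

// Sketch: manual static partition of [0, n) across threads with a
// per-thread partial maximum merged after the parallel region.
#include <omp.h>
#include <algorithm>
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  const std::vector<uint64_t> column_idx = {3, 0, 7, 2, 5, 1, 9, 4};
  const size_t n = column_idx.size();

  int nthread = omp_get_max_threads();
  const size_t thread_size = n / nthread;
  std::vector<uint64_t> max_columns_vector(nthread, 0);

#pragma omp parallel num_threads(nthread)
  {
    const int tid = omp_get_thread_num();
    // Each thread owns a contiguous slice; the last one takes the remainder.
    const size_t begin = tid * thread_size;
    const size_t end = tid != (nthread - 1) ? (tid + 1) * thread_size : n;
    uint64_t& local_max = max_columns_vector[tid];
    for (size_t i = begin; i < end; ++i) {
      local_max = std::max(local_max, column_idx[i] + 1);
    }
  }

  // Merge the per-thread partial maxima outside the parallel region.
  uint64_t max_columns = 0;
  for (uint64_t partial : max_columns_vector) {
    max_columns = std::max(max_columns, partial);
  }
  std::printf("max_columns = %llu\n",
              static_cast<unsigned long long>(max_columns));
  return 0;
}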