Optimizations of pre-processing for 'hist' tree method (#4310)

* oprimizations for pre-processing

* code cleaning

* code cleaning

* code cleaning after review

* Apply suggestions from code review

Co-Authored-By: SmirnovEgorRu <egor.smirnov@intel.com>
This commit is contained in:
Egor Smirnov
2019-04-17 03:36:19 +03:00
committed by Philip Hyunsu Cho
parent 207f058711
commit 711397d645
6 changed files with 299 additions and 48 deletions

View File

@@ -11,6 +11,7 @@
#include "./column_matrix.h"
#include "./hist_util.h"
#include "./quantile.h"
#include "./../tree/updater_quantile_hist.h"
#if defined(XGBOOST_MM_PREFETCH_PRESENT)
#include <xmmintrin.h>
@@ -49,7 +50,7 @@ void HistCutMatrix::Init(DMatrix* p_fmat, uint32_t max_num_bins) {
constexpr int kFactor = 8;
std::vector<WXQSketch> sketchs;
const int nthread = omp_get_max_threads();
const size_t nthread = omp_get_max_threads();
unsigned const nstep =
static_cast<unsigned>((info.num_col_ + nthread - 1) / nthread);
@@ -67,34 +68,85 @@ void HistCutMatrix::Init(DMatrix* p_fmat, uint32_t max_num_bins) {
// Use group index for weights?
bool const use_group_ind = num_groups != 0 && weights.size() != info.num_row_;
for (const auto &batch : p_fmat->GetRowBatches()) {
size_t group_ind = 0;
if (use_group_ind) {
group_ind = this->SearchGroupIndFromBaseRow(group_ptr, batch.base_rowid);
}
#pragma omp parallel num_threads(nthread) firstprivate(group_ind, use_group_ind)
{
CHECK_EQ(nthread, omp_get_num_threads());
auto tid = static_cast<unsigned>(omp_get_thread_num());
unsigned begin = std::min(nstep * tid, ncol);
unsigned end = std::min(nstep * (tid + 1), ncol);
if (use_group_ind) {
for (const auto &batch : p_fmat->GetRowBatches()) {
size_t group_ind = this->SearchGroupIndFromBaseRow(group_ptr, batch.base_rowid);
#pragma omp parallel num_threads(nthread) firstprivate(group_ind, use_group_ind)
{
CHECK_EQ(nthread, omp_get_num_threads());
auto tid = static_cast<unsigned>(omp_get_thread_num());
unsigned begin = std::min(nstep * tid, ncol);
unsigned end = std::min(nstep * (tid + 1), ncol);
// do not iterate if no columns are assigned to the thread
if (begin < end && end <= ncol) {
for (size_t i = 0; i < batch.Size(); ++i) { // NOLINT(*)
size_t const ridx = batch.base_rowid + i;
SparsePage::Inst const inst = batch[i];
if (use_group_ind &&
group_ptr[group_ind] == ridx &&
// maximum equals to weights.size() - 1
group_ind < num_groups - 1) {
// move to next group
group_ind++;
// do not iterate if no columns are assigned to the thread
if (begin < end && end <= ncol) {
for (size_t i = 0; i < batch.Size(); ++i) { // NOLINT(*)
size_t const ridx = batch.base_rowid + i;
SparsePage::Inst const inst = batch[i];
if (group_ptr[group_ind] == ridx &&
// maximum equals to weights.size() - 1
group_ind < num_groups - 1) {
// move to next group
group_ind++;
}
for (auto const& entry : inst) {
if (entry.index >= begin && entry.index < end) {
size_t w_idx = group_ind;
sketchs[entry.index].Push(entry.fvalue, info.GetWeight(w_idx));
}
}
}
for (auto const& entry : inst) {
if (entry.index >= begin && entry.index < end) {
size_t w_idx = use_group_ind ? group_ind : ridx;
sketchs[entry.index].Push(entry.fvalue, info.GetWeight(w_idx));
}
}
}
} else {
for (const auto &batch : p_fmat->GetRowBatches()) {
const size_t size = batch.Size();
const size_t block_size = 512;
const size_t block_size_iter = block_size * nthread;
const size_t n_blocks = size / block_size_iter + !!(size % block_size_iter);
std::vector<std::vector<std::pair<float, float>>> buff(nthread);
for (size_t tid = 0; tid < nthread; ++tid) {
buff[tid].resize(block_size * ncol);
}
std::vector<size_t> sizes(nthread * ncol, 0);
for (size_t iblock = 0; iblock < n_blocks; ++iblock) {
#pragma omp parallel num_threads(nthread)
{
int tid = omp_get_thread_num();
const size_t ibegin = iblock * block_size_iter + tid * block_size;
const size_t iend = std::min(ibegin + block_size, size);
auto* p_sizes = sizes.data() + ncol * tid;
auto* p_buff = buff[tid].data();
for (size_t i = ibegin; i < iend; ++i) {
size_t const ridx = batch.base_rowid + i;
bst_float w = info.GetWeight(ridx);
SparsePage::Inst const inst = batch[i];
for (auto const& entry : inst) {
const size_t idx = entry.index;
p_buff[idx * block_size + p_sizes[idx]] = { entry.fvalue, w };
p_sizes[idx]++;
}
}
#pragma omp barrier
#pragma omp for schedule(static)
for (int32_t icol = 0; icol < static_cast<int32_t>(ncol); ++icol) {
for (size_t tid = 0; tid < nthread; ++tid) {
auto* p_sizes = sizes.data() + ncol * tid;
auto* p_buff = buff[tid].data() + icol * block_size;
for (size_t i = 0; i < p_sizes[icol]; ++i) {
sketchs[icol].Push(p_buff[i].first, p_buff[i].second);
}
p_sizes[icol] = 0;
}
}
}
@@ -177,22 +229,66 @@ uint32_t HistCutMatrix::GetBinIdx(const Entry& e) {
void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
cut.Init(p_fmat, max_num_bins);
const int nthread = omp_get_max_threads();
const int32_t nthread = omp_get_max_threads();
// const int nthread = 1;
const uint32_t nbins = cut.row_ptr.back();
hit_count.resize(nbins, 0);
hit_count_tloc_.resize(nthread * nbins, 0);
row_ptr.push_back(0);
size_t new_size = 1;
for (const auto &batch : p_fmat->GetRowBatches()) {
const size_t rbegin = row_ptr.size() - 1;
for (size_t i = 0; i < batch.Size(); ++i) {
row_ptr.push_back(batch[i].size() + row_ptr.back());
new_size += batch.Size();
}
row_ptr.resize(new_size);
row_ptr[0] = 0;
size_t rbegin = 0;
size_t prev_sum = 0;
for (const auto &batch : p_fmat->GetRowBatches()) {
MemStackAllocator<size_t, 128> partial_sums(nthread);
size_t* p_part = partial_sums.Get();
size_t block_size = batch.Size() / nthread;
#pragma omp parallel num_threads(nthread)
{
#pragma omp for
for (int32_t tid = 0; tid < nthread; ++tid) {
size_t ibegin = block_size * tid;
size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1)));
size_t sum = 0;
for (size_t i = ibegin; i < iend; ++i) {
sum += batch[i].size();
row_ptr[rbegin + 1 + i] = sum;
}
}
#pragma omp single
{
p_part[0] = prev_sum;
for (int32_t i = 1; i < nthread; ++i) {
p_part[i] = p_part[i - 1] + row_ptr[rbegin + i*block_size];
}
}
#pragma omp for
for (int32_t tid = 0; tid < nthread; ++tid) {
size_t ibegin = block_size * tid;
size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1)));
for (size_t i = ibegin; i < iend; ++i) {
row_ptr[rbegin + 1 + i] += p_part[tid];
}
}
}
index.resize(row_ptr.back());
CHECK_GT(cut.cut.size(), 0U);
CHECK_EQ(cut.row_ptr.back(), cut.cut.size());
auto bsize = static_cast<omp_ulong>(batch.Size());
#pragma omp parallel for num_threads(nthread) schedule(static)
@@ -203,7 +299,6 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
SparsePage::Inst inst = batch[i];
CHECK_EQ(ibegin + inst.size(), iend);
for (bst_uint j = 0; j < inst.size(); ++j) {
uint32_t idx = cut.GetBinIdx(inst[j]);
@@ -215,10 +310,13 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
#pragma omp parallel for num_threads(nthread) schedule(static)
for (bst_omp_uint idx = 0; idx < bst_omp_uint(nbins); ++idx) {
for (int tid = 0; tid < nthread; ++tid) {
for (size_t tid = 0; tid < nthread; ++tid) {
hit_count[idx] += hit_count_tloc_[tid * nbins + idx];
}
}
prev_sum = row_ptr[rbegin + batch.Size()];
rbegin += batch.Size();
}
}