Group builder modified for incremental building (#5098)

This commit is contained in:
Rory Mitchell
2019-12-10 14:33:56 +13:00
committed by GitHub
parent 1cb6bcc382
commit 979f74d51a
5 changed files with 126 additions and 30 deletions

View File

@@ -15,6 +15,7 @@
#define XGBOOST_COMMON_GROUP_DATA_H_
#include <vector>
#include <algorithm>
#include "xgboost/base.h"
@@ -26,30 +27,44 @@ namespace common {
* \tparam SizeType type of the index range holder
*/
template<typename ValueType, typename SizeType = bst_ulong>
struct ParallelGroupBuilder {
class ParallelGroupBuilder {
public:
// parallel group builder of data
ParallelGroupBuilder(std::vector<SizeType> *p_rptr,
std::vector<ValueType> *p_data)
: rptr_(*p_rptr), data_(*p_data), thread_rptr_(tmp_thread_rptr_) {
}
/**
* \brief parallel group builder of data.
*
* \param [in,out] p_rptr Row offsets for CSR matrix.
* \param [in,out] p_data Data vector to populate
* \param base_row_offset (Optional) If the matrix we are building
* is already partially populated, use this to indicate the row index we are
* starting from. This saves considerable amounts of time/memory when
* incrementally building.
*/
ParallelGroupBuilder(std::vector<SizeType> *p_rptr,
std::vector<ValueType> *p_data,
std::vector< std::vector<SizeType> > *p_thread_rptr)
: rptr_(*p_rptr), data_(*p_data), thread_rptr_(*p_thread_rptr) {
}
size_t base_row_offset = 0)
: rptr_(*p_rptr),
data_(*p_data),
thread_rptr_(tmp_thread_rptr_),
base_row_offset_(base_row_offset) {}
ParallelGroupBuilder(std::vector<SizeType> *p_rptr,
std::vector<ValueType> *p_data,
std::vector<std::vector<SizeType> > *p_thread_rptr,
size_t base_row_offset = 0)
: rptr_(*p_rptr),
data_(*p_data),
thread_rptr_(*p_thread_rptr),
base_row_offset_(base_row_offset) {}
public:
/*!
* \brief step 1: initialize the helper with a hint of the number of keys
* and the number of threads used in the construction
* \param nkeys number of keys in the matrix, can be smaller than expected
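* \param max_key hint for the exclusive upper bound of the keys (absolute row indices, i.e. not relative to base_row_offset); it may be smaller than the real bound because AddBudget grows the per-thread storage on demand
* \param nthread number of threads that will be used in construction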
*/
inline void InitBudget(std::size_t nkeys, int nthread) {
inline void InitBudget(std::size_t max_key, int nthread) {
thread_rptr_.resize(nthread);
for (std::size_t i = 0; i < thread_rptr_.size(); ++i) {
thread_rptr_[i].resize(nkeys);
for (std::size_t i = 0; i < thread_rptr_.size(); ++i) {
thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key));
std::fill(thread_rptr_[i].begin(), thread_rptr_[i].end(), 0);
}
}
@@ -61,28 +76,32 @@ struct ParallelGroupBuilder {
*/
inline void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) {
std::vector<SizeType> &trptr = thread_rptr_[threadid];
if (trptr.size() < key + 1) {
trptr.resize(key + 1, 0);
size_t offset_key = key - base_row_offset_;
if (trptr.size() < offset_key + 1) {
trptr.resize(offset_key + 1, 0);
}
trptr[key] += nelem;
trptr[offset_key] += nelem;
}
/*! \brief step 3: initialize the necessary storage */
inline void InitStorage() {
// set rptr to correct size
SizeType rptr_fill_value = rptr_.empty() ? 0 : rptr_.back();
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
if (rptr_.size() <= thread_rptr_[tid].size()) {
rptr_.resize(thread_rptr_[tid].size() + 1, rptr_fill_value); // key + 1
if (rptr_.size() <= thread_rptr_[tid].size() + base_row_offset_) {
rptr_.resize(thread_rptr_[tid].size() + base_row_offset_ + 1,
rptr_fill_value); // key + 1
}
}
// initialize rptr to be beginning of each segment
std::size_t count = 0;
for (std::size_t i = 0; i + 1 < rptr_.size(); ++i) {
for (std::size_t i = base_row_offset_; i + 1 < rptr_.size(); ++i) {
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
std::vector<SizeType> &trptr = thread_rptr_[tid];
if (i < trptr.size()) { // i^th row is assigned for this thread
std::size_t thread_count = trptr[i]; // how many entries in this row
trptr[i] = count + rptr_.back();
if (i < trptr.size() +
base_row_offset_) { // i^th row is assigned for this thread
std::size_t thread_count =
trptr[i - base_row_offset_]; // how many entries in this row
trptr[i - base_row_offset_] = count + rptr_.back();
count += thread_count;
}
}
@@ -99,7 +118,8 @@ struct ParallelGroupBuilder {
* \param threadid the id of thread that calls this function
*/
void Push(std::size_t key, ValueType value, int threadid) {
SizeType &rp = thread_rptr_[threadid][key];
size_t offset_key = key - base_row_offset_;
SizeType &rp = thread_rptr_[threadid][offset_key];
data_[rp++] = value;
}
@@ -112,6 +132,8 @@ struct ParallelGroupBuilder {
std::vector<std::vector<SizeType> > &thread_rptr_;
/*! \brief local temp thread ptr, use this if not specified by the constructor */
std::vector<std::vector<SizeType> > tmp_thread_rptr_;
/** \brief Index of the first row handled by this builder; every key passed to AddBudget/Push must be >= this value, and per-thread counters are indexed by (key - base_row_offset_). */
size_t base_row_offset_;
};
} // namespace common
} // namespace xgboost

View File

@@ -51,10 +51,21 @@ class SimpleDMatrix : public DMatrix {
// Iterate over batches of input data
while (adapter->Next()) {
auto &batch = adapter->Value();
size_t base_row_offset = offset_vec.empty() ? 0 : offset_vec.size() - 1;
common::ParallelGroupBuilder<
Entry, std::remove_reference<decltype(offset_vec)>::type::value_type>
builder(&offset_vec, &data_vec);
builder.InitBudget(0, nthread);
builder(&offset_vec, &data_vec, base_row_offset);
// Estimate expected number of rows by using last element in batch
// This is not required to be exact but prevents unnecessary resizing
size_t expected_rows = 0;
if (batch.Size() > 0) {
auto last_line = batch.GetLine(batch.Size() - 1);
if (last_line.Size() > 0) {
expected_rows = last_line.GetElement(last_line.Size() - 1).row_idx;
}
}
builder.InitBudget(expected_rows, nthread);
// First-pass over the batch counting valid elements
size_t num_lines = batch.Size();