Memory consumption fix for row-major adapters (#6779)
Co-authored-by: Kirill Shvets <kirill.shvets@intel.com> Co-authored-by: fis <jm.yuan@outlook.com>
This commit is contained in:
parent
744c46995c
commit
8825670c9c
@ -1,5 +1,5 @@
|
||||
/*!
|
||||
* Copyright 2014-2020 by Contributors
|
||||
* Copyright 2014-2021 by Contributors
|
||||
* \file group_data.h
|
||||
* \brief this file defines utils to group data by integer keys
|
||||
* Input: given input sequence (key,value), (k1,v1), (k2,v2)
|
||||
@ -27,8 +27,9 @@ namespace common {
|
||||
* \brief multi-thread version of group builder
|
||||
* \tparam ValueType type of entries in the sparse matrix
|
||||
* \tparam SizeType type of the index range holder
|
||||
* \tparam is_row_major whether the input is row major; when true, each thread only allocates budget storage for its own share of the rows, which reduces memory
|
||||
*/
|
||||
template<typename ValueType, typename SizeType = bst_ulong>
|
||||
template<typename ValueType, typename SizeType = bst_ulong, bool is_row_major = false>
|
||||
class ParallelGroupBuilder {
|
||||
public:
|
||||
/**
|
||||
@ -51,14 +52,21 @@ class ParallelGroupBuilder {
|
||||
/*!
|
||||
* \brief step 1: initialize the helper, with a hint of the number of keys
|
||||
* and the number of threads used in the construction
|
||||
* \param max_key number of keys in the matrix, can be smaller than expected
|
||||
* \param max_key number of keys in the matrix, can be smaller than expected,
|
||||
* for row major adapters max_key is equal to the batch size
|
||||
* \param nthread number of threads that will be used in construction
|
||||
*/
|
||||
void InitBudget(std::size_t max_key, int nthread) {
|
||||
thread_rptr_.resize(nthread);
|
||||
for (std::size_t i = 0; i < thread_rptr_.size(); ++i) {
|
||||
thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key), 0);
|
||||
const size_t full_size = is_row_major ? max_key : max_key - std::min(base_row_offset_, max_key);
|
||||
thread_displacement_ = is_row_major ? full_size / nthread : 0;
|
||||
for (std::size_t i = 0; i < thread_rptr_.size() - 1; ++i) {
|
||||
const size_t thread_size = is_row_major ? thread_displacement_ : full_size;
|
||||
thread_rptr_[i].resize(thread_size, 0);
|
||||
}
|
||||
const size_t last_thread_size = is_row_major ? (full_size - (nthread - 1)*thread_displacement_)
|
||||
: full_size;
|
||||
thread_rptr_[nthread - 1].resize(last_thread_size, 0);
|
||||
}
|
||||
|
||||
/*!
|
||||
@ -69,7 +77,8 @@ class ParallelGroupBuilder {
|
||||
*/
|
||||
void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) {
|
||||
std::vector<SizeType> &trptr = thread_rptr_[threadid];
|
||||
size_t offset_key = key - base_row_offset_;
|
||||
size_t offset_key = is_row_major ? (key - base_row_offset_ - threadid*thread_displacement_)
|
||||
: (key - base_row_offset_);
|
||||
if (trptr.size() < offset_key + 1) {
|
||||
trptr.resize(offset_key + 1, 0);
|
||||
}
|
||||
@ -78,30 +87,55 @@ class ParallelGroupBuilder {
|
||||
|
||||
/*! \brief step 3: initialize the necessary storage */
|
||||
inline void InitStorage() {
|
||||
// set rptr to correct size
|
||||
SizeType rptr_fill_value = rptr_.empty() ? 0 : rptr_.back();
|
||||
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
|
||||
if (rptr_.size() <= thread_rptr_[tid].size() + base_row_offset_) {
|
||||
rptr_.resize(thread_rptr_[tid].size() + base_row_offset_ + 1,
|
||||
rptr_fill_value); // key + 1
|
||||
if (is_row_major) {
|
||||
size_t expected_rows = 0;
|
||||
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
|
||||
expected_rows += thread_rptr_[tid].size();
|
||||
}
|
||||
}
|
||||
// initialize rptr to be beginning of each segment
|
||||
std::size_t count = 0;
|
||||
for (std::size_t i = base_row_offset_; i + 1 < rptr_.size(); ++i) {
|
||||
// initialize rptr to be beginning of each segment
|
||||
SizeType rptr_fill_value = rptr_.empty() ? 0 : rptr_.back();
|
||||
rptr_.resize(expected_rows + base_row_offset_ + 1, rptr_fill_value);
|
||||
|
||||
std::size_t count = 0;
|
||||
size_t offset_idx = base_row_offset_ + 1;
|
||||
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
|
||||
std::vector<SizeType> &trptr = thread_rptr_[tid];
|
||||
if (i < trptr.size() +
|
||||
base_row_offset_) { // i^th row is assigned for this thread
|
||||
std::size_t thread_count =
|
||||
trptr[i - base_row_offset_]; // how many entries in this row
|
||||
trptr[i - base_row_offset_] = count + rptr_.back();
|
||||
for (std::size_t i = 0; i < trptr.size(); ++i) {
|
||||
std::size_t thread_count = trptr[i]; // how many entries in this row
|
||||
trptr[i] = count + rptr_fill_value;
|
||||
count += thread_count;
|
||||
if (offset_idx < rptr_.size()) {
|
||||
rptr_[offset_idx++] += count;
|
||||
}
|
||||
}
|
||||
}
|
||||
rptr_[i + 1] += count; // pointer accumulated from all thread
|
||||
data_.resize(rptr_.back()); // usage of empty allocator can help to improve performance
|
||||
} else {
|
||||
// set rptr to correct size
|
||||
SizeType rptr_fill_value = rptr_.empty() ? 0 : rptr_.back();
|
||||
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
|
||||
if (rptr_.size() <= thread_rptr_[tid].size() + base_row_offset_) {
|
||||
rptr_.resize(thread_rptr_[tid].size() + base_row_offset_ + 1,
|
||||
rptr_fill_value); // key + 1
|
||||
}
|
||||
}
|
||||
// initialize rptr to be beginning of each segment
|
||||
std::size_t count = 0;
|
||||
for (std::size_t i = base_row_offset_; i + 1 < rptr_.size(); ++i) {
|
||||
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
|
||||
std::vector<SizeType> &trptr = thread_rptr_[tid];
|
||||
if (i < trptr.size() +
|
||||
base_row_offset_) { // i^th row is assigned for this thread
|
||||
std::size_t thread_count =
|
||||
trptr[i - base_row_offset_]; // how many entries in this row
|
||||
trptr[i - base_row_offset_] = count + rptr_.back();
|
||||
count += thread_count;
|
||||
}
|
||||
}
|
||||
rptr_[i + 1] += count; // pointer accumulated from all thread
|
||||
}
|
||||
data_.resize(rptr_.back());
|
||||
}
|
||||
data_.resize(rptr_.back());
|
||||
}
|
||||
|
||||
/*!
|
||||
@ -113,7 +147,8 @@ class ParallelGroupBuilder {
|
||||
* \param threadid the id of thread that calls this function
|
||||
*/
|
||||
void Push(std::size_t key, ValueType&& value, int threadid) {
|
||||
size_t offset_key = key - base_row_offset_;
|
||||
size_t offset_key = is_row_major ? (key - base_row_offset_ - threadid * thread_displacement_)
|
||||
: (key - base_row_offset_);
|
||||
SizeType &rp = thread_rptr_[threadid][offset_key];
|
||||
data_[rp++] = std::move(value);
|
||||
}
|
||||
@ -127,6 +162,8 @@ class ParallelGroupBuilder {
|
||||
std::vector<std::vector<SizeType> > thread_rptr_;
|
||||
/** \brief Used when rows being pushed into the builder are strictly above some number. */
|
||||
size_t base_row_offset_;
|
||||
/** \brief Number of rows assigned to each thread (full size / nthread); used by row major adapters to index the reduced thread-local storage, 0 otherwise */
|
||||
size_t thread_displacement_;
|
||||
};
|
||||
} // namespace common
|
||||
} // namespace xgboost
|
||||
|
||||
@ -148,6 +148,7 @@ class CSRAdapterBatch : public detail::NoMetaInfo {
|
||||
&values_[begin_offset]);
|
||||
}
|
||||
size_t Size() const { return num_rows_; }
|
||||
static constexpr bool kIsRowMajor = true;
|
||||
|
||||
private:
|
||||
const size_t* row_ptr_;
|
||||
@ -204,6 +205,7 @@ class DenseAdapterBatch : public detail::NoMetaInfo {
|
||||
const Line GetLine(size_t idx) const {
|
||||
return Line(values_ + idx * num_features_, num_features_, idx);
|
||||
}
|
||||
static constexpr bool kIsRowMajor = true;
|
||||
|
||||
private:
|
||||
const float* values_;
|
||||
@ -320,6 +322,7 @@ class CSRArrayAdapterBatch : public detail::NoMetaInfo {
|
||||
size = size == 0 ? 0 : size - 1;
|
||||
return size;
|
||||
}
|
||||
static constexpr bool kIsRowMajor = true;
|
||||
|
||||
Line const GetLine(size_t idx) const {
|
||||
auto begin_offset = indptr_.GetElement<size_t>(idx, 0);
|
||||
@ -405,6 +408,7 @@ class CSCAdapterBatch : public detail::NoMetaInfo {
|
||||
return Line(idx, end_offset - begin_offset, &row_idx_[begin_offset],
|
||||
&values_[begin_offset]);
|
||||
}
|
||||
static constexpr bool kIsRowMajor = false;
|
||||
|
||||
private:
|
||||
const size_t* col_ptr_;
|
||||
@ -537,6 +541,7 @@ class DataTableAdapterBatch : public detail::NoMetaInfo {
|
||||
const Line GetLine(size_t idx) const {
|
||||
return Line(DTGetType(feature_stypes_[idx]), num_rows_, idx, data_[idx]);
|
||||
}
|
||||
static constexpr bool kIsRowMajor = false;
|
||||
|
||||
private:
|
||||
void** data_;
|
||||
@ -600,6 +605,7 @@ class FileAdapterBatch {
|
||||
const float* BaseMargin() const { return nullptr; }
|
||||
|
||||
size_t Size() const { return block_->size; }
|
||||
static constexpr bool kIsRowMajor = true;
|
||||
|
||||
private:
|
||||
const dmlc::RowBlock<uint32_t>* block_;
|
||||
|
||||
@ -872,14 +872,20 @@ void SparsePage::Push(const SparsePage &batch) {
|
||||
|
||||
template <typename AdapterBatchT>
|
||||
uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread) {
|
||||
constexpr bool kIsRowMajor = AdapterBatchT::kIsRowMajor;
|
||||
// Allow threading only for row-major case as column-major requires O(nthread*batch_size) memory
|
||||
nthread = kIsRowMajor ? nthread : 1;
|
||||
// Set number of threads but keep old value so we can reset it after
|
||||
int nthread_original = common::OmpSetNumThreadsWithoutHT(&nthread);
|
||||
if (!kIsRowMajor) {
|
||||
CHECK_EQ(nthread, 1);
|
||||
}
|
||||
auto& offset_vec = offset.HostVector();
|
||||
auto& data_vec = data.HostVector();
|
||||
|
||||
size_t builder_base_row_offset = this->Size();
|
||||
common::ParallelGroupBuilder<
|
||||
Entry, std::remove_reference<decltype(offset_vec)>::type::value_type>
|
||||
Entry, std::remove_reference<decltype(offset_vec)>::type::value_type, kIsRowMajor>
|
||||
builder(&offset_vec, &data_vec, builder_base_row_offset);
|
||||
// Estimate expected number of rows by using last element in batch
|
||||
// This is not required to be exact but prevents unnecessary resizing
|
||||
@ -892,13 +898,15 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
|
||||
}
|
||||
}
|
||||
size_t batch_size = batch.Size();
|
||||
const size_t thread_size = batch_size / nthread;
|
||||
builder.InitBudget(expected_rows+1, nthread);
|
||||
expected_rows = kIsRowMajor ? batch_size : expected_rows;
|
||||
uint64_t max_columns = 0;
|
||||
if (batch_size == 0) {
|
||||
omp_set_num_threads(nthread_original);
|
||||
return max_columns;
|
||||
}
|
||||
const size_t thread_size = batch_size / nthread;
|
||||
|
||||
builder.InitBudget(expected_rows, nthread);
|
||||
std::vector<std::vector<uint64_t>> max_columns_vector(nthread);
|
||||
dmlc::OMPException exec;
|
||||
std::atomic<bool> valid{true};
|
||||
|
||||
@ -91,9 +91,6 @@ BatchSet<EllpackPage> SimpleDMatrix::GetEllpackBatches(const BatchParam& param)
|
||||
|
||||
template <typename AdapterT>
|
||||
SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
|
||||
// Set number of threads but keep old value so we can reset it after
|
||||
int nthread_original = common::OmpSetNumThreadsWithoutHT(&nthread);
|
||||
|
||||
std::vector<uint64_t> qids;
|
||||
uint64_t default_max = std::numeric_limits<uint64_t>::max();
|
||||
uint64_t last_group_id = default_max;
|
||||
@ -184,7 +181,6 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
|
||||
info_.num_row_ = adapter->NumRows();
|
||||
}
|
||||
info_.num_nonzero_ = data_vec.size();
|
||||
omp_set_num_threads(nthread_original);
|
||||
}
|
||||
|
||||
SimpleDMatrix::SimpleDMatrix(dmlc::Stream* in_stream) {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user