From 8825670c9c10c3c72c4f8e499c6ce43b1f51dcd3 Mon Sep 17 00:00:00 2001 From: ShvetsKS <33296480+ShvetsKS@users.noreply.github.com> Date: Fri, 26 Mar 2021 03:44:30 +0300 Subject: [PATCH] Memory consumption fix for row-major adapters (#6779) Co-authored-by: Kirill Shvets Co-authored-by: fis --- src/common/group_data.h | 85 +++++++++++++++++++++++++++----------- src/data/adapter.h | 6 +++ src/data/data.cc | 14 +++++-- src/data/simple_dmatrix.cc | 4 -- 4 files changed, 78 insertions(+), 31 deletions(-) diff --git a/src/common/group_data.h b/src/common/group_data.h index 476b4925b..3a51c6547 100644 --- a/src/common/group_data.h +++ b/src/common/group_data.h @@ -1,5 +1,5 @@ /*! - * Copyright 2014-2020 by Contributors + * Copyright 2014-2021 by Contributors * \file group_data.h * \brief this file defines utils to group data by integer keys * Input: given input sequence (key,value), (k1,v1), (k2,v2) @@ -27,8 +27,9 @@ namespace common { * \brief multi-thread version of group builder * \tparam ValueType type of entries in the sparse matrix * \tparam SizeType type of the index range holder + * \tparam is_row_major bool value helps to reduce memory for row major */ -template +template class ParallelGroupBuilder { public: /** @@ -51,14 +52,21 @@ class ParallelGroupBuilder { /*! * \brief step 1: initialize the helper, with hint of number keys * and thread used in the construction - * \param max_key number of keys in the matrix, can be smaller than expected + * \param max_key number of keys in the matrix, can be smaller than expected, + * for row major adapter max_key is equal to batch size * \param nthread number of thread that will be used in construction */ void InitBudget(std::size_t max_key, int nthread) { thread_rptr_.resize(nthread); - for (std::size_t i = 0; i < thread_rptr_.size(); ++i) { - thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key), 0); + const size_t full_size = is_row_major ? max_key : max_key - std::min(base_row_offset_, max_key); + thread_displacement_ = is_row_major ? full_size / nthread : 0; + for (std::size_t i = 0; i < thread_rptr_.size() - 1; ++i) { + const size_t thread_size = is_row_major ? thread_displacement_ : full_size; + thread_rptr_[i].resize(thread_size, 0); } + const size_t last_thread_size = is_row_major ? (full_size - (nthread - 1)*thread_displacement_) + : full_size; + thread_rptr_[nthread - 1].resize(last_thread_size, 0); } /*! @@ -69,7 +77,8 @@ class ParallelGroupBuilder { */ void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) { std::vector &trptr = thread_rptr_[threadid]; - size_t offset_key = key - base_row_offset_; + size_t offset_key = is_row_major ? (key - base_row_offset_ - threadid*thread_displacement_) + : (key - base_row_offset_); if (trptr.size() < offset_key + 1) { trptr.resize(offset_key + 1, 0); } @@ -78,30 +87,55 @@ class ParallelGroupBuilder { /*! \brief step 3: initialize the necessary storage */ inline void InitStorage() { - // set rptr to correct size - SizeType rptr_fill_value = rptr_.empty() ? 0 : rptr_.back(); - for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { - if (rptr_.size() <= thread_rptr_[tid].size() + base_row_offset_) { - rptr_.resize(thread_rptr_[tid].size() + base_row_offset_ + 1, - rptr_fill_value); // key + 1 + if (is_row_major) { + size_t expected_rows = 0; + for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { + expected_rows += thread_rptr_[tid].size(); } - } - // initialize rptr to be beginning of each segment - std::size_t count = 0; - for (std::size_t i = base_row_offset_; i + 1 < rptr_.size(); ++i) { + // initialize rptr to be beginning of each segment + SizeType rptr_fill_value = rptr_.empty() ? 0 : rptr_.back(); + rptr_.resize(expected_rows + base_row_offset_ + 1, rptr_fill_value); + + std::size_t count = 0; + size_t offset_idx = base_row_offset_ + 1; for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { std::vector &trptr = thread_rptr_[tid]; - if (i < trptr.size() + - base_row_offset_) { // i^th row is assigned for this thread - std::size_t thread_count = - trptr[i - base_row_offset_]; // how many entries in this row - trptr[i - base_row_offset_] = count + rptr_.back(); + for (std::size_t i = 0; i < trptr.size(); ++i) { + std::size_t thread_count = trptr[i]; // how many entries in this row + trptr[i] = count + rptr_fill_value; count += thread_count; + if (offset_idx < rptr_.size()) { + rptr_[offset_idx++] += count; + } } } - rptr_[i + 1] += count; // pointer accumulated from all thread + data_.resize(rptr_.back()); // usage of empty allocator can help to improve performance + } else { + // set rptr to correct size + SizeType rptr_fill_value = rptr_.empty() ? 0 : rptr_.back(); + for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { + if (rptr_.size() <= thread_rptr_[tid].size() + base_row_offset_) { + rptr_.resize(thread_rptr_[tid].size() + base_row_offset_ + 1, + rptr_fill_value); // key + 1 + } + } + // initialize rptr to be beginning of each segment + std::size_t count = 0; + for (std::size_t i = base_row_offset_; i + 1 < rptr_.size(); ++i) { + for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { + std::vector &trptr = thread_rptr_[tid]; + if (i < trptr.size() + + base_row_offset_) { // i^th row is assigned for this thread + std::size_t thread_count = + trptr[i - base_row_offset_]; // how many entries in this row + trptr[i - base_row_offset_] = count + rptr_.back(); + count += thread_count; + } + } + rptr_[i + 1] += count; // pointer accumulated from all thread + } + data_.resize(rptr_.back()); } - data_.resize(rptr_.back()); } /*! @@ -113,7 +147,8 @@ class ParallelGroupBuilder { * \param threadid the id of thread that calls this function */ void Push(std::size_t key, ValueType&& value, int threadid) { - size_t offset_key = key - base_row_offset_; + size_t offset_key = is_row_major ? (key - base_row_offset_ - threadid * thread_displacement_) + : (key - base_row_offset_); SizeType &rp = thread_rptr_[threadid][offset_key]; data_[rp++] = std::move(value); } @@ -127,6 +162,8 @@ class ParallelGroupBuilder { std::vector > thread_rptr_; /** \brief Used when rows being pushed into the builder are strictly above some number. */ size_t base_row_offset_; + /** \brief Used for row major adapters to handle reduced thread local memory allocation */ + size_t thread_displacement_; }; } // namespace common } // namespace xgboost diff --git a/src/data/adapter.h b/src/data/adapter.h index 33fe74bed..92b4a8b9b 100644 --- a/src/data/adapter.h +++ b/src/data/adapter.h @@ -148,6 +148,7 @@ class CSRAdapterBatch : public detail::NoMetaInfo { &values_[begin_offset]); } size_t Size() const { return num_rows_; } + static constexpr bool kIsRowMajor = true; private: const size_t* row_ptr_; @@ -204,6 +205,7 @@ class DenseAdapterBatch : public detail::NoMetaInfo { const Line GetLine(size_t idx) const { return Line(values_ + idx * num_features_, num_features_, idx); } + static constexpr bool kIsRowMajor = true; private: const float* values_; @@ -320,6 +322,7 @@ class CSRArrayAdapterBatch : public detail::NoMetaInfo { size = size == 0 ? 0 : size - 1; return size; } + static constexpr bool kIsRowMajor = true; Line const GetLine(size_t idx) const { auto begin_offset = indptr_.GetElement(idx, 0); @@ -405,6 +408,7 @@ class CSCAdapterBatch : public detail::NoMetaInfo { return Line(idx, end_offset - begin_offset, &row_idx_[begin_offset], &values_[begin_offset]); } + static constexpr bool kIsRowMajor = false; private: const size_t* col_ptr_; @@ -537,6 +541,7 @@ class DataTableAdapterBatch : public detail::NoMetaInfo { const Line GetLine(size_t idx) const { return Line(DTGetType(feature_stypes_[idx]), num_rows_, idx, data_[idx]); } + static constexpr bool kIsRowMajor = false; private: void** data_; @@ -600,6 +605,7 @@ class FileAdapterBatch { const float* BaseMargin() const { return nullptr; } size_t Size() const { return block_->size; } + static constexpr bool kIsRowMajor = true; private: const dmlc::RowBlock* block_; diff --git a/src/data/data.cc b/src/data/data.cc index f2ac7bf4a..df606849f 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -872,14 +872,20 @@ void SparsePage::Push(const SparsePage &batch) { template uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread) { + constexpr bool kIsRowMajor = AdapterBatchT::kIsRowMajor; + // Allow threading only for row-major case as column-major requires O(nthread*batch_size) memory + nthread = kIsRowMajor ? nthread : 1; // Set number of threads but keep old value so we can reset it after int nthread_original = common::OmpSetNumThreadsWithoutHT(&nthread); + if (!kIsRowMajor) { + CHECK_EQ(nthread, 1); + } auto& offset_vec = offset.HostVector(); auto& data_vec = data.HostVector(); size_t builder_base_row_offset = this->Size(); common::ParallelGroupBuilder< - Entry, std::remove_reference::type::value_type> + Entry, std::remove_reference::type::value_type, kIsRowMajor> builder(&offset_vec, &data_vec, builder_base_row_offset); // Estimate expected number of rows by using last element in batch // This is not required to be exact but prevents unnecessary resizing @@ -892,13 +898,15 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread } } size_t batch_size = batch.Size(); - const size_t thread_size = batch_size / nthread; - builder.InitBudget(expected_rows+1, nthread); + expected_rows = kIsRowMajor ? batch_size : expected_rows; uint64_t max_columns = 0; if (batch_size == 0) { omp_set_num_threads(nthread_original); return max_columns; } + const size_t thread_size = batch_size / nthread; + + builder.InitBudget(expected_rows, nthread); std::vector> max_columns_vector(nthread); dmlc::OMPException exec; std::atomic valid{true}; diff --git a/src/data/simple_dmatrix.cc b/src/data/simple_dmatrix.cc index c366dfba4..ec2d5d36d 100644 --- a/src/data/simple_dmatrix.cc +++ b/src/data/simple_dmatrix.cc @@ -91,9 +91,6 @@ BatchSet SimpleDMatrix::GetEllpackBatches(const BatchParam& param) template SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) { - // Set number of threads but keep old value so we can reset it after - int nthread_original = common::OmpSetNumThreadsWithoutHT(&nthread); - std::vector qids; uint64_t default_max = std::numeric_limits::max(); uint64_t last_group_id = default_max; @@ -184,7 +181,6 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) { info_.num_row_ = adapter->NumRows(); } info_.num_nonzero_ = data_vec.size(); - omp_set_num_threads(nthread_original); } SimpleDMatrix::SimpleDMatrix(dmlc::Stream* in_stream) {