From 7ea567567933a4aa11348495241da272246bb4ca Mon Sep 17 00:00:00 2001 From: Jiaming Yuan <jm.yuan@outlook.com> Date: Sat, 2 Mar 2019 01:58:08 +0800 Subject: [PATCH] Add PushCSC for SparsePage. (#4193) * Add PushCSC for SparsePage. * Move Push* definitions into cc file. * Add std:: prefix to `size_t` make clang++ happy. * Address monitor count == 0. --- include/xgboost/data.h | 39 +++----------- src/common/group_data.h | 26 +++++----- src/common/timer.h | 5 ++ src/data/data.cc | 89 ++++++++++++++++++++++++++++++++ src/data/sparse_page_source.cc | 11 ++-- src/tree/updater_basemaker-inl.h | 1 + tests/cpp/data/test_data.cc | 55 ++++++++++++++++++++ tests/cpp/test_learner.cc | 23 +++++++++ 8 files changed, 199 insertions(+), 50 deletions(-) create mode 100644 tests/cpp/data/test_data.cc diff --git a/include/xgboost/data.h b/include/xgboost/data.h index e2d800ca4..81565d194 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -250,42 +250,17 @@ class SparsePage { * \brief Push row block into the page. * \param batch the row batch. */ - inline void Push(const dmlc::RowBlock<uint32_t>& batch) { - auto& data_vec = data.HostVector(); - auto& offset_vec = offset.HostVector(); - data_vec.reserve(data.Size() + batch.offset[batch.size] - batch.offset[0]); - offset_vec.reserve(offset.Size() + batch.size); - CHECK(batch.index != nullptr); - for (size_t i = 0; i < batch.size; ++i) { - offset_vec.push_back(offset_vec.back() + batch.offset[i + 1] - batch.offset[i]); - } - for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) { - uint32_t index = batch.index[i]; - bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i]; - data_vec.emplace_back(index, fvalue); - } - CHECK_EQ(offset_vec.back(), data.Size()); - } + void Push(const dmlc::RowBlock<uint32_t>& batch); /*!
* \brief Push a sparse page * \param batch the row page */ - inline void Push(const SparsePage &batch) { - auto& data_vec = data.HostVector(); - auto& offset_vec = offset.HostVector(); - const auto& batch_offset_vec = batch.offset.HostVector(); - const auto& batch_data_vec = batch.data.HostVector(); - size_t top = offset_vec.back(); - data_vec.resize(top + batch.data.Size()); - std::memcpy(dmlc::BeginPtr(data_vec) + top, - dmlc::BeginPtr(batch_data_vec), - sizeof(Entry) * batch.data.Size()); - size_t begin = offset.Size(); - offset_vec.resize(begin + batch.Size()); - for (size_t i = 0; i < batch.Size(); ++i) { - offset_vec[i + begin] = top + batch_offset_vec[i + 1]; - } - } + void Push(const SparsePage &batch); + /*! + * \brief Push a SparsePage stored in CSC format + * \param batch The row batch to be pushed + */ + void PushCSC(const SparsePage& batch); /*! * \brief Push one instance into page * \param inst an instance row diff --git a/src/common/group_data.h b/src/common/group_data.h index 6b5f59c47..f43398932 100644 --- a/src/common/group_data.h +++ b/src/common/group_data.h @@ -23,7 +23,7 @@ namespace common { * \tparam ValueType type of entries in the sparse matrix * \tparam SizeType type of the index range holder */ -template<typename ValueType, typename SizeType = size_t> +template<typename ValueType, typename SizeType = std::size_t> struct ParallelGroupBuilder { public: // parallel group builder of data @@ -44,9 +44,9 @@ struct ParallelGroupBuilder { * \param nkeys number of keys in the matrix, can be smaller than expected * \param nthread number of thread that will be used in construction */ - inline void InitBudget(size_t nkeys, int nthread) { + inline void InitBudget(std::size_t nkeys, int nthread) { thread_rptr_.resize(nthread); - for (size_t i = 0; i < thread_rptr_.size(); ++i) { + for (std::size_t i = 0; i < thread_rptr_.size(); ++i) { thread_rptr_[i].resize(nkeys); std::fill(thread_rptr_[i].begin(), thread_rptr_[i].end(), 0); } @@ -57,7 +57,7 @@ struct ParallelGroupBuilder { * \param threadid the id of thread that calls this function * \param nelem
number of element budget add to this row */ - inline void AddBudget(size_t key, int threadid, SizeType nelem = 1) { + inline void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) { std::vector<SizeType> &trptr = thread_rptr_[threadid]; if (trptr.size() < key + 1) { trptr.resize(key + 1, 0); @@ -67,23 +67,23 @@ struct ParallelGroupBuilder { /*! \brief step 3: initialize the necessary storage */ inline void InitStorage() { // set rptr to correct size - for (size_t tid = 0; tid < thread_rptr_.size(); ++tid) { + for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { if (rptr_.size() <= thread_rptr_[tid].size()) { - rptr_.resize(thread_rptr_[tid].size() + 1); + rptr_.resize(thread_rptr_[tid].size() + 1); // key + 1 } } // initialize rptr to be beginning of each segment - size_t start = 0; - for (size_t i = 0; i + 1 < rptr_.size(); ++i) { - for (size_t tid = 0; tid < thread_rptr_.size(); ++tid) { + std::size_t start = 0; + for (std::size_t i = 0; i + 1 < rptr_.size(); ++i) { + for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { std::vector<SizeType> &trptr = thread_rptr_[tid]; - if (i < trptr.size()) { - size_t ncnt = trptr[i]; + if (i < trptr.size()) { // i^th row is assigned for this thread + std::size_t ncnt = trptr[i]; // how many entries in this row trptr[i] = start; start += ncnt; } } - rptr_[i + 1] = start; + rptr_[i + 1] = start; // pointer accumulated from all thread } data_.resize(start); } @@ -95,7 +95,7 @@ struct ParallelGroupBuilder { * \param value The value to be pushed to the group.
* \param threadid the id of thread that calls this function */ - inline void Push(size_t key, ValueType value, int threadid) { + void Push(std::size_t key, ValueType value, int threadid) { SizeType &rp = thread_rptr_[threadid][key]; data_[rp++] = value; } diff --git a/src/common/timer.h b/src/common/timer.h index 970eeb808..a4b631783 100644 --- a/src/common/timer.h +++ b/src/common/timer.h @@ -61,6 +61,11 @@ struct Monitor { LOG(CONSOLE) << "======== Monitor: " << label << " ========"; for (auto &kv : statistics_map) { + if (kv.second.count == 0) { + LOG(WARNING) << + "Timer for " << kv.first << " did not get stopped properly."; + continue; + } LOG(CONSOLE) << kv.first << ": " << kv.second.timer.ElapsedSeconds() << "s, " << kv.second.count << " calls @ " << std::chrono::duration_cast<std::chrono::microseconds>( diff --git a/src/data/data.cc b/src/data/data.cc index 24791d7a1..99d249b4b 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -317,6 +317,95 @@ data::SparsePageFormat::DecideFormat(const std::string& cache_prefix) { } } +void SparsePage::Push(const SparsePage &batch) { + auto& data_vec = data.HostVector(); + auto& offset_vec = offset.HostVector(); + const auto& batch_offset_vec = batch.offset.HostVector(); + const auto& batch_data_vec = batch.data.HostVector(); + size_t top = offset_vec.back(); + data_vec.resize(top + batch.data.Size()); + std::memcpy(dmlc::BeginPtr(data_vec) + top, + dmlc::BeginPtr(batch_data_vec), + sizeof(Entry) * batch.data.Size()); + size_t begin = offset.Size(); + offset_vec.resize(begin + batch.Size()); + for (size_t i = 0; i < batch.Size(); ++i) { + offset_vec[i + begin] = top + batch_offset_vec[i + 1]; + } +} + +void SparsePage::Push(const dmlc::RowBlock<uint32_t>& batch) { + auto& data_vec = data.HostVector(); + auto& offset_vec = offset.HostVector(); + data_vec.reserve(data.Size() + batch.offset[batch.size] - batch.offset[0]); + offset_vec.reserve(offset.Size() + batch.size); + CHECK(batch.index != nullptr); + for (size_t i = 0; i < batch.size; ++i) { +
offset_vec.push_back(offset_vec.back() + batch.offset[i + 1] - batch.offset[i]); + } + for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) { + uint32_t index = batch.index[i]; + bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i]; + data_vec.emplace_back(index, fvalue); + } + CHECK_EQ(offset_vec.back(), data.Size()); +} + +void SparsePage::PushCSC(const SparsePage &batch) { + std::vector<Entry>& self_data = data.HostVector(); + std::vector<size_t>& self_offset = offset.HostVector(); + + auto const& other_data = batch.data.ConstHostVector(); + auto const& other_offset = batch.offset.ConstHostVector(); + + if (other_data.empty()) { + return; + } + if (!self_data.empty()) { + CHECK_EQ(self_offset.size(), other_offset.size()) + << "self_data.size(): " << this->data.Size() << ", " + << "other_data.size(): " << other_data.size() << std::flush; + } else { + self_data = other_data; + self_offset = other_offset; + return; + } + + std::vector<size_t> offset(other_offset.size()); + offset[0] = 0; + + std::vector<Entry> data(self_data.size() + batch.data.Size()); + + // n_cols in original csr data matrix, here in csc is n_rows + size_t const n_features = other_offset.size() - 1; + size_t beg = 0; + size_t ptr = 1; + for (size_t i = 0; i < n_features; ++i) { + size_t const self_beg = self_offset.at(i); + size_t const self_length = self_offset.at(i+1) - self_beg; + CHECK_LT(beg, data.size()); + std::memcpy(dmlc::BeginPtr(data)+beg, + dmlc::BeginPtr(self_data) + self_beg, + sizeof(Entry) * self_length); + beg += self_length; + + size_t const other_beg = other_offset.at(i); + size_t const other_length = other_offset.at(i+1) - other_beg; + CHECK_LT(beg, data.size()); + std::memcpy(dmlc::BeginPtr(data)+beg, + dmlc::BeginPtr(other_data) + other_beg, + sizeof(Entry) * other_length); + beg += other_length; + + CHECK_LT(ptr, offset.size()); + offset.at(ptr) = beg; + ptr++; + } + + self_data = std::move(data); + self_offset = std::move(offset); +} + namespace data { // List of files that
will be force linked in static links. DMLC_REGISTRY_LINK_TAG(sparse_page_raw_format); diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc index e8f701d42..0bec51997 100644 --- a/src/data/sparse_page_source.cc +++ b/src/data/sparse_page_source.cc @@ -126,7 +126,7 @@ bool SparsePageSource::CacheExist(const std::string& cache_info, } void SparsePageSource::CreateRowPage(dmlc::Parser<uint32_t>* src, - const std::string& cache_info) { + const std::string& cache_info) { const std::string page_type = ".row.page"; std::vector<std::string> cache_shards = GetCacheShards(cache_info); CHECK_NE(cache_shards.size(), 0U); @@ -216,7 +216,8 @@ void SparsePageSource::CreateRowPage(dmlc::Parser<uint32_t>* src, CHECK(info.qids_.empty() || info.qids_.size() == info.num_row_); info.SaveBinary(fo.get()); } - LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; + LOG(CONSOLE) << "SparsePageSource::CreateRowPage Finished writing to " + << name_info; } void SparsePageSource::CreatePageFromDMatrix(DMatrix* src, @@ -246,9 +247,9 @@ void SparsePageSource::CreatePageFromDMatrix(DMatrix* src, } else if (page_type == ".col.page") { page->Push(batch.GetTranspose(src->Info().num_col_)); } else if (page_type == ".sorted.col.page") { - auto tmp = batch.GetTranspose(src->Info().num_col_); - tmp.SortRows(); - page->Push(tmp); + SparsePage tmp = batch.GetTranspose(src->Info().num_col_); + page->PushCSC(tmp); + page->SortRows(); } else { LOG(FATAL) << "Unknown page type: " << page_type; } diff --git a/src/tree/updater_basemaker-inl.h b/src/tree/updater_basemaker-inl.h index 1a8238e75..123e6c489 100644 --- a/src/tree/updater_basemaker-inl.h +++ b/src/tree/updater_basemaker-inl.h @@ -49,6 +49,7 @@ class BaseMaker: public TreeUpdater { for (bst_uint fid = 0; fid < batch.Size(); ++fid) { auto c = batch[fid]; if (c.size() != 0) { + CHECK_LT(fid * 2, fminmax_.size()); fminmax_[fid * 2 + 0] = std::max(-c[0].fvalue, fminmax_[fid * 2 + 0]); fminmax_[fid * 2 + 1] = diff --git
a/tests/cpp/data/test_data.cc b/tests/cpp/data/test_data.cc new file mode 100644 index 000000000..7baea3a39 --- /dev/null +++ b/tests/cpp/data/test_data.cc @@ -0,0 +1,55 @@ +#include <gtest/gtest.h> +#include <dmlc/data.h> + +#include "xgboost/data.h" + +namespace xgboost { +TEST(SparsePage, PushCSC) { + std::vector<size_t> offset {0}; + std::vector<Entry> data; + SparsePage page; + page.offset.HostVector() = offset; + page.data.HostVector() = data; + + offset = {0, 1, 4}; + for (size_t i = 0; i < offset.back(); ++i) { + data.push_back(Entry(i, 0.1f)); + } + + SparsePage other; + other.offset.HostVector() = offset; + other.data.HostVector() = data; + + page.PushCSC(other); + + ASSERT_EQ(page.offset.HostVector().size(), offset.size()); + ASSERT_EQ(page.data.HostVector().size(), data.size()); + for (size_t i = 0; i < offset.size(); ++i) { + ASSERT_EQ(page.offset.HostVector()[i], offset[i]); + } + for (size_t i = 0; i < data.size(); ++i) { + ASSERT_EQ(page.data.HostVector()[i].index, data[i].index); + } + + page.PushCSC(other); + ASSERT_EQ(page.offset.HostVector().size(), offset.size()); + ASSERT_EQ(page.data.Size(), data.size() * 2); + + for (size_t i = 0; i < offset.size(); ++i) { + ASSERT_EQ(page.offset.HostVector()[i], offset[i] * 2); + } + + auto inst = page[0]; + ASSERT_EQ(inst.size(), 2); + for (auto entry : inst) { + ASSERT_EQ(entry.index, 0); + } + + inst = page[1]; + ASSERT_EQ(inst.size(), 6); + std::vector<size_t> indices_sol {1, 2, 3}; + for (size_t i = 0; i < inst.size(); ++i) { + ASSERT_EQ(inst[i].index, indices_sol[i % 3]); + } +} +} diff --git a/tests/cpp/test_learner.cc b/tests/cpp/test_learner.cc index e1c16f296..b0181bd30 100644 --- a/tests/cpp/test_learner.cc +++ b/tests/cpp/test_learner.cc @@ -3,6 +3,7 @@ #include <gtest/gtest.h> #include "helpers.h" #include "xgboost/learner.h" +#include "dmlc/filesystem.h" namespace xgboost { @@ -92,4 +93,26 @@ TEST(Learner, CheckGroup) { delete pp_mat; } +TEST(Learner, SLOW_CheckMultiBatch) { + using Arg = std::pair<std::string, std::string>; + // Create sufficiently large data to make two row pages +
dmlc::TemporaryDirectory tempdir; + const std::string tmp_file = tempdir.path + "/big.libsvm"; + CreateBigTestData(tmp_file, 5000000); + std::shared_ptr<DMatrix> dmat(xgboost::DMatrix::Load( tmp_file + "#" + tmp_file + ".cache", true, false)); + EXPECT_TRUE(FileExists(tmp_file + ".cache.row.page")); + EXPECT_FALSE(dmat->SingleColBlock()); + size_t num_row = dmat->Info().num_row_; + std::vector<float> labels(num_row); + for (size_t i = 0; i < num_row; ++i) { + labels[i] = i % 2; + } + dmat->Info().SetInfo("label", labels.data(), DataType::kFloat32, num_row); + std::vector<std::shared_ptr<DMatrix>> mat{dmat}; + auto learner = std::unique_ptr<Learner>(Learner::Create(mat)); + learner->Configure({Arg{"objective", "binary:logistic"}}); + learner->InitModel(); + learner->UpdateOneIter(0, dmat.get()); +} + } // namespace xgboost