diff --git a/src/common/group_data.h b/src/common/group_data.h index 0b7af3b2c..22c143190 100644 --- a/src/common/group_data.h +++ b/src/common/group_data.h @@ -15,6 +15,7 @@ #define XGBOOST_COMMON_GROUP_DATA_H_ #include +#include #include "xgboost/base.h" @@ -26,30 +27,44 @@ namespace common { * \tparam SizeType type of the index range holder */ template -struct ParallelGroupBuilder { +class ParallelGroupBuilder { public: - // parallel group builder of data - ParallelGroupBuilder(std::vector *p_rptr, - std::vector *p_data) - : rptr_(*p_rptr), data_(*p_data), thread_rptr_(tmp_thread_rptr_) { - } + /** + * \brief parallel group builder of data. + * + * \param [in,out] p_rptr Row offsets for CSR matrix. + * \param [in,out] p_data Data vector to populate + * \param base_row_offset (Optional) If the matrix we are building + * is already partially populated, use this to indicate the row index we are + * starting from. This saves considerable amounts of time/memory when + * incrementally building. + */ ParallelGroupBuilder(std::vector *p_rptr, std::vector *p_data, - std::vector< std::vector > *p_thread_rptr) - : rptr_(*p_rptr), data_(*p_data), thread_rptr_(*p_thread_rptr) { - } + size_t base_row_offset = 0) + : rptr_(*p_rptr), + data_(*p_data), + thread_rptr_(tmp_thread_rptr_), + base_row_offset_(base_row_offset) {} + ParallelGroupBuilder(std::vector *p_rptr, + std::vector *p_data, + std::vector > *p_thread_rptr, + size_t base_row_offset = 0) + : rptr_(*p_rptr), + data_(*p_data), + thread_rptr_(*p_thread_rptr), + base_row_offset_(base_row_offset) {} - public: /*! 
* \brief step 1: initialize the helper, with hint of number keys * and thread used in the construction - * \param nkeys number of keys in the matrix, can be smaller than expected + * \param max_key number of keys in the matrix, can be smaller than expected * \param nthread number of thread that will be used in construction */ - inline void InitBudget(std::size_t nkeys, int nthread) { + inline void InitBudget(std::size_t max_key, int nthread) { thread_rptr_.resize(nthread); - for (std::size_t i = 0; i < thread_rptr_.size(); ++i) { - thread_rptr_[i].resize(nkeys); + for (std::size_t i = 0; i < thread_rptr_.size(); ++i) { + thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key)); std::fill(thread_rptr_[i].begin(), thread_rptr_[i].end(), 0); } } @@ -61,28 +76,32 @@ struct ParallelGroupBuilder { */ inline void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) { std::vector &trptr = thread_rptr_[threadid]; - if (trptr.size() < key + 1) { - trptr.resize(key + 1, 0); + size_t offset_key = key - base_row_offset_; + if (trptr.size() < offset_key + 1) { + trptr.resize(offset_key + 1, 0); } - trptr[key] += nelem; + trptr[offset_key] += nelem; } /*! \brief step 3: initialize the necessary storage */ inline void InitStorage() { // set rptr to correct size SizeType rptr_fill_value = rptr_.empty() ? 
0 : rptr_.back(); for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { - if (rptr_.size() <= thread_rptr_[tid].size()) { - rptr_.resize(thread_rptr_[tid].size() + 1, rptr_fill_value); // key + 1 + if (rptr_.size() <= thread_rptr_[tid].size() + base_row_offset_) { + rptr_.resize(thread_rptr_[tid].size() + base_row_offset_ + 1, + rptr_fill_value); // key + 1 } } // initialize rptr to be beginning of each segment std::size_t count = 0; - for (std::size_t i = 0; i + 1 < rptr_.size(); ++i) { + for (std::size_t i = base_row_offset_; i + 1 < rptr_.size(); ++i) { for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) { std::vector &trptr = thread_rptr_[tid]; - if (i < trptr.size()) { // i^th row is assigned for this thread - std::size_t thread_count = trptr[i]; // how many entries in this row - trptr[i] = count + rptr_.back(); + if (i < trptr.size() + + base_row_offset_) { // i^th row is assigned for this thread + std::size_t thread_count = + trptr[i - base_row_offset_]; // how many entries in this row + trptr[i - base_row_offset_] = count + rptr_.back(); count += thread_count; } } @@ -99,7 +118,8 @@ struct ParallelGroupBuilder { * \param threadid the id of thread that calls this function */ void Push(std::size_t key, ValueType value, int threadid) { - SizeType &rp = thread_rptr_[threadid][key]; + size_t offset_key = key - base_row_offset_; + SizeType &rp = thread_rptr_[threadid][offset_key]; data_[rp++] = value; } @@ -112,6 +132,8 @@ struct ParallelGroupBuilder { std::vector > &thread_rptr_; /*! \brief local temp thread ptr, use this if not specified by the constructor */ std::vector > tmp_thread_rptr_; + /** \brief Used when rows being pushed into the builder are strictly above some number. 
*/ + size_t base_row_offset_; }; } // namespace common } // namespace xgboost diff --git a/src/data/simple_dmatrix.h b/src/data/simple_dmatrix.h index 6ea85bc0d..1a5905ff2 100644 --- a/src/data/simple_dmatrix.h +++ b/src/data/simple_dmatrix.h @@ -51,10 +51,21 @@ class SimpleDMatrix : public DMatrix { // Iterate over batches of input data while (adapter->Next()) { auto &batch = adapter->Value(); + + size_t base_row_offset = offset_vec.empty() ? 0 : offset_vec.size() - 1; common::ParallelGroupBuilder< Entry, std::remove_reference::type::value_type> - builder(&offset_vec, &data_vec); - builder.InitBudget(0, nthread); + builder(&offset_vec, &data_vec, base_row_offset); + // Estimate expected number of rows by using last element in batch + // This is not required to be exact but prevents unnecessary resizing + size_t expected_rows = 0; + if (batch.Size() > 0) { + auto last_line = batch.GetLine(batch.Size() - 1); + if (last_line.Size() > 0) { + expected_rows = last_line.GetElement(last_line.Size() - 1).row_idx; + } + } + builder.InitBudget(expected_rows, nthread); // First-pass over the batch counting valid elements size_t num_lines = batch.Size(); diff --git a/tests/cpp/common/test_group_data.cc b/tests/cpp/common/test_group_data.cc index 095f10d75..d71315999 100644 --- a/tests/cpp/common/test_group_data.cc +++ b/tests/cpp/common/test_group_data.cc @@ -35,7 +35,8 @@ TEST(group_data, ParallelGroupBuilder) { EXPECT_EQ(offsets, expected_offsets); // Create new builder, add one more row given already populated offsets/data - ParallelGroupBuilder builder2(&offsets, &data); + ParallelGroupBuilder builder2(&offsets, &data, + offsets.size() - 1); builder2.InitBudget(0, 1); builder2.AddBudget(2, 0, 2); builder2.InitStorage(); diff --git a/tests/cpp/data/test_adapter.cc b/tests/cpp/data/test_adapter.cc index 3e0607362..2429d949c 100644 --- a/tests/cpp/data/test_adapter.cc +++ b/tests/cpp/data/test_adapter.cc @@ -30,7 +30,7 @@ TEST(c_api, CSRAdapter) { EXPECT_EQ(line2 
.GetElement(0).row_idx, 2); EXPECT_EQ(line2 .GetElement(0).column_idx, 1); - data::SimpleDMatrix dmat(&adapter, -1, std::nan("")); + data::SimpleDMatrix dmat(&adapter, std::nan(""), -1); EXPECT_EQ(dmat.Info().num_col_, 2); EXPECT_EQ(dmat.Info().num_row_, 3); EXPECT_EQ(dmat.Info().num_nonzero_, 5); @@ -51,7 +51,7 @@ TEST(c_api, DenseAdapter) { int n = 2; std::vector data = {1, 2, 3, 4, 5, 6}; data::DenseAdapter adapter(data.data(), m, m*n, n); - data::SimpleDMatrix dmat(&adapter,-1,std::numeric_limits::quiet_NaN()); + data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), -1); EXPECT_EQ(dmat.Info().num_col_, 2); EXPECT_EQ(dmat.Info().num_row_, 3); EXPECT_EQ(dmat.Info().num_nonzero_, 6); @@ -73,7 +73,7 @@ TEST(c_api, CSCAdapter) { std::vector row_idx = {0, 1, 0, 1, 2}; std::vector col_ptr = {0, 2, 5}; data::CSCAdapter adapter(col_ptr.data(), row_idx.data(), data.data(), 2, 3); - data::SimpleDMatrix dmat(&adapter,-1,std::numeric_limits::quiet_NaN()); + data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), -1); EXPECT_EQ(dmat.Info().num_col_, 2); EXPECT_EQ(dmat.Info().num_row_, 3); EXPECT_EQ(dmat.Info().num_nonzero_, 5); @@ -96,6 +96,39 @@ TEST(c_api, CSCAdapter) { EXPECT_EQ(inst[0].index, 1); } +TEST(c_api, CSCAdapterColsMoreThanRows) { + std::vector data = {1, 2, 3, 4, 5, 6, 7, 8}; + std::vector row_idx = {0, 1, 0, 1, 0, 1, 0, 1}; + std::vector col_ptr = {0, 2, 4, 6, 8}; + // Infer row count + data::CSCAdapter adapter(col_ptr.data(), row_idx.data(), data.data(), 4, 0); + data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), -1); + EXPECT_EQ(dmat.Info().num_col_, 4); + EXPECT_EQ(dmat.Info().num_row_, 2); + EXPECT_EQ(dmat.Info().num_nonzero_, 8); + + auto &batch = *dmat.GetBatches().begin(); + auto inst = batch[0]; + EXPECT_EQ(inst[0].fvalue, 1); + EXPECT_EQ(inst[0].index, 0); + EXPECT_EQ(inst[1].fvalue, 3); + EXPECT_EQ(inst[1].index, 1); + EXPECT_EQ(inst[2].fvalue, 5); + EXPECT_EQ(inst[2].index, 2); + 
EXPECT_EQ(inst[3].fvalue, 7); + EXPECT_EQ(inst[3].index, 3); + + inst = batch[1]; + EXPECT_EQ(inst[0].fvalue, 2); + EXPECT_EQ(inst[0].index, 0); + EXPECT_EQ(inst[1].fvalue, 4); + EXPECT_EQ(inst[1].index, 1); + EXPECT_EQ(inst[2].fvalue, 6); + EXPECT_EQ(inst[2].index, 2); + EXPECT_EQ(inst[3].fvalue, 8); + EXPECT_EQ(inst[3].index, 3); +} + TEST(c_api, FileAdapter) { std::string filename = "test.libsvm"; CreateBigTestData(filename, 10); diff --git a/tests/cpp/data/test_simple_dmatrix.cc b/tests/cpp/data/test_simple_dmatrix.cc index 24858f0e1..5bd4ba44e 100644 --- a/tests/cpp/data/test_simple_dmatrix.cc +++ b/tests/cpp/data/test_simple_dmatrix.cc @@ -126,3 +126,32 @@ TEST(SimpleDMatrix, EmptyRow) { CHECK_EQ(dmat.Info().num_row_, 2); CHECK_EQ(dmat.Info().num_col_, 2); } + +TEST(SimpleDMatrix, FromFile) { + std::string filename = "test.libsvm"; + CreateBigTestData(filename, 3 * 5); + std::unique_ptr> parser( + dmlc::Parser::Create(filename.c_str(), 0, 1, "auto")); + data::FileAdapter adapter(parser.get()); + data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), + 1); + for (auto &batch : dmat.GetBatches()) { + EXPECT_EQ(batch.Size(), 5); + EXPECT_EQ(batch.offset.HostVector(), + std::vector({0, 3, 6, 9, 12, 15})); + EXPECT_EQ(batch.base_rowid, 0); + + for (auto i = 0ull; i < batch.Size(); i++) { + if (i%2== 0) { + EXPECT_EQ(batch[i][0].index, 0); + EXPECT_EQ(batch[i][1].index, 1); + EXPECT_EQ(batch[i][2].index, 2); + } + else { + EXPECT_EQ(batch[i][0].index, 0); + EXPECT_EQ(batch[i][1].index, 3); + EXPECT_EQ(batch[i][2].index, 4); + } + } + } +}