Add PushCSC for SparsePage. (#4193)

* Add PushCSC for SparsePage.

* Move Push* definitions into cc file.
* Add std:: prefix to `size_t` to make clang++ happy.
* Address monitor count == 0 (warn when a timer was started but never stopped).
Jiaming Yuan 2019-03-02 01:58:08 +08:00 committed by GitHub
parent 74009afcac
commit 7ea5675679
8 changed files with 199 additions and 50 deletions


@@ -250,42 +250,17 @@ class SparsePage {
* \brief Push row block into the page.
* \param batch the row batch.
*/
inline void Push(const dmlc::RowBlock<uint32_t>& batch) {
auto& data_vec = data.HostVector();
auto& offset_vec = offset.HostVector();
data_vec.reserve(data.Size() + batch.offset[batch.size] - batch.offset[0]);
offset_vec.reserve(offset.Size() + batch.size);
CHECK(batch.index != nullptr);
for (size_t i = 0; i < batch.size; ++i) {
offset_vec.push_back(offset_vec.back() + batch.offset[i + 1] - batch.offset[i]);
}
for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) {
uint32_t index = batch.index[i];
bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i];
data_vec.emplace_back(index, fvalue);
}
CHECK_EQ(offset_vec.back(), data.Size());
}
void Push(const dmlc::RowBlock<uint32_t>& batch);
/*!
* \brief Push a sparse page
* \param batch the row page
*/
inline void Push(const SparsePage &batch) {
auto& data_vec = data.HostVector();
auto& offset_vec = offset.HostVector();
const auto& batch_offset_vec = batch.offset.HostVector();
const auto& batch_data_vec = batch.data.HostVector();
size_t top = offset_vec.back();
data_vec.resize(top + batch.data.Size());
std::memcpy(dmlc::BeginPtr(data_vec) + top,
dmlc::BeginPtr(batch_data_vec),
sizeof(Entry) * batch.data.Size());
size_t begin = offset.Size();
offset_vec.resize(begin + batch.Size());
for (size_t i = 0; i < batch.Size(); ++i) {
offset_vec[i + begin] = top + batch_offset_vec[i + 1];
}
}
void Push(const SparsePage &batch);
/*!
* \brief Push a SparsePage stored in CSC format
* \param batch The row batch to be pushed
*/
void PushCSC(const SparsePage& batch);
/*!
* \brief Push one instance into page
* \param inst an instance row

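As a quick orientation to the API declared above, here is a minimal sketch of appending one column page to another with PushCSC. It relies only on what this diff shows (the offset/data host vectors and PushCSC); the helper name and the values are made up for illustration.

#include <vector>
#include "xgboost/data.h"

// Hypothetical helper: merge two tiny single-column CSC pages.
void PushCSCSketch() {
  xgboost::SparsePage lhs, rhs;
  lhs.offset.HostVector() = {0, 2};                  // one column with two entries
  lhs.data.HostVector()   = {{0, 1.0f}, {3, 2.0f}};  // (row index, value) pairs
  rhs.offset.HostVector() = {0, 1};                  // one column with one entry
  rhs.data.HostVector()   = {{1, 0.5f}};
  lhs.PushCSC(rhs);
  // lhs.offset is now {0, 3}; column 0 holds rows 0, 3, 1 in that order.
}

Note that the merged column is not re-sorted by row index, which is why the .sorted.col.page path later in this commit calls SortRows() after PushCSC.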

@@ -23,7 +23,7 @@ namespace common {
* \tparam ValueType type of entries in the sparse matrix
* \tparam SizeType type of the index range holder
*/
template<typename ValueType, typename SizeType = size_t>
template<typename ValueType, typename SizeType = std::size_t>
struct ParallelGroupBuilder {
public:
// parallel group builder of data
@@ -44,9 +44,9 @@ struct ParallelGroupBuilder {
* \param nkeys number of keys in the matrix, can be smaller than expected
* \param nthread number of thread that will be used in construction
*/
inline void InitBudget(size_t nkeys, int nthread) {
inline void InitBudget(std::size_t nkeys, int nthread) {
thread_rptr_.resize(nthread);
for (size_t i = 0; i < thread_rptr_.size(); ++i) {
for (std::size_t i = 0; i < thread_rptr_.size(); ++i) {
thread_rptr_[i].resize(nkeys);
std::fill(thread_rptr_[i].begin(), thread_rptr_[i].end(), 0);
}
@@ -57,7 +57,7 @@ struct ParallelGroupBuilder {
* \param threadid the id of thread that calls this function
* \param nelem number of element budget add to this row
*/
inline void AddBudget(size_t key, int threadid, SizeType nelem = 1) {
inline void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) {
std::vector<SizeType> &trptr = thread_rptr_[threadid];
if (trptr.size() < key + 1) {
trptr.resize(key + 1, 0);
@@ -67,23 +67,23 @@ struct ParallelGroupBuilder {
/*! \brief step 3: initialize the necessary storage */
inline void InitStorage() {
// set rptr to correct size
for (size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
if (rptr_.size() <= thread_rptr_[tid].size()) {
rptr_.resize(thread_rptr_[tid].size() + 1);
rptr_.resize(thread_rptr_[tid].size() + 1); // key + 1
}
}
// initialize rptr to be beginning of each segment
size_t start = 0;
for (size_t i = 0; i + 1 < rptr_.size(); ++i) {
for (size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
std::size_t start = 0;
for (std::size_t i = 0; i + 1 < rptr_.size(); ++i) {
for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
std::vector<SizeType> &trptr = thread_rptr_[tid];
if (i < trptr.size()) {
size_t ncnt = trptr[i];
if (i < trptr.size()) { // i^th row is assigned for this thread
std::size_t ncnt = trptr[i]; // how many entries in this row
trptr[i] = start;
start += ncnt;
}
}
rptr_[i + 1] = start;
rptr_[i + 1] = start; // pointer accumulated from all thread
}
data_.resize(start);
}
@@ -95,7 +95,7 @@ struct ParallelGroupBuilder {
* \param value The value to be pushed to the group.
* \param threadid the id of thread that calls this function
*/
inline void Push(size_t key, ValueType value, int threadid) {
void Push(std::size_t key, ValueType value, int threadid) {
SizeType &rp = thread_rptr_[threadid][key];
data_[rp++] = value;
}

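For readers new to this builder, a minimal sketch of the call sequence it expects: declare budgets per key, materialize storage, then push values. The methods are the ones shown in the hunk above; the constructor binding the external output vectors and the include path are assumptions, since neither appears in this diff.

#include <cstddef>
#include <vector>
#include "common/group_data.h"  // assumed path; the file name is not shown in this diff

void GroupBuilderSketch() {
  std::vector<std::size_t> rptr;  // CSR-style group pointers, filled by the builder
  std::vector<float> data;        // values grouped by key
  // Assumption: the constructor binds the external output vectors (not part of this hunk).
  xgboost::common::ParallelGroupBuilder<float> builder(&rptr, &data);
  builder.InitBudget(/*nkeys=*/3, /*nthread=*/1);   // step 1: declare the key space
  builder.AddBudget(/*key=*/0, /*threadid=*/0);     // step 2: one entry for key 0
  builder.AddBudget(/*key=*/2, /*threadid=*/0, 2);  //         two entries for key 2
  builder.InitStorage();                            // step 3: rptr becomes {0, 1, 1, 3}
  builder.Push(0, 1.0f, /*threadid=*/0);            // step 4: fill the grouped data
  builder.Push(2, 2.0f, /*threadid=*/0);
  builder.Push(2, 3.0f, /*threadid=*/0);
  // data now holds {1.0f, 2.0f, 3.0f}, contiguous per key.
}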

@@ -61,6 +61,11 @@ struct Monitor {
LOG(CONSOLE) << "======== Monitor: " << label << " ========";
for (auto &kv : statistics_map) {
if (kv.second.count == 0) {
LOG(WARNING) <<
"Timer for " << kv.first << " did not get stopped properly.";
continue;
}
LOG(CONSOLE) << kv.first << ": " << kv.second.timer.ElapsedSeconds()
<< "s, " << kv.second.count << " calls @ "
<< std::chrono::duration_cast<std::chrono::microseconds>(

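The new branch above covers a timer that was started but never stopped, so its call count is still zero when the statistics are printed. A small sketch of the failure mode it guards against; the Start/Stop interface is assumed from the surrounding Monitor struct and is not part of this hunk.

#include "common/timer.h"  // assumed path; the file name is not shown in this diff

void MonitorSketch() {
  xgboost::common::Monitor monitor;
  monitor.Start("build");  // assumed interface: starts the named timer
  // ... do some work, but forget the matching monitor.Stop("build") ...
  // "build" keeps count == 0, so printing the statistics now emits the
  // warning above instead of a misleading zero-call timing entry.
}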

@@ -317,6 +317,95 @@ data::SparsePageFormat::DecideFormat(const std::string& cache_prefix) {
}
}
void SparsePage::Push(const SparsePage &batch) {
auto& data_vec = data.HostVector();
auto& offset_vec = offset.HostVector();
const auto& batch_offset_vec = batch.offset.HostVector();
const auto& batch_data_vec = batch.data.HostVector();
size_t top = offset_vec.back();
data_vec.resize(top + batch.data.Size());
std::memcpy(dmlc::BeginPtr(data_vec) + top,
dmlc::BeginPtr(batch_data_vec),
sizeof(Entry) * batch.data.Size());
size_t begin = offset.Size();
offset_vec.resize(begin + batch.Size());
for (size_t i = 0; i < batch.Size(); ++i) {
offset_vec[i + begin] = top + batch_offset_vec[i + 1];
}
}
void SparsePage::Push(const dmlc::RowBlock<uint32_t>& batch) {
auto& data_vec = data.HostVector();
auto& offset_vec = offset.HostVector();
data_vec.reserve(data.Size() + batch.offset[batch.size] - batch.offset[0]);
offset_vec.reserve(offset.Size() + batch.size);
CHECK(batch.index != nullptr);
for (size_t i = 0; i < batch.size; ++i) {
offset_vec.push_back(offset_vec.back() + batch.offset[i + 1] - batch.offset[i]);
}
for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) {
uint32_t index = batch.index[i];
bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i];
data_vec.emplace_back(index, fvalue);
}
CHECK_EQ(offset_vec.back(), data.Size());
}
void SparsePage::PushCSC(const SparsePage &batch) {
std::vector<xgboost::Entry>& self_data = data.HostVector();
std::vector<size_t>& self_offset = offset.HostVector();
auto const& other_data = batch.data.ConstHostVector();
auto const& other_offset = batch.offset.ConstHostVector();
if (other_data.empty()) {
return;
}
if (!self_data.empty()) {
CHECK_EQ(self_offset.size(), other_offset.size())
<< "self_data.size(): " << this->data.Size() << ", "
<< "other_data.size(): " << other_data.size() << std::flush;
} else {
self_data = other_data;
self_offset = other_offset;
return;
}
std::vector<size_t> offset(other_offset.size());
offset[0] = 0;
std::vector<xgboost::Entry> data(self_data.size() + batch.data.Size());
// n_cols in original csr data matrix, here in csc is n_rows
size_t const n_features = other_offset.size() - 1;
size_t beg = 0;
size_t ptr = 1;
for (size_t i = 0; i < n_features; ++i) {
size_t const self_beg = self_offset.at(i);
size_t const self_length = self_offset.at(i+1) - self_beg;
CHECK_LT(beg, data.size());
std::memcpy(dmlc::BeginPtr(data)+beg,
dmlc::BeginPtr(self_data) + self_beg,
sizeof(Entry) * self_length);
beg += self_length;
size_t const other_beg = other_offset.at(i);
size_t const other_length = other_offset.at(i+1) - other_beg;
CHECK_LT(beg, data.size());
std::memcpy(dmlc::BeginPtr(data)+beg,
dmlc::BeginPtr(other_data) + other_beg,
sizeof(Entry) * other_length);
beg += other_length;
CHECK_LT(ptr, offset.size());
offset.at(ptr) = beg;
ptr++;
}
self_data = std::move(data);
self_offset = std::move(offset);
}
namespace data {
// List of files that will be force linked in static links.
DMLC_REGISTRY_LINK_TAG(sparse_page_raw_format);


@@ -216,7 +216,8 @@ void SparsePageSource::CreateRowPage(dmlc::Parser<uint32_t>* src,
CHECK(info.qids_.empty() || info.qids_.size() == info.num_row_);
info.SaveBinary(fo.get());
}
LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info;
LOG(CONSOLE) << "SparsePageSource::CreateRowPage Finished writing to "
<< name_info;
}
void SparsePageSource::CreatePageFromDMatrix(DMatrix* src,
@@ -246,9 +247,9 @@ void SparsePageSource::CreatePageFromDMatrix(DMatrix* src,
} else if (page_type == ".col.page") {
page->Push(batch.GetTranspose(src->Info().num_col_));
} else if (page_type == ".sorted.col.page") {
auto tmp = batch.GetTranspose(src->Info().num_col_);
tmp.SortRows();
page->Push(tmp);
SparsePage tmp = batch.GetTranspose(src->Info().num_col_);
page->PushCSC(tmp);
page->SortRows();
} else {
LOG(FATAL) << "Unknown page type: " << page_type;
}


@@ -49,6 +49,7 @@ class BaseMaker: public TreeUpdater {
for (bst_uint fid = 0; fid < batch.Size(); ++fid) {
auto c = batch[fid];
if (c.size() != 0) {
CHECK_LT(fid * 2, fminmax_.size());
fminmax_[fid * 2 + 0] =
std::max(-c[0].fvalue, fminmax_[fid * 2 + 0]);
fminmax_[fid * 2 + 1] =


@@ -0,0 +1,55 @@
#include <gtest/gtest.h>
#include <vector>
#include "xgboost/data.h"
namespace xgboost {
TEST(SparsePage, PushCSC) {
std::vector<size_t> offset {0};
std::vector<Entry> data;
SparsePage page;
page.offset.HostVector() = offset;
page.data.HostVector() = data;
offset = {0, 1, 4};
for (size_t i = 0; i < offset.back(); ++i) {
data.push_back(Entry(i, 0.1f));
}
SparsePage other;
other.offset.HostVector() = offset;
other.data.HostVector() = data;
page.PushCSC(other);
ASSERT_EQ(page.offset.HostVector().size(), offset.size());
ASSERT_EQ(page.data.HostVector().size(), data.size());
for (size_t i = 0; i < offset.size(); ++i) {
ASSERT_EQ(page.offset.HostVector()[i], offset[i]);
}
for (size_t i = 0; i < data.size(); ++i) {
ASSERT_EQ(page.data.HostVector()[i].index, data[i].index);
}
page.PushCSC(other);
ASSERT_EQ(page.offset.HostVector().size(), offset.size());
ASSERT_EQ(page.data.Size(), data.size() * 2);
for (size_t i = 0; i < offset.size(); ++i) {
ASSERT_EQ(page.offset.HostVector()[i], offset[i] * 2);
}
auto inst = page[0];
ASSERT_EQ(inst.size(), 2);
for (auto entry : inst) {
ASSERT_EQ(entry.index, 0);
}
inst = page[1];
ASSERT_EQ(inst.size(), 6);
std::vector<size_t> indices_sol {1, 2, 3};
for (size_t i = 0; i < inst.size(); ++i) {
ASSERT_EQ(inst[i].index, indices_sol[i % 3]);
}
}
}


@@ -3,6 +3,7 @@
#include <vector>
#include "helpers.h"
#include "xgboost/learner.h"
#include "dmlc/filesystem.h"
namespace xgboost {
@@ -92,4 +93,26 @@ TEST(Learner, CheckGroup) {
delete pp_mat;
}
TEST(Learner, SLOW_CheckMultiBatch) {
using Arg = std::pair<std::string, std::string>;
// Create sufficiently large data to make two row pages
dmlc::TemporaryDirectory tempdir;
const std::string tmp_file = tempdir.path + "/big.libsvm";
CreateBigTestData(tmp_file, 5000000);
std::shared_ptr<DMatrix> dmat(xgboost::DMatrix::Load( tmp_file + "#" + tmp_file + ".cache", true, false));
EXPECT_TRUE(FileExists(tmp_file + ".cache.row.page"));
EXPECT_FALSE(dmat->SingleColBlock());
size_t num_row = dmat->Info().num_row_;
std::vector<bst_float> labels(num_row);
for (size_t i = 0; i < num_row; ++i) {
labels[i] = i % 2;
}
dmat->Info().SetInfo("label", labels.data(), DataType::kFloat32, num_row);
std::vector<std::shared_ptr<DMatrix>> mat{dmat};
auto learner = std::unique_ptr<Learner>(Learner::Create(mat));
learner->Configure({Arg{"objective", "binary:logistic"}});
learner->InitModel();
learner->UpdateOneIter(0, dmat.get());
}
} // namespace xgboost