[MEM] Add rowset struct to save memory with billion level rows

This commit is contained in:
tqchen 2016-01-19 16:40:07 -08:00
parent 2230f1273f
commit 88447ca32e
9 changed files with 101 additions and 30 deletions

View File

@ -183,6 +183,41 @@ class DataSource : public dmlc::DataIter<RowBatch> {
MetaInfo info;
};
/*!
* \brief A vector-like structure to represent set of rows.
* But saves the memory when all rows are in the set (common case in xgb)
*/
struct RowSet {
public:
/*! \return i-th row index */
inline bst_uint operator[](size_t i) const;
/*! \return the size of the set. */
inline size_t size() const;
/*! \brief push the index back to the set */
inline void push_back(bst_uint i);
/*! \brief clear the set */
inline void clear();
/*!
* \brief save rowset to file.
* \param fo The file to be saved.
*/
inline void Save(dmlc::Stream* fo) const;
/*!
* \brief Load rowset from file.
* \param fi The file to be loaded.
* \return if read is successful.
*/
inline bool Load(dmlc::Stream* fi);
/*! \brief constructor */
RowSet() : size_(0) {}
private:
/*! \brief The internal data structure of size */
uint64_t size_;
/*! \brief The internal data structure of row set if not all*/
std::vector<bst_uint> rows_;
};
/*!
* \brief Internal data structured used by XGBoost during training.
* There are two ways to create a customized DMatrix that reads in user defined-format.
@ -235,7 +270,7 @@ class DMatrix {
/*! \brief get column density */
virtual float GetColDensity(size_t cidx) const = 0;
/*! \return reference of buffered rowset, in column access */
virtual const std::vector<bst_uint>& buffered_rowset() const = 0;
virtual const RowSet& buffered_rowset() const = 0;
/*! \brief virtual destructor */
virtual ~DMatrix() {}
/*!
@ -290,9 +325,48 @@ class DMatrix {
LearnerImpl* cache_learner_ptr_;
};
// implementation of inline functions
inline bst_uint RowSet::operator[](size_t i) const {
return rows_.size() == 0 ? i : rows_[i];
}
inline size_t RowSet::size() const {
return size_;
}
inline void RowSet::clear() {
rows_.clear(); size_ = 0;
}
inline void RowSet::push_back(bst_uint i) {
if (rows_.size() == 0) {
if (i == size_) {
++size_; return;
} else {
rows_.resize(size_);
for (size_t i = 0; i < size_; ++i) {
rows_[i] = static_cast<bst_uint>(i);
}
}
}
rows_.push_back(i);
++size_;
}
inline void RowSet::Save(dmlc::Stream* fo) const {
fo->Write(rows_);
fo->Write(&size_, sizeof(size_));
}
inline bool RowSet::Load(dmlc::Stream* fi) {
if (!fi->Read(&rows_)) return false;
if (rows_.size() != 0) return true;
return fi->Read(&size_, sizeof(size_)) == sizeof(size_);
}
} // namespace xgboost
namespace dmlc {
DMLC_DECLARE_TRAITS(is_pod, xgboost::SparseBatch::Entry, true);
DMLC_DECLARE_TRAITS(has_saveload, xgboost::RowSet, true);
}
#endif // XGBOOST_DATA_H_

View File

@ -184,9 +184,7 @@ void SimpleDMatrix::MakeManyBatch(const std::vector<bool>& enabled,
}
if (tmp.Size() >= max_row_perbatch) {
std::unique_ptr<SparsePage> page(new SparsePage());
this->MakeColPage(tmp.GetRowBatch(0),
dmlc::BeginPtr(buffered_rowset_) + btop,
enabled, page.get());
this->MakeColPage(tmp.GetRowBatch(0), btop, enabled, page.get());
col_iter_.cpages_.push_back(std::move(page));
btop = buffered_rowset_.size();
tmp.Clear();
@ -196,16 +194,14 @@ void SimpleDMatrix::MakeManyBatch(const std::vector<bool>& enabled,
if (tmp.Size() != 0) {
std::unique_ptr<SparsePage> page(new SparsePage());
this->MakeColPage(tmp.GetRowBatch(0),
dmlc::BeginPtr(buffered_rowset_) + btop,
enabled, page.get());
this->MakeColPage(tmp.GetRowBatch(0), btop, enabled, page.get());
col_iter_.cpages_.push_back(std::move(page));
}
}
// make column page from subset of rowbatchs
void SimpleDMatrix::MakeColPage(const RowBatch& batch,
const bst_uint* ridx,
size_t buffer_begin,
const std::vector<bool>& enabled,
SparsePage* pcol) {
int nthread;
@ -240,9 +236,10 @@ void SimpleDMatrix::MakeColPage(const RowBatch& batch,
RowBatch::Inst inst = batch[i];
for (bst_uint j = 0; j < inst.length; ++j) {
const SparseBatch::Entry &e = inst[j];
builder.Push(e.index,
SparseBatch::Entry(ridx[i], e.fvalue),
tid);
builder.Push(
e.index,
SparseBatch::Entry(buffered_rowset_[i + buffer_begin], e.fvalue),
tid);
}
}
CHECK_EQ(pcol->Size(), info().num_col);

View File

@ -40,7 +40,7 @@ class SimpleDMatrix : public DMatrix {
return col_size_.size() != 0;
}
const std::vector<bst_uint>& buffered_rowset() const override {
const RowSet& buffered_rowset() const override {
return buffered_rowset_;
}
@ -96,7 +96,7 @@ class SimpleDMatrix : public DMatrix {
// column iterator
ColBatchIter col_iter_;
// list of row index that are buffered.
std::vector<bst_uint> buffered_rowset_;
RowSet buffered_rowset_;
/*! \brief sizeof column data */
std::vector<size_t> col_size_;
@ -110,7 +110,7 @@ class SimpleDMatrix : public DMatrix {
size_t max_row_perbatch);
void MakeColPage(const RowBatch& batch,
const bst_uint* ridx,
size_t buffer_begin,
const std::vector<bool>& enabled,
SparsePage* pcol);
};

View File

@ -165,10 +165,10 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
// function to create the page.
auto make_col_batch = [&] (
const SparsePage& prow,
const bst_uint* ridx,
size_t begin,
SparsePage *pcol) {
pcol->Clear();
pcol->min_index = ridx[0];
pcol->min_index = buffered_rowset_[begin];
int nthread;
#pragma omp parallel
{
@ -196,7 +196,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
for (size_t j = prow.offset[i]; j < prow.offset[i+1]; ++j) {
const SparseBatch::Entry &e = prow.data[j];
builder.Push(e.index,
SparseBatch::Entry(ridx[i], e.fvalue),
SparseBatch::Entry(buffered_rowset_[i + begin], e.fvalue),
tid);
}
}
@ -230,7 +230,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
if (tmp.Size() >= max_row_perbatch ||
tmp.MemCostBytes() >= kPageSize) {
make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr);
make_col_batch(tmp, btop, dptr);
batch_ptr = i + 1;
return true;
}
@ -243,7 +243,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
}
if (tmp.Size() != 0) {
make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr);
make_col_batch(tmp, btop, dptr);
return true;
} else {
return false;

View File

@ -44,7 +44,7 @@ class SparsePageDMatrix : public DMatrix {
return col_iter_.get() != nullptr;
}
const std::vector<bst_uint>& buffered_rowset() const override {
const RowSet& buffered_rowset() const override {
return buffered_rowset_;
}
@ -120,7 +120,7 @@ class SparsePageDMatrix : public DMatrix {
// the cache prefix
std::string cache_info_;
/*! \brief list of row index that are buffered */
std::vector<bst_uint> buffered_rowset_;
RowSet buffered_rowset_;
// count for column data
std::vector<size_t> col_size_;
// internal column iter.

View File

@ -109,7 +109,7 @@ class GBLinear : public GradientBooster {
std::vector<bst_gpair> &gpair = *in_gpair;
const int ngroup = model.param.num_output_group;
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
const RowSet &rowset = p_fmat->buffered_rowset();
// for all the output group
for (int gid = 0; gid < ngroup; ++gid) {
double sum_grad = 0.0, sum_hess = 0.0;

View File

@ -325,7 +325,7 @@ class GBTree : public GradientBooster {
int bst_group,
const RegTree &new_tree,
const int* leaf_position) {
const std::vector<bst_uint>& rowset = p_fmat->buffered_rowset();
const RowSet& rowset = p_fmat->buffered_rowset();
const bst_omp_uint ndata = static_cast<bst_omp_uint>(rowset.size());
#pragma omp parallel for schedule(static)
for (bst_omp_uint i = 0; i < ndata; ++i) {

View File

@ -207,7 +207,7 @@ class BaseMaker: public TreeUpdater {
// set the positions in the nondefault
this->SetNonDefaultPositionCol(nodes, p_fmat, tree);
// set rest of instances to default position
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
const RowSet &rowset = p_fmat->buffered_rowset();
// set default direct nodes to default
// for leaf nodes that are not fresh, mark then to ~nid,
// so that they are ignored in future statistics collection
@ -297,7 +297,7 @@ class BaseMaker: public TreeUpdater {
thread_temp[tid][nid].Clear();
}
}
const std::vector<bst_uint> &rowset = fmat.buffered_rowset();
const RowSet &rowset = fmat.buffered_rowset();
// setup position
const bst_omp_uint ndata = static_cast<bst_omp_uint>(rowset.size());
#pragma omp parallel for schedule(static)

View File

@ -117,7 +117,7 @@ class ColMaker: public TreeUpdater {
CHECK_EQ(tree.param.num_nodes, tree.param.num_roots)
<< "ColMaker: can only grow new tree";
const std::vector<unsigned>& root_index = fmat.info().root_index;
const std::vector<bst_uint>& rowset = fmat.buffered_rowset();
const RowSet& rowset = fmat.buffered_rowset();
{
// setup position
position.resize(gpair.size());
@ -200,7 +200,7 @@ class ColMaker: public TreeUpdater {
}
snode.resize(tree.param.num_nodes, NodeEntry(param));
}
const std::vector<bst_uint> &rowset = fmat.buffered_rowset();
const RowSet &rowset = fmat.buffered_rowset();
const MetaInfo& info = fmat.info();
// setup position
const bst_omp_uint ndata = static_cast<bst_omp_uint>(rowset.size());
@ -620,7 +620,7 @@ class ColMaker: public TreeUpdater {
// set the positions in the nondefault
this->SetNonDefaultPosition(qexpand, p_fmat, tree);
// set rest of instances to default position
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
const RowSet &rowset = p_fmat->buffered_rowset();
// set default direct nodes to default
// for leaf nodes that are not fresh, mark then to ~nid,
// so that they are ignored in future statistics collection
@ -761,7 +761,7 @@ class DistColMaker : public ColMaker<TStats> {
: ColMaker<TStats>::Builder(param) {
}
inline void UpdatePosition(DMatrix* p_fmat, const RegTree &tree) {
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
const RowSet &rowset = p_fmat->buffered_rowset();
const bst_omp_uint ndata = static_cast<bst_omp_uint>(rowset.size());
#pragma omp parallel for schedule(static)
for (bst_omp_uint i = 0; i < ndata; ++i) {
@ -831,7 +831,7 @@ class DistColMaker : public ColMaker<TStats> {
bitmap.InitFromBool(boolmap);
// communicate bitmap
rabit::Allreduce<rabit::op::BitOR>(dmlc::BeginPtr(bitmap.data), bitmap.data.size());
const std::vector<bst_uint> &rowset = p_fmat->buffered_rowset();
const RowSet &rowset = p_fmat->buffered_rowset();
// get the new position
const bst_omp_uint ndata = static_cast<bst_omp_uint>(rowset.size());
#pragma omp parallel for schedule(static)