check pipe, commit optimization for hist

This commit is contained in:
tqchen 2014-11-20 11:22:09 -08:00
parent 6b674b491f
commit 974202eb55
5 changed files with 182 additions and 85 deletions

View File

@ -15,4 +15,4 @@ Notes
* The code is multi-threaded, so you want to run one xgboost-mpi per node
* Row-based solver split data by row, each node work on subset of rows, it uses an approximate histogram count algorithm,
and will only examine subset of potential split points as opposed to all split points.
* ```colsample_bytree``` is not enabled in row split mode so far

View File

@ -14,7 +14,6 @@ gamma = 1.0
min_child_weight = 1
# maximum depth of a tree
max_depth = 3
# Task parameters
# the number of round to do boosting
num_round = 2

View File

@ -12,7 +12,7 @@ k=$1
python splitrows.py ../../demo/data/agaricus.txt.train train $k
# run xgboost mpi
mpirun -n $k ../../xgboost-mpi mushroom-row.conf dsplit=row nthread=1
mpirun -n $k ../../xgboost-mpi mushroom-row.conf dsplit=row nthread=1
# the model can be directly loaded by single machine xgboost solver, as usuall
../../xgboost mushroom-row.conf task=dump model_in=0002.model fmap=../../demo/data/featmap.txt name_dump=dump.nice.$k.txt

View File

@ -7,6 +7,7 @@
*/
#include <vector>
#include <algorithm>
#include <limits>
#include "../utils/random.h"
#include "../utils/quantile.h"
@ -24,8 +25,73 @@ class BaseMaker: public IUpdater {
virtual void SetParam(const char *name, const char *val) {
param.SetParam(name, val);
}
protected:
// helper to collect and query feature meta information
struct FMetaHelper {
public:
/*! \brief find type of each feature, use column format */
inline void InitByCol(IFMatrix *p_fmat,
const RegTree &tree) {
fminmax.resize(tree.param.num_feature * 2);
std::fill(fminmax.begin(), fminmax.end(),
-std::numeric_limits<bst_float>::max());
// start accumulating statistics
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator();
iter->BeforeFirst();
while (iter->Next()) {
const ColBatch &batch = iter->Value();
for (bst_uint i = 0; i < batch.size; ++i) {
const bst_uint fid = batch.col_index[i];
const ColBatch::Inst &c = batch[i];
if (c.length != 0) {
fminmax[fid * 2 + 0] = std::max(-c[0].fvalue, fminmax[fid * 2 + 0]);
fminmax[fid * 2 + 1] = std::max(c[c.length - 1].fvalue, fminmax[fid * 2 + 1]);
}
}
}
sync::AllReduce(BeginPtr(fminmax), fminmax.size(), sync::kMax);
}
// get feature type, 0:empty 1:binary 2:real
inline int Type(bst_uint fid) const {
utils::Assert(fid * 2 + 1 < fminmax.size(),
"FeatHelper fid exceed query bound ");
bst_float a = fminmax[fid * 2];
bst_float b = fminmax[fid * 2 + 1];
if (a == -std::numeric_limits<bst_float>::max()) return 0;
if (-a == b) return 1;
else return 2;
}
inline bst_float MaxValue(bst_uint fid) const {
return fminmax[fid *2 + 1];
}
inline void SampleCol(float p, std::vector<bst_uint> *p_findex) const {
std::vector<bst_uint> &findex = *p_findex;
findex.clear();
for (size_t i = 0; i < fminmax.size(); i += 2) {
if (this->Type(i / 2) != 0) findex.push_back(i / 2);
}
unsigned n = static_cast<unsigned>(p * findex.size());
random::Shuffle(findex);
findex.resize(n);
if (n != findex.size()) {
// sync the findex if it is subsample
std::string s_cache;
utils::MemoryBufferStream fc(&s_cache);
utils::IStream &fs = fc;
if (sync::GetRank() == 0) {
fs.Write(findex);
sync::Bcast(&s_cache, 0);
} else {
sync::Bcast(&s_cache, 0);
fs.Read(&findex);
}
}
}
private:
std::vector<bst_float> fminmax;
};
// ------static helper functions ------
// helper function to get to next level of the tree
/*! \brief this is helper function for row based data*/

View File

@ -118,19 +118,22 @@ class HistMaker: public BaseMaker {
ThreadWSpace wspace;
// reducer for histogram
sync::Reducer<TStats> histred;
// set of working features
std::vector<bst_uint> fwork_set;
// update function implementation
virtual void Update(const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
RegTree *p_tree) {
this->InitData(gpair, *p_fmat, info.root_index, *p_tree);
this->InitWorkSet(p_fmat, *p_tree, &fwork_set);
for (int depth = 0; depth < param.max_depth; ++depth) {
// reset and propose candidate split
this->ResetPosAndPropose(gpair, p_fmat, info, *p_tree);
this->ResetPosAndPropose(gpair, p_fmat, info, fwork_set, *p_tree);
// create histogram
this->CreateHist(gpair, p_fmat, info, *p_tree);
this->CreateHist(gpair, p_fmat, info, fwork_set, *p_tree);
// find split based on histogram statistics
this->FindSplit(depth, gpair, p_fmat, info, p_tree);
this->FindSplit(depth, gpair, p_fmat, info, fwork_set, p_tree);
// reset position after split
this->ResetPositionAfterSplit(p_fmat, *p_tree);
this->UpdateQueueExpand(*p_tree);
@ -148,7 +151,17 @@ class HistMaker: public BaseMaker {
virtual void ResetPosAndPropose(const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
const RegTree &tree) = 0;
const std::vector <bst_uint> &fset,
const RegTree &tree) = 0;
// initialize the current working set of features in this round
virtual void InitWorkSet(IFMatrix *p_fmat,
const RegTree &tree,
std::vector<bst_uint> *p_fset) {
p_fset->resize(tree.param.num_feature);
for (size_t i = 0; i < p_fset->size(); ++i) {
(*p_fset)[i] = i;
}
}
// reset position after split, this is not a must, depending on implementation
virtual void ResetPositionAfterSplit(IFMatrix *p_fmat,
const RegTree &tree) {
@ -156,45 +169,8 @@ class HistMaker: public BaseMaker {
virtual void CreateHist(const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
const RegTree &tree) {
bst_uint num_feature = tree.param.num_feature;
// intialize work space
wspace.Init(param, this->get_nthread());
// start accumulating statistics
utils::IIterator<RowBatch> *iter = p_fmat->RowIterator();
iter->BeforeFirst();
while (iter->Next()) {
const RowBatch &batch = iter->Value();
utils::Check(batch.size < std::numeric_limits<unsigned>::max(),
"too large batch size ");
const bst_omp_uint nbatch = static_cast<bst_omp_uint>(batch.size);
#pragma omp parallel for schedule(static)
for (bst_omp_uint i = 0; i < nbatch; ++i) {
RowBatch::Inst inst = batch[i];
const int tid = omp_get_thread_num();
HistSet &hset = wspace.hset[tid];
const bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
int nid = position[ridx];
if (nid >= 0) {
const int wid = this->node2workindex[nid];
for (bst_uint i = 0; i < inst.length; ++i) {
utils::Assert(inst[i].index < num_feature, "feature index exceed bound");
// feature histogram
hset[inst[i].index + wid * (num_feature+1)]
.Add(inst[i].fvalue, gpair, info, ridx);
}
// node histogram, use num_feature to borrow space
hset[num_feature + wid * (num_feature + 1)]
.data[0].Add(gpair, info, ridx);
}
}
}
// accumulating statistics together
wspace.Aggregate();
// sync the histogram
histred.AllReduce(BeginPtr(wspace.hset[0].data), wspace.hset[0].data.size());
}
const std::vector <bst_uint> &fset,
const RegTree &tree) = 0;
private:
inline void EnumerateSplit(const HistUnit &hist,
const TStats &node_sum,
@ -235,8 +211,9 @@ class HistMaker: public BaseMaker {
const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
const std::vector <bst_uint> &fset,
RegTree *p_tree) {
const bst_uint num_feature = p_tree->param.num_feature;
const size_t num_feature = fset.size();
// get the best split condition for each node
std::vector<SplitEntry> sol(qexpand.size());
std::vector<TStats> left_sum(qexpand.size());
@ -248,9 +225,9 @@ class HistMaker: public BaseMaker {
"node2workindex inconsistent");
SplitEntry &best = sol[wid];
TStats &node_sum = wspace.hset[0][num_feature + wid * (num_feature + 1)].data[0];
for (bst_uint fid = 0; fid < num_feature; ++ fid) {
EnumerateSplit(this->wspace.hset[0][fid + wid * (num_feature+1)],
node_sum, fid, &best, &left_sum[wid]);
for (size_t i = 0; i < fset.size(); ++ i) {
EnumerateSplit(this->wspace.hset[0][i + wid * (num_feature+1)],
node_sum, fset[i], &best, &left_sum[wid]);
}
}
// get the best result, we can synchronize the solution
@ -306,15 +283,32 @@ class CQHistMaker: public HistMaker<TStats> {
hist.data[istart].Add(gpair, info, ridx);
}
};
// sketch type used for this
typedef utils::WXQuantileSketch<bst_float, bst_float> WXQSketch;
// initialize the work set of tree
virtual void InitWorkSet(IFMatrix *p_fmat,
const RegTree &tree,
std::vector<bst_uint> *p_fset) {
feat_helper.InitByCol(p_fmat, tree);
feat_helper.SampleCol(this->param.colsample_bytree, p_fset);
}
// code to create histogram
virtual void CreateHist(const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
const std::vector<bst_uint> &fset,
const RegTree &tree) {
// fill in reverse map
feat2workindex.resize(tree.param.num_feature);
std::fill(feat2workindex.begin(), feat2workindex.end(), -1);
for (size_t i = 0; i < fset.size(); ++i) {
feat2workindex[fset[i]] = static_cast<int>(i);
}
// start to work
this->wspace.Init(this->param, 1);
thread_hist.resize(this->get_nthread());
// start accumulating statistics
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator();
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(fset);
iter->BeforeFirst();
while (iter->Next()) {
const ColBatch &batch = iter->Value();
@ -322,15 +316,18 @@ class CQHistMaker: public HistMaker<TStats> {
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
#pragma omp parallel for schedule(dynamic, 1)
for (bst_omp_uint i = 0; i < nsize; ++i) {
this->UpdateHistCol(gpair, batch[i], info, tree,
batch.col_index[i],
&thread_hist[omp_get_thread_num()]);
int offset = feat2workindex[batch.col_index[i]];
if (offset >= 0) {
this->UpdateHistCol(gpair, batch[i], info, tree,
fset, offset,
&thread_hist[omp_get_thread_num()]);
}
}
}
for (size_t i = 0; i < this->qexpand.size(); ++i) {
const int nid = this->qexpand[i];
const int wid = this->node2workindex[nid];
this->wspace.hset[0][tree.param.num_feature + wid * (tree.param.num_feature+1)]
this->wspace.hset[0][fset.size() + wid * (fset.size()+1)]
.data[0] = node_stats[nid];
}
// sync the histogram
@ -343,10 +340,24 @@ class CQHistMaker: public HistMaker<TStats> {
virtual void ResetPosAndPropose(const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
const std::vector<bst_uint> &fset,
const RegTree &tree) {
// fill in reverse map
feat2workindex.resize(tree.param.num_feature);
std::fill(feat2workindex.begin(), feat2workindex.end(), -1);
freal_set.clear();
for (size_t i = 0; i < fset.size(); ++i) {
if (feat_helper.Type(fset[i]) == 2) {
feat2workindex[fset[i]] = static_cast<int>(freal_set.size());
freal_set.push_back(fset[i]);
} else {
feat2workindex[fset[i]] = -2;
}
}
this->GetNodeStats(gpair, *p_fmat, tree, info,
&thread_stats, &node_stats);
sketchs.resize(this->qexpand.size() * tree.param.num_feature);
sketchs.resize(this->qexpand.size() * freal_set.size());
for (size_t i = 0; i < sketchs.size(); ++i) {
sketchs[i].Init(info.num_row, this->param.sketch_eps);
}
@ -354,7 +365,7 @@ class CQHistMaker: public HistMaker<TStats> {
// number of rows in
const size_t nrows = p_fmat->buffered_rowset().size();
// start accumulating statistics
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator();
utils::IIterator<ColBatch> *iter = p_fmat->ColIterator(freal_set);
iter->BeforeFirst();
while (iter->Next()) {
const ColBatch &batch = iter->Value();
@ -362,11 +373,14 @@ class CQHistMaker: public HistMaker<TStats> {
const bst_omp_uint nsize = static_cast<bst_omp_uint>(batch.size);
#pragma omp parallel for schedule(dynamic, 1)
for (bst_omp_uint i = 0; i < nsize; ++i) {
this->UpdateSketchCol(gpair, batch[i], tree,
node_stats,
batch.col_index[i],
batch[i].length == nrows,
&thread_sketch[omp_get_thread_num()]);
int offset = feat2workindex[batch.col_index[i]];
if (offset >= 0) {
this->UpdateSketchCol(gpair, batch[i], tree,
node_stats,
freal_set, offset,
batch[i].length == nrows,
&thread_sketch[omp_get_thread_num()]);
}
}
}
// setup maximum size
@ -379,36 +393,46 @@ class CQHistMaker: public HistMaker<TStats> {
summary_array[i].Reserve(max_size);
summary_array[i].SetPrune(out, max_size);
}
size_t n4bytes = (WXQSketch::SummaryContainer::CalcMemCost(max_size) + 3) / 4;
sreducer.AllReduce(BeginPtr(summary_array), n4bytes, summary_array.size());
if (summary_array.size() != 0) {
size_t n4bytes = (WXQSketch::SummaryContainer::CalcMemCost(max_size) + 3) / 4;
sreducer.AllReduce(BeginPtr(summary_array), n4bytes, summary_array.size());
}
// now we get the final result of sketch, setup the cut
this->wspace.cut.clear();
this->wspace.rptr.clear();
this->wspace.rptr.push_back(0);
for (size_t wid = 0; wid < this->qexpand.size(); ++wid) {
for (int fid = 0; fid < tree.param.num_feature; ++fid) {
const WXQSketch::Summary &a = summary_array[wid * tree.param.num_feature + fid];
for (size_t i = 1; i < a.size; ++i) {
bst_float cpt = a.data[i].value - rt_eps;
if (i == 1 || cpt > this->wspace.cut.back()) {
this->wspace.cut.push_back(cpt);
for (size_t i = 0; i < fset.size(); ++i) {
int offset = feat2workindex[fset[i]];
if (offset >= 0) {
const WXQSketch::Summary &a = summary_array[wid * freal_set.size() + offset];
for (size_t i = 1; i < a.size; ++i) {
bst_float cpt = a.data[i].value - rt_eps;
if (i == 1 || cpt > this->wspace.cut.back()) {
this->wspace.cut.push_back(cpt);
}
}
// push a value that is greater than anything
if (a.size != 0) {
bst_float cpt = a.data[a.size - 1].value;
// this must be bigger than last value in a scale
bst_float last = cpt + fabs(cpt) + rt_eps;
this->wspace.cut.push_back(last);
}
this->wspace.rptr.push_back(this->wspace.cut.size());
} else {
utils::Assert(offset == -2, "BUG in mark");
bst_float cpt = feat_helper.MaxValue(fset[i]);
this->wspace.cut.push_back(cpt + fabs(cpt) + rt_eps);
this->wspace.rptr.push_back(this->wspace.cut.size());
}
// push a value that is greater than anything
if (a.size != 0) {
bst_float cpt = a.data[a.size - 1].value;
// this must be bigger than last value in a scale
bst_float last = cpt + fabs(cpt) + rt_eps;
this->wspace.cut.push_back(last);
}
this->wspace.rptr.push_back(this->wspace.cut.size());
}
// reserve last value for global statistics
this->wspace.cut.push_back(0.0f);
this->wspace.rptr.push_back(this->wspace.cut.size());
}
utils::Assert(this->wspace.rptr.size() ==
(tree.param.num_feature + 1) * this->qexpand.size() + 1,
(fset.size() + 1) * this->qexpand.size() + 1,
"cut space inconsistent");
}
@ -417,7 +441,8 @@ class CQHistMaker: public HistMaker<TStats> {
const ColBatch::Inst &c,
const BoosterInfo &info,
const RegTree &tree,
bst_uint fid,
const std::vector<bst_uint> &fset,
bst_uint fid_offset,
std::vector<HistEntry> *p_temp) {
if (c.length == 0) return;
// initialize sbuilder for use
@ -427,7 +452,7 @@ class CQHistMaker: public HistMaker<TStats> {
const unsigned nid = this->qexpand[i];
const unsigned wid = this->node2workindex[nid];
hbuilder[nid].istart = 0;
hbuilder[nid].hist = this->wspace.hset[0][fid + wid * (tree.param.num_feature+1)];
hbuilder[nid].hist = this->wspace.hset[0][fid_offset + wid * (fset.size()+1)];
}
for (bst_uint j = 0; j < c.length; ++j) {
const bst_uint ridx = c[j].index;
@ -441,7 +466,8 @@ class CQHistMaker: public HistMaker<TStats> {
const ColBatch::Inst &c,
const RegTree &tree,
const std::vector<TStats> &nstats,
bst_uint fid,
const std::vector<bst_uint> &frealset,
bst_uint offset,
bool col_full,
std::vector<BaseMaker::SketchEntry> *p_temp) {
if (c.length == 0) return;
@ -452,7 +478,7 @@ class CQHistMaker: public HistMaker<TStats> {
const unsigned nid = this->qexpand[i];
const unsigned wid = this->node2workindex[nid];
sbuilder[nid].sum_total = 0.0f;
sbuilder[nid].sketch = &sketchs[wid * tree.param.num_feature + fid];
sbuilder[nid].sketch = &sketchs[wid * frealset.size() + offset];
}
if (!col_full) {
@ -497,7 +523,12 @@ class CQHistMaker: public HistMaker<TStats> {
sbuilder[nid].Finalize(max_size);
}
}
// feature helper
BaseMaker::FMetaHelper feat_helper;
// temp space to map feature id to working index
std::vector<int> feat2workindex;
// set of index from fset that are real
std::vector<bst_uint> freal_set;
// thread temp data
std::vector< std::vector<BaseMaker::SketchEntry> > thread_sketch;
// used to hold statistics
@ -521,6 +552,7 @@ class QuantileHistMaker: public HistMaker<TStats> {
virtual void ResetPosAndPropose(const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
const std::vector <bst_uint> &fset,
const RegTree &tree) {
// initialize the data structure
int nthread = BaseMaker::get_nthread();