Move prediction cache to Learner. (#5220)
* Move prediction cache into Learner.
* Clean-ups
  - Remove duplicated cache in Learner and GBM.
  - Remove ad-hoc fix of invalid cache.
  - Remove `PredictFromCache` in predictors.
  - Remove prediction cache for linear altogether, as it only moves the prediction into the training process and doesn't provide any actual overall speed gain.
  - The cache is now unique to Learner, which means the ownership is no longer shared with any other component.
* Changes
  - Add version to prediction cache.
  - Use a weak pointer to check for an expired DMatrix.
  - Pass shared pointer instead of raw pointer.
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*!
|
||||
* Copyright by Contributors 2017-2019
|
||||
* Copyright by Contributors 2017-2020
|
||||
*/
|
||||
#include <dmlc/omp.h>
|
||||
|
||||
@@ -46,9 +46,9 @@ class CPUPredictor : public Predictor {
|
||||
}
|
||||
}
|
||||
|
||||
void PredLoopInternal(DMatrix* p_fmat, std::vector<bst_float>* out_preds,
|
||||
gbm::GBTreeModel const& model, int32_t tree_begin,
|
||||
int32_t tree_end) {
|
||||
void PredInternal(DMatrix *p_fmat, std::vector<bst_float> *out_preds,
|
||||
gbm::GBTreeModel const &model, int32_t tree_begin,
|
||||
int32_t tree_end) {
|
||||
int32_t const num_group = model.learner_model_param_->num_output_group;
|
||||
const int nthread = omp_get_max_threads();
|
||||
InitThreadTemp(nthread, model.learner_model_param_->num_feature);
|
||||
@@ -102,27 +102,6 @@ class CPUPredictor : public Predictor {
|
||||
}
|
||||
}
|
||||
|
||||
bool PredictFromCache(DMatrix* dmat,
|
||||
HostDeviceVector<bst_float>* out_preds,
|
||||
const gbm::GBTreeModel& model,
|
||||
unsigned ntree_limit) const {
|
||||
CHECK(cache_);
|
||||
if (ntree_limit == 0 ||
|
||||
ntree_limit * model.learner_model_param_->num_output_group >= model.trees.size()) {
|
||||
auto it = cache_->find(dmat);
|
||||
if (it != cache_->end()) {
|
||||
const HostDeviceVector<bst_float>& y = it->second.predictions;
|
||||
if (y.Size() != 0) {
|
||||
out_preds->Resize(y.Size());
|
||||
std::copy(y.HostVector().begin(), y.HostVector().end(),
|
||||
out_preds->HostVector().begin());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void InitOutPredictions(const MetaInfo& info,
|
||||
HostDeviceVector<bst_float>* out_preds,
|
||||
const gbm::GBTreeModel& model) const {
|
||||
@@ -156,60 +135,78 @@ class CPUPredictor : public Predictor {
|
||||
}
|
||||
|
||||
public:
|
||||
CPUPredictor(GenericParameter const* generic_param,
|
||||
std::shared_ptr<std::unordered_map<DMatrix*, PredictionCacheEntry>> cache) :
|
||||
Predictor::Predictor{generic_param, cache} {}
|
||||
void PredictBatch(DMatrix* dmat, HostDeviceVector<bst_float>* out_preds,
|
||||
explicit CPUPredictor(GenericParameter const* generic_param) :
|
||||
Predictor::Predictor{generic_param} {}
|
||||
// ntree_limit is a very problematic parameter, as it's ambiguous in the context of
|
||||
// multi-output and forest. Same problem exists for tree_begin
|
||||
void PredictBatch(DMatrix* dmat, PredictionCacheEntry* predts,
|
||||
const gbm::GBTreeModel& model, int tree_begin,
|
||||
unsigned ntree_limit = 0) override {
|
||||
if (this->PredictFromCache(dmat, out_preds, model, ntree_limit)) {
|
||||
return;
|
||||
}
|
||||
this->InitOutPredictions(dmat->Info(), out_preds, model);
|
||||
|
||||
ntree_limit *= model.learner_model_param_->num_output_group;
|
||||
if (ntree_limit == 0 || ntree_limit > model.trees.size()) {
|
||||
ntree_limit = static_cast<unsigned>(model.trees.size());
|
||||
uint32_t const ntree_limit = 0) override {
|
||||
// tree_begin is not used, right now we just enforce it to be 0.
|
||||
CHECK_EQ(tree_begin, 0);
|
||||
auto* out_preds = &predts->predictions;
|
||||
CHECK_GE(predts->version, tree_begin);
|
||||
if (predts->version == 0) {
|
||||
CHECK_EQ(out_preds->Size(), 0);
|
||||
this->InitOutPredictions(dmat->Info(), out_preds, model);
|
||||
}
|
||||
|
||||
this->PredLoopInternal(dmat, &out_preds->HostVector(), model,
|
||||
tree_begin, ntree_limit);
|
||||
uint32_t const output_groups = model.learner_model_param_->num_output_group;
|
||||
CHECK_NE(output_groups, 0);
|
||||
// Right now we just assume ntree_limit provided by users means number of tree layers
|
||||
// in the context of multi-output model
|
||||
uint32_t real_ntree_limit = ntree_limit * output_groups;
|
||||
if (real_ntree_limit == 0 || real_ntree_limit > model.trees.size()) {
|
||||
real_ntree_limit = static_cast<uint32_t>(model.trees.size());
|
||||
}
|
||||
|
||||
auto cache_entry = this->FindCache(dmat);
|
||||
if (cache_entry == cache_->cend()) {
|
||||
return;
|
||||
uint32_t const end_version = (tree_begin + real_ntree_limit) / output_groups;
|
||||
// When users have provided ntree_limit, end_version can be smaller than the cached version; the cache is then invalid and is reset
|
||||
if (predts->version > end_version) {
|
||||
CHECK_NE(ntree_limit, 0);
|
||||
this->InitOutPredictions(dmat->Info(), out_preds, model);
|
||||
predts->version = 0;
|
||||
}
|
||||
if (cache_entry->second.predictions.Size() == 0) {
|
||||
// See comment in GPUPredictor::PredictBatch.
|
||||
InitOutPredictions(cache_entry->second.data->Info(),
|
||||
&(cache_entry->second.predictions), model);
|
||||
cache_entry->second.predictions.Copy(*out_preds);
|
||||
uint32_t const beg_version = predts->version;
|
||||
CHECK_LE(beg_version, end_version);
|
||||
|
||||
if (beg_version < end_version) {
|
||||
this->PredInternal(dmat, &out_preds->HostVector(), model,
|
||||
beg_version * output_groups,
|
||||
end_version * output_groups);
|
||||
}
|
||||
|
||||
// delta means {size of forest} * {number of newly accumulated layers}
|
||||
uint32_t delta = end_version - beg_version;
|
||||
CHECK_LE(delta, model.trees.size());
|
||||
predts->Update(delta);
|
||||
|
||||
CHECK(out_preds->Size() == output_groups * dmat->Info().num_row_ ||
|
||||
out_preds->Size() == dmat->Info().num_row_);
|
||||
}
|
||||
|
||||
void UpdatePredictionCache(
|
||||
const gbm::GBTreeModel& model,
|
||||
std::vector<std::unique_ptr<TreeUpdater>>* updaters,
|
||||
int num_new_trees) override {
|
||||
int num_new_trees,
|
||||
DMatrix* m,
|
||||
PredictionCacheEntry* predts) override {
|
||||
int old_ntree = model.trees.size() - num_new_trees;
|
||||
// update cache entry
|
||||
for (auto& kv : (*cache_)) {
|
||||
PredictionCacheEntry& e = kv.second;
|
||||
|
||||
if (e.predictions.Size() == 0) {
|
||||
InitOutPredictions(e.data->Info(), &(e.predictions), model);
|
||||
PredLoopInternal(e.data.get(), &(e.predictions.HostVector()), model, 0,
|
||||
model.trees.size());
|
||||
} else if (model.learner_model_param_->num_output_group == 1 && updaters->size() > 0 &&
|
||||
num_new_trees == 1 &&
|
||||
updaters->back()->UpdatePredictionCache(e.data.get(),
|
||||
&(e.predictions))) {
|
||||
{} // do nothing
|
||||
} else {
|
||||
PredLoopInternal(e.data.get(), &(e.predictions.HostVector()), model, old_ntree,
|
||||
model.trees.size());
|
||||
}
|
||||
auto* out = &predts->predictions;
|
||||
if (predts->predictions.Size() == 0) {
|
||||
this->InitOutPredictions(m->Info(), out, model);
|
||||
this->PredInternal(m, &out->HostVector(), model, 0, model.trees.size());
|
||||
} else if (model.learner_model_param_->num_output_group == 1 &&
|
||||
updaters->size() > 0 &&
|
||||
num_new_trees == 1 &&
|
||||
updaters->back()->UpdatePredictionCache(m, out)) {
|
||||
{}
|
||||
} else {
|
||||
PredInternal(m, &out->HostVector(), model, old_ntree, model.trees.size());
|
||||
}
|
||||
auto delta = num_new_trees / model.learner_model_param_->num_output_group;
|
||||
predts->Update(delta);
|
||||
}
|
||||
|
||||
void PredictInstance(const SparsePage::Inst& inst,
|
||||
@@ -387,9 +384,8 @@ class CPUPredictor : public Predictor {
|
||||
|
||||
XGBOOST_REGISTER_PREDICTOR(CPUPredictor, "cpu_predictor")
|
||||
.describe("Make predictions using CPU.")
|
||||
.set_body([](GenericParameter const* generic_param,
|
||||
std::shared_ptr<std::unordered_map<DMatrix*, PredictionCacheEntry>> cache) {
|
||||
return new CPUPredictor(generic_param, cache);
|
||||
.set_body([](GenericParameter const* generic_param) {
|
||||
return new CPUPredictor(generic_param);
|
||||
});
|
||||
} // namespace predictor
|
||||
} // namespace xgboost
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*!
|
||||
* Copyright 2017-2018 by Contributors
|
||||
* Copyright 2017-2020 by Contributors
|
||||
*/
|
||||
#include <thrust/copy.h>
|
||||
#include <thrust/device_ptr.h>
|
||||
@@ -295,9 +295,8 @@ class GPUPredictor : public xgboost::Predictor {
|
||||
}
|
||||
|
||||
public:
|
||||
GPUPredictor(GenericParameter const* generic_param,
|
||||
std::shared_ptr<std::unordered_map<DMatrix*, PredictionCacheEntry>> cache) :
|
||||
Predictor::Predictor{generic_param, cache} {}
|
||||
explicit GPUPredictor(GenericParameter const* generic_param) :
|
||||
Predictor::Predictor{generic_param} {}
|
||||
|
||||
~GPUPredictor() override {
|
||||
if (generic_param_->gpu_id >= 0) {
|
||||
@@ -305,43 +304,53 @@ class GPUPredictor : public xgboost::Predictor {
|
||||
}
|
||||
}
|
||||
|
||||
void PredictBatch(DMatrix* dmat, HostDeviceVector<bst_float>* out_preds,
|
||||
void PredictBatch(DMatrix* dmat, PredictionCacheEntry* predts,
|
||||
const gbm::GBTreeModel& model, int tree_begin,
|
||||
unsigned ntree_limit = 0) override {
|
||||
// This function duplicates the CPU predictor's PredictBatch; see the comments there.
|
||||
// FIXME(trivialfis): Remove the duplication.
|
||||
int device = generic_param_->gpu_id;
|
||||
CHECK_GE(device, 0) << "Set `gpu_id' to positive value for processing GPU data.";
|
||||
ConfigureDevice(device);
|
||||
|
||||
if (this->PredictFromCache(dmat, out_preds, model, ntree_limit)) {
|
||||
return;
|
||||
}
|
||||
this->InitOutPredictions(dmat->Info(), out_preds, model);
|
||||
|
||||
int32_t tree_end = ntree_limit * model.learner_model_param_->num_output_group;
|
||||
|
||||
if (ntree_limit == 0 || ntree_limit > model.trees.size()) {
|
||||
tree_end = static_cast<unsigned>(model.trees.size());
|
||||
CHECK_EQ(tree_begin, 0);
|
||||
auto* out_preds = &predts->predictions;
|
||||
CHECK_GE(predts->version, tree_begin);
|
||||
if (predts->version == 0) {
|
||||
CHECK_EQ(out_preds->Size(), 0);
|
||||
this->InitOutPredictions(dmat->Info(), out_preds, model);
|
||||
}
|
||||
|
||||
DevicePredictInternal(dmat, out_preds, model, tree_begin, tree_end);
|
||||
uint32_t const output_groups = model.learner_model_param_->num_output_group;
|
||||
CHECK_NE(output_groups, 0);
|
||||
|
||||
auto cache_emtry = this->FindCache(dmat);
|
||||
if (cache_emtry == cache_->cend()) { return; }
|
||||
if (cache_emtry->second.predictions.Size() == 0) {
|
||||
// Initialise the cache on first iteration, this comes useful
|
||||
// when performing training continuation:
|
||||
//
|
||||
// 1. PredictBatch
|
||||
// 2. CommitModel
|
||||
// - updater->UpdatePredictionCache
|
||||
//
|
||||
// If we don't initialise this cache, step 2 will receive an invalid cache as
|
||||
// the first step only modifies prediction store in learner without following code.
|
||||
InitOutPredictions(cache_emtry->second.data->Info(),
|
||||
&(cache_emtry->second.predictions), model);
|
||||
CHECK_EQ(cache_emtry->second.predictions.Size(), out_preds->Size());
|
||||
cache_emtry->second.predictions.Copy(*out_preds);
|
||||
uint32_t real_ntree_limit = ntree_limit * output_groups;
|
||||
if (real_ntree_limit == 0 || real_ntree_limit > model.trees.size()) {
|
||||
real_ntree_limit = static_cast<uint32_t>(model.trees.size());
|
||||
}
|
||||
|
||||
uint32_t const end_version = (tree_begin + real_ntree_limit) / output_groups;
|
||||
|
||||
if (predts->version > end_version) {
|
||||
CHECK_NE(ntree_limit, 0);
|
||||
this->InitOutPredictions(dmat->Info(), out_preds, model);
|
||||
predts->version = 0;
|
||||
}
|
||||
uint32_t const beg_version = predts->version;
|
||||
CHECK_LE(beg_version, end_version);
|
||||
|
||||
if (beg_version < end_version) {
|
||||
this->DevicePredictInternal(dmat, out_preds, model,
|
||||
beg_version * output_groups,
|
||||
end_version * output_groups);
|
||||
}
|
||||
|
||||
uint32_t delta = end_version - beg_version;
|
||||
CHECK_LE(delta, model.trees.size());
|
||||
predts->Update(delta);
|
||||
|
||||
CHECK(out_preds->Size() == output_groups * dmat->Info().num_row_ ||
|
||||
out_preds->Size() == dmat->Info().num_row_);
|
||||
}
|
||||
|
||||
protected:
|
||||
@@ -361,49 +370,30 @@ class GPUPredictor : public xgboost::Predictor {
|
||||
}
|
||||
}
|
||||
|
||||
bool PredictFromCache(DMatrix* dmat, HostDeviceVector<bst_float>* out_preds,
|
||||
const gbm::GBTreeModel& model, unsigned ntree_limit) {
|
||||
if (ntree_limit == 0 ||
|
||||
ntree_limit * model.learner_model_param_->num_output_group >= model.trees.size()) {
|
||||
auto it = (*cache_).find(dmat);
|
||||
if (it != cache_->cend()) {
|
||||
const HostDeviceVector<bst_float>& y = it->second.predictions;
|
||||
if (y.Size() != 0) {
|
||||
monitor_.StartCuda("PredictFromCache");
|
||||
out_preds->SetDevice(y.DeviceIdx());
|
||||
out_preds->Resize(y.Size());
|
||||
out_preds->Copy(y);
|
||||
monitor_.StopCuda("PredictFromCache");
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void UpdatePredictionCache(
|
||||
const gbm::GBTreeModel& model,
|
||||
std::vector<std::unique_ptr<TreeUpdater>>* updaters,
|
||||
int num_new_trees) override {
|
||||
int num_new_trees,
|
||||
DMatrix* m,
|
||||
PredictionCacheEntry* predts) override {
|
||||
int device = generic_param_->gpu_id;
|
||||
ConfigureDevice(device);
|
||||
auto old_ntree = model.trees.size() - num_new_trees;
|
||||
// update cache entry
|
||||
for (auto& kv : (*cache_)) {
|
||||
PredictionCacheEntry& e = kv.second;
|
||||
DMatrix* dmat = kv.first;
|
||||
HostDeviceVector<bst_float>& predictions = e.predictions;
|
||||
|
||||
if (predictions.Size() == 0) {
|
||||
this->InitOutPredictions(dmat->Info(), &predictions, model);
|
||||
}
|
||||
|
||||
if (model.learner_model_param_->num_output_group == 1 && updaters->size() > 0 &&
|
||||
num_new_trees == 1 &&
|
||||
updaters->back()->UpdatePredictionCache(e.data.get(), &predictions)) {
|
||||
// do nothing
|
||||
} else {
|
||||
DevicePredictInternal(dmat, &predictions, model, old_ntree, model.trees.size());
|
||||
}
|
||||
auto* out = &predts->predictions;
|
||||
if (predts->predictions.Size() == 0) {
|
||||
InitOutPredictions(m->Info(), out, model);
|
||||
DevicePredictInternal(m, out, model, 0, model.trees.size());
|
||||
} else if (model.learner_model_param_->num_output_group == 1 &&
|
||||
updaters->size() > 0 &&
|
||||
num_new_trees == 1 &&
|
||||
updaters->back()->UpdatePredictionCache(m, out)) {
|
||||
{}
|
||||
} else {
|
||||
DevicePredictInternal(m, out, model, old_ntree, model.trees.size());
|
||||
}
|
||||
auto delta = num_new_trees / model.learner_model_param_->num_output_group;
|
||||
predts->Update(delta);
|
||||
}
|
||||
|
||||
void PredictInstance(const SparsePage::Inst& inst,
|
||||
@@ -442,11 +432,6 @@ class GPUPredictor : public xgboost::Predictor {
|
||||
|
||||
void Configure(const std::vector<std::pair<std::string, std::string>>& cfg) override {
|
||||
Predictor::Configure(cfg);
|
||||
|
||||
int device = generic_param_->gpu_id;
|
||||
if (device >= 0) {
|
||||
ConfigureDevice(device);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -469,9 +454,8 @@ class GPUPredictor : public xgboost::Predictor {
|
||||
|
||||
XGBOOST_REGISTER_PREDICTOR(GPUPredictor, "gpu_predictor")
|
||||
.describe("Make predictions using GPU.")
|
||||
.set_body([](GenericParameter const* generic_param,
|
||||
std::shared_ptr<std::unordered_map<DMatrix*, PredictionCacheEntry>> cache) {
|
||||
return new GPUPredictor(generic_param, cache);
|
||||
.set_body([](GenericParameter const* generic_param) {
|
||||
return new GPUPredictor(generic_param);
|
||||
});
|
||||
|
||||
} // namespace predictor
|
||||
|
||||
@@ -1,24 +1,60 @@
|
||||
/*!
|
||||
* Copyright by Contributors 2017
|
||||
* Copyright 2017-2020 by Contributors
|
||||
*/
|
||||
#include <dmlc/registry.h>
|
||||
#include <xgboost/predictor.h>
|
||||
|
||||
#include "xgboost/data.h"
|
||||
#include "xgboost/generic_parameters.h"
|
||||
|
||||
namespace dmlc {
|
||||
DMLC_REGISTRY_ENABLE(::xgboost::PredictorReg);
|
||||
} // namespace dmlc
|
||||
namespace xgboost {
|
||||
|
||||
void PredictionContainer::ClearExpiredEntries() {
|
||||
std::vector<DMatrix*> expired;
|
||||
for (auto& kv : container_) {
|
||||
if (kv.second.ref.expired()) {
|
||||
expired.emplace_back(kv.first);
|
||||
}
|
||||
}
|
||||
for (auto const& ptr : expired) {
|
||||
container_.erase(ptr);
|
||||
}
|
||||
}
|
||||
|
||||
/*!
 * \brief Register a DMatrix in the container and return its cache entry.
 *
 * Expired entries are purged first. The entry is keyed by the raw address of
 * `m` and its `ref` member is (re)assigned from `m` on every call.
 *
 * \param m      DMatrix whose predictions should be cached.
 * \param device Device ordinal. Any value other than GenericParameter::kCpuId
 *               is forwarded to the entry's prediction vector via SetDevice.
 * \return Reference to the entry stored inside the container.
 */
PredictionCacheEntry &PredictionContainer::Cache(std::shared_ptr<DMatrix> m, int32_t device) {
  this->ClearExpiredEntries();

  // Look the entry up once (operator[] creates it when the key is new) and
  // work through the reference afterwards.
  PredictionCacheEntry &entry = container_[m.get()];
  entry.ref = m;

  bool const on_cpu = (device == GenericParameter::kCpuId);
  if (!on_cpu) {
    entry.predictions.SetDevice(device);
  }
  return entry;
}
|
||||
|
||||
/*!
 * \brief Fetch the cache entry of an already registered DMatrix.
 *
 * Both conditions are enforced with CHECK: the matrix must be present in the
 * container, and the reference stored in its entry must still be lockable.
 *
 * \param m Raw pointer used as the lookup key.
 * \return Reference to the matching entry.
 */
PredictionCacheEntry &PredictionContainer::Entry(DMatrix *m) {
  auto const found = container_.find(m);
  CHECK(found != container_.cend());
  CHECK(found->second.ref.lock())
      << "[Internal error]: DMatrix: " << m << " has expired.";
  return found->second;
}
|
||||
|
||||
decltype(PredictionContainer::container_) const& PredictionContainer::Container() {
|
||||
this->ClearExpiredEntries();
|
||||
return container_;
|
||||
}
|
||||
|
||||
/*!
 * \brief Configuration hook of the base predictor.
 *
 * The base implementation has an empty body and does not read `cfg`.
 *
 * \param cfg Key/value configuration pairs.
 */
void Predictor::Configure(std::vector<std::pair<std::string, std::string>> const& cfg) {}
|
||||
Predictor* Predictor::Create(
|
||||
std::string const& name, GenericParameter const* generic_param,
|
||||
std::shared_ptr<std::unordered_map<DMatrix*, PredictionCacheEntry>> cache) {
|
||||
std::string const& name, GenericParameter const* generic_param) {
|
||||
auto* e = ::dmlc::Registry<PredictorReg>::Get()->Find(name);
|
||||
if (e == nullptr) {
|
||||
LOG(FATAL) << "Unknown predictor type " << name;
|
||||
}
|
||||
auto p_predictor = (e->body)(generic_param, cache);
|
||||
auto p_predictor = (e->body)(generic_param);
|
||||
return p_predictor;
|
||||
}
|
||||
} // namespace xgboost
|
||||
|
||||
Reference in New Issue
Block a user