Thread-safe prediction by making the prediction cache thread-local. (#5853)
Co-authored-by: Jiaming Yuan <jm.yuan@outlook.com>
This commit is contained in:
parent
fa3715f584
commit
d268a2a463
@ -55,7 +55,6 @@ struct PredictionCacheEntry {
|
|||||||
class PredictionContainer {
|
class PredictionContainer {
|
||||||
std::unordered_map<DMatrix *, PredictionCacheEntry> container_;
|
std::unordered_map<DMatrix *, PredictionCacheEntry> container_;
|
||||||
void ClearExpiredEntries();
|
void ClearExpiredEntries();
|
||||||
std::mutex cache_lock_;
|
|
||||||
|
|
||||||
public:
|
public:
|
||||||
PredictionContainer() = default;
|
PredictionContainer() = default;
|
||||||
|
|||||||
@ -221,13 +221,13 @@ void GenericParameter::ConfigureGpuId(bool require_gpu) {
|
|||||||
using LearnerAPIThreadLocalStore =
|
using LearnerAPIThreadLocalStore =
|
||||||
dmlc::ThreadLocalStore<std::map<Learner const *, XGBAPIThreadLocalEntry>>;
|
dmlc::ThreadLocalStore<std::map<Learner const *, XGBAPIThreadLocalEntry>>;
|
||||||
|
|
||||||
|
using ThreadLocalPredictionCache =
|
||||||
|
dmlc::ThreadLocalStore<std::map<Learner const *, PredictionContainer>>;
|
||||||
|
|
||||||
class LearnerConfiguration : public Learner {
|
class LearnerConfiguration : public Learner {
|
||||||
protected:
|
protected:
|
||||||
static std::string const kEvalMetric; // NOLINT
|
static std::string const kEvalMetric; // NOLINT
|
||||||
|
|
||||||
protected:
|
|
||||||
PredictionContainer cache_;
|
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
std::atomic<bool> need_configuration_;
|
std::atomic<bool> need_configuration_;
|
||||||
std::map<std::string, std::string> cfg_;
|
std::map<std::string, std::string> cfg_;
|
||||||
@ -244,12 +244,19 @@ class LearnerConfiguration : public Learner {
|
|||||||
explicit LearnerConfiguration(std::vector<std::shared_ptr<DMatrix> > cache)
|
explicit LearnerConfiguration(std::vector<std::shared_ptr<DMatrix> > cache)
|
||||||
: need_configuration_{true} {
|
: need_configuration_{true} {
|
||||||
monitor_.Init("Learner");
|
monitor_.Init("Learner");
|
||||||
|
auto& local_cache = (*ThreadLocalPredictionCache::Get())[this];
|
||||||
for (std::shared_ptr<DMatrix> const& d : cache) {
|
for (std::shared_ptr<DMatrix> const& d : cache) {
|
||||||
cache_.Cache(d, GenericParameter::kCpuId);
|
local_cache.Cache(d, GenericParameter::kCpuId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
~LearnerConfiguration() override {
|
||||||
|
auto local_cache = ThreadLocalPredictionCache::Get();
|
||||||
|
if (local_cache->find(this) != local_cache->cend()) {
|
||||||
|
local_cache->erase(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Configuration before data is known.
|
|
||||||
|
|
||||||
|
// Configuration before data is known.
|
||||||
void Configure() override {
|
void Configure() override {
|
||||||
// Varient of double checked lock
|
// Varient of double checked lock
|
||||||
if (!this->need_configuration_) { return; }
|
if (!this->need_configuration_) { return; }
|
||||||
@ -316,6 +323,10 @@ class LearnerConfiguration : public Learner {
|
|||||||
monitor_.Stop("Configure");
|
monitor_.Stop("Configure");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual PredictionContainer* GetPredictionCache() const {
|
||||||
|
return &((*ThreadLocalPredictionCache::Get())[this]);
|
||||||
|
}
|
||||||
|
|
||||||
void LoadConfig(Json const& in) override {
|
void LoadConfig(Json const& in) override {
|
||||||
CHECK(IsA<Object>(in));
|
CHECK(IsA<Object>(in));
|
||||||
Version::Load(in, true);
|
Version::Load(in, true);
|
||||||
@ -511,7 +522,8 @@ class LearnerConfiguration : public Learner {
|
|||||||
if (mparam_.num_feature == 0) {
|
if (mparam_.num_feature == 0) {
|
||||||
// TODO(hcho3): Change num_feature to 64-bit integer
|
// TODO(hcho3): Change num_feature to 64-bit integer
|
||||||
unsigned num_feature = 0;
|
unsigned num_feature = 0;
|
||||||
for (auto& matrix : cache_.Container()) {
|
auto local_cache = this->GetPredictionCache();
|
||||||
|
for (auto& matrix : local_cache->Container()) {
|
||||||
CHECK(matrix.first);
|
CHECK(matrix.first);
|
||||||
CHECK(!matrix.second.ref.expired());
|
CHECK(!matrix.second.ref.expired());
|
||||||
const uint64_t num_col = matrix.first->Info().num_col_;
|
const uint64_t num_col = matrix.first->Info().num_col_;
|
||||||
@ -948,7 +960,8 @@ class LearnerImpl : public LearnerIO {
|
|||||||
this->CheckDataSplitMode();
|
this->CheckDataSplitMode();
|
||||||
this->ValidateDMatrix(train.get(), true);
|
this->ValidateDMatrix(train.get(), true);
|
||||||
|
|
||||||
auto& predt = this->cache_.Cache(train, generic_parameters_.gpu_id);
|
auto local_cache = this->GetPredictionCache();
|
||||||
|
auto& predt = local_cache->Cache(train, generic_parameters_.gpu_id);
|
||||||
|
|
||||||
monitor_.Start("PredictRaw");
|
monitor_.Start("PredictRaw");
|
||||||
this->PredictRaw(train.get(), &predt, true);
|
this->PredictRaw(train.get(), &predt, true);
|
||||||
@ -973,9 +986,10 @@ class LearnerImpl : public LearnerIO {
|
|||||||
}
|
}
|
||||||
this->CheckDataSplitMode();
|
this->CheckDataSplitMode();
|
||||||
this->ValidateDMatrix(train.get(), true);
|
this->ValidateDMatrix(train.get(), true);
|
||||||
this->cache_.Cache(train, generic_parameters_.gpu_id);
|
auto local_cache = this->GetPredictionCache();
|
||||||
|
local_cache->Cache(train, generic_parameters_.gpu_id);
|
||||||
|
|
||||||
gbm_->DoBoost(train.get(), in_gpair, &cache_.Entry(train.get()));
|
gbm_->DoBoost(train.get(), in_gpair, &local_cache->Entry(train.get()));
|
||||||
monitor_.Stop("BoostOneIter");
|
monitor_.Stop("BoostOneIter");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -991,9 +1005,11 @@ class LearnerImpl : public LearnerIO {
|
|||||||
metrics_.emplace_back(Metric::Create(obj_->DefaultEvalMetric(), &generic_parameters_));
|
metrics_.emplace_back(Metric::Create(obj_->DefaultEvalMetric(), &generic_parameters_));
|
||||||
metrics_.back()->Configure({cfg_.begin(), cfg_.end()});
|
metrics_.back()->Configure({cfg_.begin(), cfg_.end()});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto local_cache = this->GetPredictionCache();
|
||||||
for (size_t i = 0; i < data_sets.size(); ++i) {
|
for (size_t i = 0; i < data_sets.size(); ++i) {
|
||||||
std::shared_ptr<DMatrix> m = data_sets[i];
|
std::shared_ptr<DMatrix> m = data_sets[i];
|
||||||
auto &predt = this->cache_.Cache(m, generic_parameters_.gpu_id);
|
auto &predt = local_cache->Cache(m, generic_parameters_.gpu_id);
|
||||||
this->ValidateDMatrix(m.get(), false);
|
this->ValidateDMatrix(m.get(), false);
|
||||||
this->PredictRaw(m.get(), &predt, false);
|
this->PredictRaw(m.get(), &predt, false);
|
||||||
|
|
||||||
@ -1030,7 +1046,8 @@ class LearnerImpl : public LearnerIO {
|
|||||||
} else if (pred_leaf) {
|
} else if (pred_leaf) {
|
||||||
gbm_->PredictLeaf(data.get(), &out_preds->HostVector(), ntree_limit);
|
gbm_->PredictLeaf(data.get(), &out_preds->HostVector(), ntree_limit);
|
||||||
} else {
|
} else {
|
||||||
auto& prediction = cache_.Cache(data, generic_parameters_.gpu_id);
|
auto local_cache = this->GetPredictionCache();
|
||||||
|
auto& prediction = local_cache->Cache(data, generic_parameters_.gpu_id);
|
||||||
this->PredictRaw(data.get(), &prediction, training, ntree_limit);
|
this->PredictRaw(data.get(), &prediction, training, ntree_limit);
|
||||||
// Copy the prediction cache to output prediction. out_preds comes from C API
|
// Copy the prediction cache to output prediction. out_preds comes from C API
|
||||||
out_preds->SetDevice(generic_parameters_.gpu_id);
|
out_preds->SetDevice(generic_parameters_.gpu_id);
|
||||||
|
|||||||
@ -26,7 +26,6 @@ void PredictionContainer::ClearExpiredEntries() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
PredictionCacheEntry &PredictionContainer::Cache(std::shared_ptr<DMatrix> m, int32_t device) {
|
PredictionCacheEntry &PredictionContainer::Cache(std::shared_ptr<DMatrix> m, int32_t device) {
|
||||||
std::lock_guard<std::mutex> guard { cache_lock_ };
|
|
||||||
this->ClearExpiredEntries();
|
this->ClearExpiredEntries();
|
||||||
container_[m.get()].ref = m;
|
container_[m.get()].ref = m;
|
||||||
if (device != GenericParameter::kCpuId) {
|
if (device != GenericParameter::kCpuId) {
|
||||||
|
|||||||
@ -1384,6 +1384,5 @@ XGBOOST_REGISTER_TREE_UPDATER(QuantileHistMaker, "grow_quantile_histmaker")
|
|||||||
[]() {
|
[]() {
|
||||||
return new QuantileHistMaker();
|
return new QuantileHistMaker();
|
||||||
});
|
});
|
||||||
|
|
||||||
} // namespace tree
|
} // namespace tree
|
||||||
} // namespace xgboost
|
} // namespace xgboost
|
||||||
|
|||||||
@ -3,6 +3,7 @@
|
|||||||
*/
|
*/
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
#include <thread>
|
||||||
#include "helpers.h"
|
#include "helpers.h"
|
||||||
#include <dmlc/filesystem.h>
|
#include <dmlc/filesystem.h>
|
||||||
|
|
||||||
@ -176,6 +177,48 @@ TEST(Learner, JsonModelIO) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Crashes the test runner if there are race condiditions.
|
||||||
|
//
|
||||||
|
// Build with additional cmake flags to enable thread sanitizer
|
||||||
|
// which definitely catches problems. Note that OpenMP needs to be
|
||||||
|
// disabled, otherwise thread sanitizer will also report false
|
||||||
|
// positives.
|
||||||
|
//
|
||||||
|
// ```
|
||||||
|
// -DUSE_SANITIZER=ON -DENABLED_SANITIZERS=thread -DUSE_OPENMP=OFF
|
||||||
|
// ```
|
||||||
|
TEST(Learner, MultiThreadedPredict) {
|
||||||
|
size_t constexpr kRows = 1000;
|
||||||
|
size_t constexpr kCols = 1000;
|
||||||
|
|
||||||
|
std::shared_ptr<DMatrix> p_dmat{
|
||||||
|
RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix()};
|
||||||
|
p_dmat->Info().labels_.Resize(kRows);
|
||||||
|
CHECK_NE(p_dmat->Info().num_col_, 0);
|
||||||
|
|
||||||
|
std::shared_ptr<DMatrix> p_data{
|
||||||
|
RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix()};
|
||||||
|
CHECK_NE(p_data->Info().num_col_, 0);
|
||||||
|
|
||||||
|
std::shared_ptr<Learner> learner{Learner::Create({p_dmat})};
|
||||||
|
learner->Configure();
|
||||||
|
|
||||||
|
std::vector<std::thread> threads;
|
||||||
|
for (uint32_t thread_id = 0;
|
||||||
|
thread_id < 2 * std::thread::hardware_concurrency(); ++thread_id) {
|
||||||
|
threads.emplace_back([learner, p_data] {
|
||||||
|
size_t constexpr kIters = 10;
|
||||||
|
auto &entry = learner->GetThreadLocal().prediction_entry;
|
||||||
|
for (size_t iter = 0; iter < kIters; ++iter) {
|
||||||
|
learner->Predict(p_data, false, &entry.predictions);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
for (auto &thread : threads) {
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST(Learner, BinaryModelIO) {
|
TEST(Learner, BinaryModelIO) {
|
||||||
size_t constexpr kRows = 8;
|
size_t constexpr kRows = 8;
|
||||||
int32_t constexpr kIters = 4;
|
int32_t constexpr kIters = 4;
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user