From d268a2a4631077c21562b606680c94a743ab7b18 Mon Sep 17 00:00:00 2001 From: boxdot Date: Thu, 30 Jul 2020 06:33:50 +0200 Subject: [PATCH] Thread-safe prediction by making the prediction cache thread-local. (#5853) Co-authored-by: Jiaming Yuan --- include/xgboost/predictor.h | 1 - src/learner.cc | 39 ++++++++++++++++++++-------- src/predictor/predictor.cc | 1 - src/tree/updater_quantile_hist.cc | 1 - tests/cpp/test_learner.cc | 43 +++++++++++++++++++++++++++++++ 5 files changed, 71 insertions(+), 14 deletions(-) diff --git a/include/xgboost/predictor.h b/include/xgboost/predictor.h index 0e3f7568c..0396ee262 100644 --- a/include/xgboost/predictor.h +++ b/include/xgboost/predictor.h @@ -55,7 +55,6 @@ struct PredictionCacheEntry { class PredictionContainer { std::unordered_map container_; void ClearExpiredEntries(); - std::mutex cache_lock_; public: PredictionContainer() = default; diff --git a/src/learner.cc b/src/learner.cc index f7e59a735..9aceb70a9 100644 --- a/src/learner.cc +++ b/src/learner.cc @@ -221,13 +221,13 @@ void GenericParameter::ConfigureGpuId(bool require_gpu) { using LearnerAPIThreadLocalStore = dmlc::ThreadLocalStore>; +using ThreadLocalPredictionCache = + dmlc::ThreadLocalStore>; + class LearnerConfiguration : public Learner { protected: static std::string const kEvalMetric; // NOLINT - protected: - PredictionContainer cache_; - protected: std::atomic need_configuration_; std::map cfg_; @@ -244,12 +244,19 @@ class LearnerConfiguration : public Learner { explicit LearnerConfiguration(std::vector > cache) : need_configuration_{true} { monitor_.Init("Learner"); + auto& local_cache = (*ThreadLocalPredictionCache::Get())[this]; for (std::shared_ptr const& d : cache) { - cache_.Cache(d, GenericParameter::kCpuId); + local_cache.Cache(d, GenericParameter::kCpuId); + } + } + ~LearnerConfiguration() override { + auto local_cache = ThreadLocalPredictionCache::Get(); + if (local_cache->find(this) != local_cache->cend()) { + local_cache->erase(this); } } - // 
Configuration before data is known. + // Configuration before data is known. void Configure() override { // Varient of double checked lock if (!this->need_configuration_) { return; } @@ -316,6 +323,10 @@ class LearnerConfiguration : public Learner { monitor_.Stop("Configure"); } + virtual PredictionContainer* GetPredictionCache() const { + return &((*ThreadLocalPredictionCache::Get())[this]); + } + void LoadConfig(Json const& in) override { CHECK(IsA(in)); Version::Load(in, true); @@ -511,7 +522,8 @@ class LearnerConfiguration : public Learner { if (mparam_.num_feature == 0) { // TODO(hcho3): Change num_feature to 64-bit integer unsigned num_feature = 0; - for (auto& matrix : cache_.Container()) { + auto local_cache = this->GetPredictionCache(); + for (auto& matrix : local_cache->Container()) { CHECK(matrix.first); CHECK(!matrix.second.ref.expired()); const uint64_t num_col = matrix.first->Info().num_col_; @@ -948,7 +960,8 @@ class LearnerImpl : public LearnerIO { this->CheckDataSplitMode(); this->ValidateDMatrix(train.get(), true); - auto& predt = this->cache_.Cache(train, generic_parameters_.gpu_id); + auto local_cache = this->GetPredictionCache(); + auto& predt = local_cache->Cache(train, generic_parameters_.gpu_id); monitor_.Start("PredictRaw"); this->PredictRaw(train.get(), &predt, true); @@ -973,9 +986,10 @@ class LearnerImpl : public LearnerIO { } this->CheckDataSplitMode(); this->ValidateDMatrix(train.get(), true); - this->cache_.Cache(train, generic_parameters_.gpu_id); + auto local_cache = this->GetPredictionCache(); + local_cache->Cache(train, generic_parameters_.gpu_id); - gbm_->DoBoost(train.get(), in_gpair, &cache_.Entry(train.get())); + gbm_->DoBoost(train.get(), in_gpair, &local_cache->Entry(train.get())); monitor_.Stop("BoostOneIter"); } @@ -991,9 +1005,11 @@ class LearnerImpl : public LearnerIO { metrics_.emplace_back(Metric::Create(obj_->DefaultEvalMetric(), &generic_parameters_)); metrics_.back()->Configure({cfg_.begin(), cfg_.end()}); } + + 
auto local_cache = this->GetPredictionCache(); for (size_t i = 0; i < data_sets.size(); ++i) { std::shared_ptr m = data_sets[i]; - auto &predt = this->cache_.Cache(m, generic_parameters_.gpu_id); + auto &predt = local_cache->Cache(m, generic_parameters_.gpu_id); this->ValidateDMatrix(m.get(), false); this->PredictRaw(m.get(), &predt, false); @@ -1030,7 +1046,8 @@ class LearnerImpl : public LearnerIO { } else if (pred_leaf) { gbm_->PredictLeaf(data.get(), &out_preds->HostVector(), ntree_limit); } else { - auto& prediction = cache_.Cache(data, generic_parameters_.gpu_id); + auto local_cache = this->GetPredictionCache(); + auto& prediction = local_cache->Cache(data, generic_parameters_.gpu_id); this->PredictRaw(data.get(), &prediction, training, ntree_limit); // Copy the prediction cache to output prediction. out_preds comes from C API out_preds->SetDevice(generic_parameters_.gpu_id); diff --git a/src/predictor/predictor.cc b/src/predictor/predictor.cc index eb68d382b..23e7301af 100644 --- a/src/predictor/predictor.cc +++ b/src/predictor/predictor.cc @@ -26,7 +26,6 @@ void PredictionContainer::ClearExpiredEntries() { } PredictionCacheEntry &PredictionContainer::Cache(std::shared_ptr m, int32_t device) { - std::lock_guard guard { cache_lock_ }; this->ClearExpiredEntries(); container_[m.get()].ref = m; if (device != GenericParameter::kCpuId) { diff --git a/src/tree/updater_quantile_hist.cc b/src/tree/updater_quantile_hist.cc index cee3d1e0d..37a90dfeb 100644 --- a/src/tree/updater_quantile_hist.cc +++ b/src/tree/updater_quantile_hist.cc @@ -1384,6 +1384,5 @@ XGBOOST_REGISTER_TREE_UPDATER(QuantileHistMaker, "grow_quantile_histmaker") []() { return new QuantileHistMaker(); }); - } // namespace tree } // namespace xgboost diff --git a/tests/cpp/test_learner.cc b/tests/cpp/test_learner.cc index a69fce19d..7d473f00c 100644 --- a/tests/cpp/test_learner.cc +++ b/tests/cpp/test_learner.cc @@ -3,6 +3,7 @@ */ #include #include +#include #include "helpers.h" #include @@ -176,6 
+177,48 @@ TEST(Learner, JsonModelIO) { } } +// Crashes the test runner if there are race conditions. +// +// Build with additional cmake flags to enable thread sanitizer +// which definitely catches problems. Note that OpenMP needs to be +// disabled, otherwise thread sanitizer will also report false +// positives. +// +// ``` +// -DUSE_SANITIZER=ON -DENABLED_SANITIZERS=thread -DUSE_OPENMP=OFF +// ``` +TEST(Learner, MultiThreadedPredict) { + size_t constexpr kRows = 1000; + size_t constexpr kCols = 1000; + + std::shared_ptr p_dmat{ + RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix()}; + p_dmat->Info().labels_.Resize(kRows); + CHECK_NE(p_dmat->Info().num_col_, 0); + + std::shared_ptr p_data{ + RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix()}; + CHECK_NE(p_data->Info().num_col_, 0); + + std::shared_ptr learner{Learner::Create({p_dmat})}; + learner->Configure(); + + std::vector threads; + for (uint32_t thread_id = 0; + thread_id < 2 * std::thread::hardware_concurrency(); ++thread_id) { + threads.emplace_back([learner, p_data] { + size_t constexpr kIters = 10; + auto &entry = learner->GetThreadLocal().prediction_entry; + for (size_t iter = 0; iter < kIters; ++iter) { + learner->Predict(p_data, false, &entry.predictions); + } + }); + } + for (auto &thread : threads) { + thread.join(); + } +} + TEST(Learner, BinaryModelIO) { size_t constexpr kRows = 8; int32_t constexpr kIters = 4;