From fb1a9e6bc58c787a75dc9eaf8efebe2efa7a16d2 Mon Sep 17 00:00:00 2001
From: Jiaming Yuan <jm.yuan@outlook.com>
Date: Sun, 17 Oct 2021 15:55:49 +0800
Subject: [PATCH] Avoid omp reduction in coordinate descent and aft metrics.
 (#7316)

Aside from the omp issue, parameter configuration for the aft metric is
simplified.
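
Both call sites now follow the same pattern: each thread accumulates into
its own slot of a pre-sized buffer, and the partial sums are combined
sequentially with std::accumulate afterwards. A minimal standalone sketch
of the pattern (Sum and n_threads are illustrative names, not code from
this patch):

    #include <omp.h>

    #include <cstdint>
    #include <cstdio>
    #include <numeric>
    #include <vector>

    // Each thread writes only to its own slot; no reduction clause.
    double Sum(std::vector<double> const &values, int n_threads) {
      std::vector<double> partial(n_threads, 0.0);
    #pragma omp parallel for schedule(static) num_threads(n_threads)
      for (std::int64_t i = 0; i < static_cast<std::int64_t>(values.size()); ++i) {
        partial[omp_get_thread_num()] += values[i];
      }
      // Fixed-order, sequential combine of the partial sums.
      return std::accumulate(partial.cbegin(), partial.cend(), 0.0);
    }

    int main() {
      std::vector<double> v(1 << 20, 0.1);
      std::printf("%.17g\n", Sum(v, 4));  // identical output on every run
    }

Because the sequential combine always adds the partials in the same order,
the result is reproducible for a fixed thread count, whereas OpenMP leaves
the combine order of a reduction clause unspecified. common::ParallelFor
also propagates exceptions from worker threads internally, which is why the
dmlc::OMPException boilerplate disappears from the call sites. The
per-thread slots share cache lines (false sharing), a cost these cheap
loops can presumably absorb in exchange for deterministic output.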
---
 src/linear/coordinate_common.h           |  37 ++++----
 src/linear/updater_coordinate.cc         |   4 +-
 src/metric/survival_metric.cu            | 102 +++++++++++------
 tests/cpp/metric/test_survival_metric.cu |  39 +++++++++
 4 files changed, 112 insertions(+), 70 deletions(-)

diff --git a/src/linear/coordinate_common.h b/src/linear/coordinate_common.h
index e301d062c..d01bce826 100644
--- a/src/linear/coordinate_common.h
+++ b/src/linear/coordinate_common.h
@@ -108,27 +108,32 @@ inline std::pair<double, double> GetGradient(int group_idx, int num_group, int f
  *
  * \return The gradient and diagonal Hessian entry for a given feature.
  */
-inline std::pair<double, double> GetGradientParallel(int group_idx, int num_group, int fidx,
-                                                     const std::vector<GradientPair> &gpair,
-                                                     DMatrix *p_fmat) {
-  double sum_grad = 0.0, sum_hess = 0.0;
+inline std::pair<double, double>
+GetGradientParallel(GenericParameter const *ctx, int group_idx, int num_group,
+                    int fidx, const std::vector<GradientPair> &gpair,
+                    DMatrix *p_fmat) {
+  std::vector<double> sum_grad_tloc(ctx->Threads(), 0.0);
+  std::vector<double> sum_hess_tloc(ctx->Threads(), 0.0);
+
   for (const auto &batch : p_fmat->GetBatches<CSCPage>()) {
     auto page = batch.GetView();
     auto col = page[fidx];
     const auto ndata = static_cast<bst_omp_uint>(col.size());
-    dmlc::OMPException exc;
-#pragma omp parallel for schedule(static) reduction(+ : sum_grad, sum_hess)
-    for (bst_omp_uint j = 0; j < ndata; ++j) {
-      exc.Run([&]() {
-        const bst_float v = col[j].fvalue;
-        auto &p = gpair[col[j].index * num_group + group_idx];
-        if (p.GetHess() < 0.0f) return;
-        sum_grad += p.GetGrad() * v;
-        sum_hess += p.GetHess() * v * v;
-      });
-    }
-    exc.Rethrow();
+    common::ParallelFor(ndata, ctx->Threads(), [&](size_t j) {
+      const bst_float v = col[j].fvalue;
+      auto &p = gpair[col[j].index * num_group + group_idx];
+      if (p.GetHess() < 0.0f) {
+        return;
+      }
+      auto t_idx = omp_get_thread_num();
+      sum_grad_tloc[t_idx] += p.GetGrad() * v;
+      sum_hess_tloc[t_idx] += p.GetHess() * v * v;
+    });
   }
+  double sum_grad =
+      std::accumulate(sum_grad_tloc.cbegin(), sum_grad_tloc.cend(), 0.0);
+  double sum_hess =
+      std::accumulate(sum_hess_tloc.cbegin(), sum_hess_tloc.cend(), 0.0);
   return std::make_pair(sum_grad, sum_hess);
 }
 
diff --git a/src/linear/updater_coordinate.cc b/src/linear/updater_coordinate.cc
index 23f227a54..ae070f5d5 100644
--- a/src/linear/updater_coordinate.cc
+++ b/src/linear/updater_coordinate.cc
@@ -80,8 +80,8 @@ class CoordinateUpdater : public LinearUpdater {
                      DMatrix *p_fmat, gbm::GBLinearModel *model) {
     const int ngroup = model->learner_model_param->num_output_group;
     bst_float &w = (*model)[fidx][group_idx];
-    auto gradient =
-        GetGradientParallel(group_idx, ngroup, fidx, *in_gpair, p_fmat);
+    auto gradient = GetGradientParallel(learner_param_, group_idx, ngroup, fidx,
+                                        *in_gpair, p_fmat);
     auto dw = static_cast<float>(
         tparam_.learning_rate *
         CoordinateDelta(gradient.first, gradient.second, w, tparam_.reg_alpha_denorm,
diff --git a/src/metric/survival_metric.cu b/src/metric/survival_metric.cu
index c53344c5e..d0263456d 100644
--- a/src/metric/survival_metric.cu
+++ b/src/metric/survival_metric.cu
@@ -18,6 +18,7 @@
 #include "metric_common.h"
 #include "../common/math.h"
 #include "../common/survival_util.h"
+#include "../common/threading_utils.h"
 
 #if defined(XGBOOST_USE_CUDA)
 #include <thrust/execution_policy.h>  // thrust::cuda::par
@@ -42,11 +43,12 @@ class ElementWiseSurvivalMetricsReduction {
     policy_ = policy;
   }
 
-  PackedReduceResult CpuReduceMetrics(
-      const HostDeviceVector<bst_float>& weights,
-      const HostDeviceVector<bst_float>& labels_lower_bound,
-      const HostDeviceVector<bst_float>& labels_upper_bound,
-      const HostDeviceVector<bst_float>& preds) const {
+  PackedReduceResult
+  CpuReduceMetrics(const HostDeviceVector<bst_float> &weights,
+                   const HostDeviceVector<bst_float> &labels_lower_bound,
+                   const HostDeviceVector<bst_float> &labels_upper_bound,
+                   const HostDeviceVector<bst_float> &preds,
+                   int32_t n_threads) const {
     size_t ndata = labels_lower_bound.Size();
     CHECK_EQ(ndata, labels_upper_bound.Size());
 
@@ -55,22 +57,24 @@ class ElementWiseSurvivalMetricsReduction {
     const auto& h_weights = weights.HostVector();
     const auto& h_preds = preds.HostVector();
 
-    double residue_sum = 0;
-    double weights_sum = 0;
+    std::vector<double> score_tloc(n_threads, 0.0);
+    std::vector<double> weight_tloc(n_threads, 0.0);
+
+    common::ParallelFor(ndata, n_threads, [&](size_t i) {
+      const double wt =
+          h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
+      auto t_idx = omp_get_thread_num();
+      score_tloc[t_idx] +=
+          policy_.EvalRow(static_cast<double>(h_labels_lower_bound[i]),
+                          static_cast<double>(h_labels_upper_bound[i]),
+                          static_cast<double>(h_preds[i])) *
+          wt;
+      weight_tloc[t_idx] += wt;
+    });
+
+    double residue_sum = std::accumulate(score_tloc.cbegin(), score_tloc.cend(), 0.0);
+    double weights_sum = std::accumulate(weight_tloc.cbegin(), weight_tloc.cend(), 0.0);
 
-    dmlc::OMPException exc;
-#pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static)
-    for (omp_ulong i = 0; i < ndata; ++i) {
-      exc.Run([&]() {
-        const double wt = h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
-        residue_sum += policy_.EvalRow(
-            static_cast<double>(h_labels_lower_bound[i]),
-            static_cast<double>(h_labels_upper_bound[i]),
-            static_cast<double>(h_preds[i])) * wt;
-        weights_sum += wt;
-      });
-    }
-    exc.Rethrow();
     PackedReduceResult res{residue_sum, weights_sum};
     return res;
   }
@@ -119,25 +123,25 @@ class ElementWiseSurvivalMetricsReduction {
 #endif  // XGBOOST_USE_CUDA
 
   PackedReduceResult Reduce(
-      int device,
+      const GenericParameter &ctx,
       const HostDeviceVector<bst_float>& weights,
       const HostDeviceVector<bst_float>& labels_lower_bound,
      const HostDeviceVector<bst_float>& labels_upper_bound,
       const HostDeviceVector<bst_float>& preds) {
     PackedReduceResult result;
-    if (device < 0) {
-      result = CpuReduceMetrics(weights, labels_lower_bound, labels_upper_bound, preds);
+    if (ctx.gpu_id < 0) {
+      result = CpuReduceMetrics(weights, labels_lower_bound, labels_upper_bound,
+                                preds, ctx.Threads());
     }
 #if defined(XGBOOST_USE_CUDA)
     else {  // NOLINT
-      device_ = device;
-      preds.SetDevice(device_);
-      labels_lower_bound.SetDevice(device_);
-      labels_upper_bound.SetDevice(device_);
-      weights.SetDevice(device_);
+      preds.SetDevice(ctx.gpu_id);
+      labels_lower_bound.SetDevice(ctx.gpu_id);
+      labels_upper_bound.SetDevice(ctx.gpu_id);
+      weights.SetDevice(ctx.gpu_id);
 
-      dh::safe_cuda(cudaSetDevice(device_));
+      dh::safe_cuda(cudaSetDevice(ctx.gpu_id));
       result = DeviceReduceMetrics(weights, labels_lower_bound, labels_upper_bound, preds);
     }
 #endif  // defined(XGBOOST_USE_CUDA)
     return result;
@@ -146,9 +150,6 @@ class ElementWiseSurvivalMetricsReduction {
 
  private:
   EvalRow policy_;
-#if defined(XGBOOST_USE_CUDA)
-  int device_{-1};
-#endif  // defined(XGBOOST_USE_CUDA)
 };
 
 struct EvalIntervalRegressionAccuracy {
@@ -193,18 +194,16 @@ struct EvalAFTNLogLik {
   AFTParam param_;
 };
 
-template <typename Distribution>
-struct EvalEWiseSurvivalBase : public Metric {
+template <typename Distribution> struct EvalEWiseSurvivalBase : public Metric {
+  explicit EvalEWiseSurvivalBase(GenericParameter const *ctx) {
+    tparam_ = ctx;
+  }
+  EvalEWiseSurvivalBase() = default;
 
   void Configure(const Args& args) override {
     policy_.Configure(args);
-    for (const auto& e : args) {
-      if (e.first == "gpu_id") {
-        device_ = dmlc::ParseSignedInt(e.second.c_str(), nullptr, 10);
-      }
-    }
     reducer_.Configure(policy_);
+    CHECK(tparam_);
   }
 
   bst_float Eval(const HostDeviceVector<bst_float>& preds,
@@ -212,9 +211,10 @@ struct EvalEWiseSurvivalBase : public Metric {
                  bool distributed) override {
     CHECK_EQ(preds.Size(), info.labels_lower_bound_.Size());
     CHECK_EQ(preds.Size(), info.labels_upper_bound_.Size());
-
-    auto result = reducer_.Reduce(
-        device_, info.weights_, info.labels_lower_bound_, info.labels_upper_bound_, preds);
+    CHECK(tparam_);
+    auto result =
+        reducer_.Reduce(*tparam_, info.weights_, info.labels_lower_bound_,
+                        info.labels_upper_bound_, preds);
 
     double dat[2] {result.Residue(), result.Weights()};
 
@@ -252,24 +252,22 @@ struct AFTNLogLikDispatcher : public Metric {
     param_.UpdateAllowUnknown(args);
     switch (param_.aft_loss_distribution) {
     case common::ProbabilityDistributionType::kNormal:
-      metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::NormalDistribution>>());
+      metric_.reset(
+          new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::NormalDistribution>>(
+              tparam_));
       break;
     case common::ProbabilityDistributionType::kLogistic:
-      metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::LogisticDistribution>>());
+      metric_.reset(new EvalEWiseSurvivalBase<
+                    EvalAFTNLogLik<common::LogisticDistribution>>(tparam_));
       break;
     case common::ProbabilityDistributionType::kExtreme:
-      metric_.reset(new EvalEWiseSurvivalBase<EvalAFTNLogLik<common::ExtremeDistribution>>());
+      metric_.reset(new EvalEWiseSurvivalBase<
+                    EvalAFTNLogLik<common::ExtremeDistribution>>(tparam_));
      break;
     default:
       LOG(FATAL) << "Unknown probability distribution";
     }
-    Args new_args{args};
-    // tparam_ doesn't get propagated to the inner metric object because we didn't use
-    // Metric::Create(). I don't think it's a good idea to pollute the metric registry with
-    // specialized versions of the AFT metric, so as a work-around, manually pass the GPU ID
-    // into the inner metric via configuration.
-    new_args.emplace_back("gpu_id", std::to_string(tparam_->gpu_id));
-    metric_->Configure(new_args);
+    metric_->Configure(args);
   }
 
   void SaveConfig(Json* p_out) const override {
diff --git a/tests/cpp/metric/test_survival_metric.cu b/tests/cpp/metric/test_survival_metric.cu
index 81d69f939..0dcdd27d3 100644
--- a/tests/cpp/metric/test_survival_metric.cu
+++ b/tests/cpp/metric/test_survival_metric.cu
@@ -11,6 +11,41 @@
 
 namespace xgboost {
 namespace common {
+namespace {
+inline void CheckDeterministicMetricElementWise(StringView name, int32_t device) {
+  auto lparam = CreateEmptyGenericParam(device);
+  std::unique_ptr<Metric> metric{Metric::Create(name.c_str(), &lparam)};
+  metric->Configure(Args{});
+
+  HostDeviceVector<float> predts;
+  MetaInfo info;
+  auto &h_predts = predts.HostVector();
+
+  SimpleLCG lcg;
+  SimpleRealUniformDistribution<float> dist{0.0f, 1.0f};
+
+  size_t n_samples = 2048;
+  h_predts.resize(n_samples);
+
+  for (size_t i = 0; i < n_samples; ++i) {
+    h_predts[i] = dist(&lcg);
+  }
+
+  auto &h_upper = info.labels_upper_bound_.HostVector();
+  auto &h_lower = info.labels_lower_bound_.HostVector();
+  h_lower.resize(n_samples);
+  h_upper.resize(n_samples);
+  for (size_t i = 0; i < n_samples; ++i) {
+    h_lower[i] = 1;
+    h_upper[i] = 10;
+  }
+
+  auto result = metric->Eval(predts, info, false);
+  for (size_t i = 0; i < 8; ++i) {
+    ASSERT_EQ(metric->Eval(predts, info, false), result);
+  }
+}
+}  // anonymous namespace
 
 TEST(Metric, DeclareUnifiedTest(AFTNegLogLik)) {
   auto lparam = xgboost::CreateEmptyGenericParam(GPUIDX);
@@ -61,6 +96,8 @@ TEST(Metric, DeclareUnifiedTest(IntervalRegressionAccuracy)) {
   EXPECT_FLOAT_EQ(metric->Eval(preds, info, false), 0.50f);
   info.labels_lower_bound_.HostVector()[0] = 70.0f;
   EXPECT_FLOAT_EQ(metric->Eval(preds, info, false), 0.25f);
+
+  CheckDeterministicMetricElementWise(StringView{"interval-regression-accuracy"}, GPUIDX);
 }
 
 // Test configuration of AFT metric
@@ -75,6 +112,8 @@ TEST(AFTNegLogLikMetric, DeclareUnifiedTest(Configuration)) {
   auto aft_param_json = j_obj["aft_loss_param"];
   EXPECT_EQ(get<String>(aft_param_json["aft_loss_distribution"]), "normal");
   EXPECT_EQ(get<String>(aft_param_json["aft_loss_distribution_scale"]), "10");
+
+  CheckDeterministicMetricElementWise(StringView{"aft-nloglik"}, GPUIDX);
 }
 
 }  // namespace common
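
The new CheckDeterministicMetricElementWise helper pins the guarantee down:
evaluating the same metric on the same data repeatedly must produce exactly
equal results. What it guards against is the non-associativity of
floating-point addition; whatever combine order the OpenMP runtime picks
for a reduction can change the low-order bits of the sum. A small
demonstration of the effect (not part of the patch):

    #include <cstdio>

    int main() {
      double x = 0.1, y = 0.2, z = 0.3;
      // Two groupings of the same three-term sum differ in the last bits:
      // prints 0.60000000000000009, then 0.59999999999999998.
      std::printf("%.17g\n%.17g\n", (x + y) + z, x + (y + z));
    }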