Improve OpenMP exception handling (#6680)

This commit is contained in:
Louis Desreumaux
2021-02-25 06:56:16 +01:00
committed by GitHub
parent c375173dca
commit 9b530e5697
26 changed files with 610 additions and 475 deletions

View File

@@ -47,12 +47,16 @@ class ElementWiseMetricsReduction {
bst_float residue_sum = 0;
bst_float weights_sum = 0;
dmlc::OMPException exc;
#pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static)
for (omp_ulong i = 0; i < ndata; ++i) {
const bst_float wt = h_weights.size() > 0 ? h_weights[i] : 1.0f;
residue_sum += policy_.EvalRow(h_labels[i], h_preds[i]) * wt;
weights_sum += wt;
exc.Run([&]() {
const bst_float wt = h_weights.size() > 0 ? h_weights[i] : 1.0f;
residue_sum += policy_.EvalRow(h_labels[i], h_preds[i]) * wt;
weights_sum += wt;
});
}
exc.Rethrow();
PackedReduceResult res { residue_sum, weights_sum };
return res;
}

View File

@@ -53,18 +53,23 @@ class MultiClassMetricsReduction {
int label_error = 0;
bool const is_null_weight = weights.Size() == 0;
dmlc::OMPException exc;
#pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static)
for (omp_ulong idx = 0; idx < ndata; ++idx) {
bst_float weight = is_null_weight ? 1.0f : h_weights[idx];
auto label = static_cast<int>(h_labels[idx]);
if (label >= 0 && label < static_cast<int>(n_class)) {
residue_sum += EvalRowPolicy::EvalRow(
label, h_preds.data() + idx * n_class, n_class) * weight;
weights_sum += weight;
} else {
label_error = label;
}
exc.Run([&]() {
bst_float weight = is_null_weight ? 1.0f : h_weights[idx];
auto label = static_cast<int>(h_labels[idx]);
if (label >= 0 && label < static_cast<int>(n_class)) {
residue_sum += EvalRowPolicy::EvalRow(
label, h_preds.data() + idx * n_class, n_class) * weight;
weights_sum += weight;
} else {
label_error = label;
}
});
}
exc.Rethrow();
CheckLabelError(label_error, n_class);
PackedReduceResult res { residue_sum, weights_sum };

View File

@@ -29,6 +29,7 @@
#include "xgboost/host_device_vector.h"
#include "../common/math.h"
#include "../common/threading_utils.h"
#include "metric_common.h"
namespace {
@@ -111,10 +112,9 @@ struct EvalAMS : public Metric {
PredIndPairContainer rec(ndata);
const auto &h_preds = preds.ConstHostVector();
#pragma omp parallel for schedule(static)
for (bst_omp_uint i = 0; i < ndata; ++i) {
common::ParallelFor(ndata, [&](bst_omp_uint i) {
rec[i] = std::make_pair(h_preds[i], i);
}
});
XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst);
auto ntop = static_cast<unsigned>(ratio_ * ndata);
if (ntop == 0) ntop = ndata;
@@ -175,49 +175,57 @@ struct EvalAuc : public Metric {
const auto& labels = info.labels_.ConstHostVector();
const auto &h_preds = preds.ConstHostVector();
dmlc::OMPException exc;
#pragma omp parallel reduction(+:sum_auc, auc_error) if (ngroups > 1)
{
// Each thread works on a distinct group and sorts the predictions in that group
PredIndPairContainer rec;
#pragma omp for schedule(static)
for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) {
// Same thread can work on multiple groups one after another; hence, resize
// the predictions array based on the current group
rec.resize(gptr[group_id + 1] - gptr[group_id]);
#pragma omp parallel for schedule(static) if (!omp_in_parallel())
for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) {
rec[j - gptr[group_id]] = {h_preds[j], j};
}
exc.Run([&]() {
// Each thread works on a distinct group and sorts the predictions in that group
PredIndPairContainer rec;
#pragma omp for schedule(static)
for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) {
exc.Run([&]() {
// Same thread can work on multiple groups one after another; hence, resize
// the predictions array based on the current group
rec.resize(gptr[group_id + 1] - gptr[group_id]);
#pragma omp parallel for schedule(static) if (!omp_in_parallel())
for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) {
exc.Run([&]() {
rec[j - gptr[group_id]] = {h_preds[j], j};
});
}
XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst);
// calculate AUC
double sum_pospair = 0.0;
double sum_npos = 0.0, sum_nneg = 0.0, buf_pos = 0.0, buf_neg = 0.0;
for (size_t j = 0; j < rec.size(); ++j) {
const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id);
const bst_float ctr = labels[rec[j].second];
// keep bucketing predictions in same bucket
if (j != 0 && rec[j].first != rec[j - 1].first) {
XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst);
// calculate AUC
double sum_pospair = 0.0;
double sum_npos = 0.0, sum_nneg = 0.0, buf_pos = 0.0, buf_neg = 0.0;
for (size_t j = 0; j < rec.size(); ++j) {
const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id);
const bst_float ctr = labels[rec[j].second];
// keep bucketing predictions in same bucket
if (j != 0 && rec[j].first != rec[j - 1].first) {
sum_pospair += buf_neg * (sum_npos + buf_pos * 0.5);
sum_npos += buf_pos;
sum_nneg += buf_neg;
buf_neg = buf_pos = 0.0f;
}
buf_pos += ctr * wt;
buf_neg += (1.0f - ctr) * wt;
}
sum_pospair += buf_neg * (sum_npos + buf_pos * 0.5);
sum_npos += buf_pos;
sum_nneg += buf_neg;
buf_neg = buf_pos = 0.0f;
}
buf_pos += ctr * wt;
buf_neg += (1.0f - ctr) * wt;
// check weird conditions
if (sum_npos <= 0.0 || sum_nneg <= 0.0) {
auc_error += 1;
} else {
// this is the AUC
sum_auc += sum_pospair / (sum_npos * sum_nneg);
}
});
}
sum_pospair += buf_neg * (sum_npos + buf_pos * 0.5);
sum_npos += buf_pos;
sum_nneg += buf_neg;
// check weird conditions
if (sum_npos <= 0.0 || sum_nneg <= 0.0) {
auc_error += 1;
} else {
// this is the AUC
sum_auc += sum_pospair / (sum_npos * sum_nneg);
}
}
});
}
exc.Rethrow();
// Report average AUC across all groups
// In distributed mode, workers which only contains pos or neg samples
@@ -316,19 +324,25 @@ struct EvalRank : public Metric, public EvalRankConfig {
const auto &labels = info.labels_.ConstHostVector();
const auto &h_preds = preds.ConstHostVector();
dmlc::OMPException exc;
#pragma omp parallel reduction(+:sum_metric)
{
// each thread takes a local rec
PredIndPairContainer rec;
#pragma omp for schedule(static)
for (bst_omp_uint k = 0; k < ngroups; ++k) {
rec.clear();
for (unsigned j = gptr[k]; j < gptr[k + 1]; ++j) {
rec.emplace_back(h_preds[j], static_cast<int>(labels[j]));
exc.Run([&]() {
// each thread takes a local rec
PredIndPairContainer rec;
#pragma omp for schedule(static)
for (bst_omp_uint k = 0; k < ngroups; ++k) {
exc.Run([&]() {
rec.clear();
for (unsigned j = gptr[k]; j < gptr[k + 1]; ++j) {
rec.emplace_back(h_preds[j], static_cast<int>(labels[j]));
}
sum_metric += this->EvalGroup(&rec);
});
}
sum_metric += this->EvalGroup(&rec);
}
});
}
exc.Rethrow();
}
if (distributed) {
@@ -526,66 +540,75 @@ struct EvalAucPR : public Metric {
const auto &h_labels = info.labels_.ConstHostVector();
const auto &h_preds = preds.ConstHostVector();
dmlc::OMPException exc;
#pragma omp parallel reduction(+:sum_auc, auc_error) if (ngroups > 1)
{
// Each thread works on a distinct group and sorts the predictions in that group
PredIndPairContainer rec;
#pragma omp for schedule(static)
for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) {
double total_pos = 0.0;
double total_neg = 0.0;
// Same thread can work on multiple groups one after another; hence, resize
// the predictions array based on the current group
rec.resize(gptr[group_id + 1] - gptr[group_id]);
#pragma omp parallel for schedule(static) reduction(+:total_pos, total_neg) \
if (!omp_in_parallel()) // NOLINT
for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) {
const bst_float wt = WeightPolicy::GetWeightOfInstance(info, j, group_id);
total_pos += wt * h_labels[j];
total_neg += wt * (1.0f - h_labels[j]);
rec[j - gptr[group_id]] = {h_preds[j], j};
}
// we need pos > 0 && neg > 0
if (total_pos <= 0.0 || total_neg <= 0.0) {
auc_error += 1;
continue;
}
XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst);
// calculate AUC
double tp = 0.0, prevtp = 0.0, fp = 0.0, prevfp = 0.0, h = 0.0, a = 0.0, b = 0.0;
for (size_t j = 0; j < rec.size(); ++j) {
const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id);
tp += wt * h_labels[rec[j].second];
fp += wt * (1.0f - h_labels[rec[j].second]);
if ((j < rec.size() - 1 && rec[j].first != rec[j + 1].first) || j == rec.size() - 1) {
if (tp == prevtp) {
a = 1.0;
b = 0.0;
} else {
h = (fp - prevfp) / (tp - prevtp);
a = 1.0 + h;
b = (prevfp - h * prevtp) / total_pos;
exc.Run([&]() {
// Each thread works on a distinct group and sorts the predictions in that group
PredIndPairContainer rec;
#pragma omp for schedule(static)
for (bst_omp_uint group_id = 0; group_id < ngroups; ++group_id) {
exc.Run([&]() {
double total_pos = 0.0;
double total_neg = 0.0;
// Same thread can work on multiple groups one after another; hence, resize
// the predictions array based on the current group
rec.resize(gptr[group_id + 1] - gptr[group_id]);
#pragma omp parallel for schedule(static) reduction(+:total_pos, total_neg) \
if (!omp_in_parallel()) // NOLINT
for (bst_omp_uint j = gptr[group_id]; j < gptr[group_id + 1]; ++j) {
exc.Run([&]() {
const bst_float wt = WeightPolicy::GetWeightOfInstance(info, j, group_id);
total_pos += wt * h_labels[j];
total_neg += wt * (1.0f - h_labels[j]);
rec[j - gptr[group_id]] = {h_preds[j], j};
});
}
if (0.0 != b) {
sum_auc += (tp / total_pos - prevtp / total_pos -
b / a * (std::log(a * tp / total_pos + b) -
std::log(a * prevtp / total_pos + b))) / a;
} else {
sum_auc += (tp / total_pos - prevtp / total_pos) / a;
// we need pos > 0 && neg > 0
if (total_pos <= 0.0 || total_neg <= 0.0) {
auc_error += 1;
return;
}
prevtp = tp;
prevfp = fp;
}
XGBOOST_PARALLEL_SORT(rec.begin(), rec.end(), common::CmpFirst);
// calculate AUC
double tp = 0.0, prevtp = 0.0, fp = 0.0, prevfp = 0.0, h = 0.0, a = 0.0, b = 0.0;
for (size_t j = 0; j < rec.size(); ++j) {
const bst_float wt = WeightPolicy::GetWeightOfSortedRecord(info, rec, j, group_id);
tp += wt * h_labels[rec[j].second];
fp += wt * (1.0f - h_labels[rec[j].second]);
if ((j < rec.size() - 1 && rec[j].first != rec[j + 1].first) ||
j == rec.size() - 1) {
if (tp == prevtp) {
a = 1.0;
b = 0.0;
} else {
h = (fp - prevfp) / (tp - prevtp);
a = 1.0 + h;
b = (prevfp - h * prevtp) / total_pos;
}
if (0.0 != b) {
sum_auc += (tp / total_pos - prevtp / total_pos -
b / a * (std::log(a * tp / total_pos + b) -
std::log(a * prevtp / total_pos + b))) / a;
} else {
sum_auc += (tp / total_pos - prevtp / total_pos) / a;
}
prevtp = tp;
prevfp = fp;
}
}
// sanity check
if (tp < 0 || prevtp < 0 || fp < 0 || prevfp < 0) {
CHECK(!auc_error) << "AUC-PR: error in calculation";
}
});
}
// sanity check
if (tp < 0 || prevtp < 0 || fp < 0 || prevfp < 0) {
CHECK(!auc_error) << "AUC-PR: error in calculation";
}
}
});
}
exc.Rethrow();
// Report average AUC-PR across all groups
// In distributed mode, workers which only contains pos or neg samples

View File

@@ -58,15 +58,19 @@ class ElementWiseSurvivalMetricsReduction {
double residue_sum = 0;
double weights_sum = 0;
dmlc::OMPException exc;
#pragma omp parallel for reduction(+: residue_sum, weights_sum) schedule(static)
for (omp_ulong i = 0; i < ndata; ++i) {
const double wt = h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
residue_sum += policy_.EvalRow(
static_cast<double>(h_labels_lower_bound[i]),
static_cast<double>(h_labels_upper_bound[i]),
static_cast<double>(h_preds[i])) * wt;
weights_sum += wt;
exc.Run([&]() {
const double wt = h_weights.empty() ? 1.0 : static_cast<double>(h_weights[i]);
residue_sum += policy_.EvalRow(
static_cast<double>(h_labels_lower_bound[i]),
static_cast<double>(h_labels_upper_bound[i]),
static_cast<double>(h_preds[i])) * wt;
weights_sum += wt;
});
}
exc.Rethrow();
PackedReduceResult res{residue_sum, weights_sum};
return res;
}