[HOTFIX] distributed training with hist method (#4716)

* add parallel test for hist.EvaluateSplit

* update test_openmp.py

* update test_openmp.py

* update test_openmp.py

* update test_openmp.py

* update test_openmp.py

* fix OMP schedule policy

* fix clang-tidy

* add logging: total_num_bins

* fix

* fix

* test

* replace guided OpenMP schedule policy with static in updater_quantile_hist.cc
Xu Xiao 2019-08-14 02:27:29 +08:00 committed by Philip Hyunsu Cho
parent c0ffe65f5c
commit ef9af33a00
4 changed files with 70 additions and 30 deletions
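
The heart of the hotfix is the schedule change in the three OpenMP loops of updater_quantile_hist.cc shown below. With schedule(guided), the runtime hands out shrinking chunks of iterations as threads become free, so which thread ends up processing a given histogram-building or split-evaluation task can differ from run to run; with schedule(static), the iteration-to-thread assignment is a fixed function of the loop trip count and the thread count, so the work partition is the same on every run for a given nthread. The updated test_openmp.py exercises this by retraining many times under different thread counts. The following standalone sketch is not part of the patch; it only illustrates the scheduling difference, and the file name and compiler flags are illustrative assumptions.

// guided_vs_static.cc (illustrative only): record which thread handles each
// loop iteration. With schedule(static) the owner array is identical on every
// run for a fixed thread count; with schedule(guided) it may vary with timing.
// Build with OpenMP enabled, e.g.: g++ -fopenmp guided_vs_static.cc
#include <omp.h>
#include <cstdio>
#include <vector>

int main() {
  const int n_tasks = 16;
  omp_set_num_threads(3);

  std::vector<int> owner(n_tasks, -1);

  // Swap "static" for "guided" here to see a timing-dependent assignment.
  #pragma omp parallel for schedule(static)
  for (int i = 0; i < n_tasks; ++i) {
    owner[i] = omp_get_thread_num();  // each iteration writes its own slot: no race
  }

  for (int i = 0; i < n_tasks; ++i) {
    std::printf("task %2d -> thread %d\n", i, owner[i]);
  }
  return 0;
}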

src/common/hist_util.cc

@@ -48,6 +48,7 @@ void HistogramCuts::Build(DMatrix* dmat, uint32_t const max_num_bins) {
     DenseCuts cuts(this);
     cuts.Build(dmat, max_num_bins);
   }
+  LOG(INFO) << "Total number of hist bins: " << cut_ptrs_.back();
 }
 
 bool CutsBuilder::UseGroup(DMatrix* dmat) {
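
(Note: cut_ptrs_ in HistogramCuts is the cumulative per-feature offset array into the flattened list of cut values, so its last element is the total number of histogram bins across all features; that is the value the new LOG(INFO) line reports.)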

src/tree/updater_quantile_hist.cc

@@ -556,7 +556,7 @@ void QuantileHistMaker::Builder::BuildHistsBatch(const std::vector<ExpandEntry>&
       reinterpret_cast<const GradientPair::ValueT*>(gpair.data());
 
   // 2. Build partial histograms for each node
-  #pragma omp parallel for schedule(guided)
+  #pragma omp parallel for schedule(static)
   for (int32_t itask = 0; itask < n_hist_buidling_tasks; ++itask) {
     const size_t tid = omp_get_thread_num();
     const int32_t nid = task_nid[itask];
@@ -856,7 +856,7 @@ bool QuantileHistMaker::Builder::UpdatePredictionCache(
     }
   }
 
-  #pragma omp parallel for schedule(guided)
+  #pragma omp parallel for schedule(static)
   for (omp_ulong k = 0; k < tasks_elem.size(); ++k) {
     const RowSetCollection::Elem rowset = tasks_elem[k];
     if (rowset.begin != nullptr && rowset.end != nullptr && rowset.node_id != -1) {
@@ -1079,7 +1079,7 @@ void QuantileHistMaker::Builder::EvaluateSplitsBatch(
   // partial results
   std::vector<std::pair<SplitEntry, SplitEntry>> splits(tasks.size());
   // parallel enumeration
-  #pragma omp parallel for schedule(guided)
+  #pragma omp parallel for schedule(static)
   for (omp_ulong i = 0; i < tasks.size(); ++i) {
     // node_idx : offset within `nodes` list
     const int32_t node_idx = tasks[i].first;

tests/cpp/tree/test_quantile_hist.cc

@@ -225,6 +225,14 @@ class QuantileHistMock : public QuantileHistMaker {
     delete dmat;
   }
 
+  void TestEvaluateSplitParallel(const GHistIndexBlockMatrix &quantile_index_block,
+                                 const RegTree &tree) {
+    omp_set_num_threads(2);
+    TestEvaluateSplit(quantile_index_block, tree);
+    omp_set_num_threads(1);
+  }
 };
 
 int static constexpr kNRows = 8, kNCols = 16;
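
The added TestEvaluateSplitParallel helper reruns the existing TestEvaluateSplit check with the OpenMP thread count raised to two, so the schedule(static) split-evaluation loop above is actually exercised in parallel, and then resets the count to one so that subsequent tests keep their single-threaded behaviour.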

tests/python/test_openmp.py

@@ -1,43 +1,74 @@
 # -*- coding: utf-8 -*-
-from scipy.sparse import csr_matrix
 import xgboost as xgb
 import unittest
+import numpy as np
 
 
 class TestOMP(unittest.TestCase):
     def test_omp(self):
-        # a contrived example where one node has an instance set of size 2.
-        data = [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
-        indices = [2, 1, 1, 2, 0, 0, 2, 0, 1, 3]
-        indptr = [0, 1, 2, 4, 5, 7, 9, 10]
-        A = csr_matrix((data, indices, indptr), shape=(7, 4))
-        y = [1, 1, 0, 0, 0, 1, 1]
-        dtrain = xgb.DMatrix(A, label=y)
+        dpath = 'demo/data/'
+        dtrain = xgb.DMatrix(dpath + 'agaricus.txt.train')
+        dtest = xgb.DMatrix(dpath + 'agaricus.txt.test')
 
-        # 1. use 3 threads to train a tree with an instance set of size 2
         param = {'booster': 'gbtree',
                  'objective': 'binary:logistic',
-                 'grow_policy': 'lossguide',
+                 'grow_policy': 'depthwise',
                  'tree_method': 'hist',
-                 'eval_metric': 'auc',
-                 'max_depth': 0,
-                 'max_leaves': 1024,
-                 'min_child_weight': 0,
-                 'nthread': 3}
+                 'eval_metric': 'error',
+                 'max_depth': 5,
+                 'min_child_weight': 0}
 
-        watchlist = [(dtrain, 'train')]
-        num_round = 1
-        res = {}
-        xgb.train(param, dtrain, num_round, watchlist, evals_result=res)
-        assert res['train']['auc'][-1] > 0.99
+        watchlist = [(dtest, 'eval'), (dtrain, 'train')]
+        num_round = 5
+
+        def run_trial():
+            res = {}
+            bst = xgb.train(param, dtrain, num_round, watchlist, evals_result=res)
+            metrics = [res['train']['error'][-1], res['eval']['error'][-1]]
+            preds = bst.predict(dtest)
+            return metrics, preds
+
+        def consist_test(title, n):
+            auc, pred = run_trial()
+            for i in range(n-1):
+                auc2, pred2 = run_trial()
+                try:
+                    assert auc == auc2
+                    assert np.array_equal(pred, pred2)
+                except Exception as e:
+                    print('-------test %s failed, num_trial: %d-------' % (title, i))
+                    raise e
+                auc, pred = auc2, pred2
+            return auc, pred
+
+        print('test approx ...')
+        param['tree_method'] = 'approx'
 
-        # 2. vary number of threads and test whether you get the same result
         param['nthread'] = 1
-        res2 = {}
-        xgb.train(param, dtrain, num_round, watchlist, evals_result=res2)
-        assert res['train']['auc'][-1] == res2['train']['auc'][-1]
+        auc_1, pred_1 = consist_test('approx_thread_1', 100)
 
         param['nthread'] = 2
-        res3 = {}
-        xgb.train(param, dtrain, num_round, watchlist, evals_result=res3)
-        assert res['train']['auc'][-1] == res3['train']['auc'][-1]
+        auc_2, pred_2 = consist_test('approx_thread_2', 100)
+
+        param['nthread'] = 3
+        auc_3, pred_3 = consist_test('approx_thread_3', 100)
+
+        assert auc_1 == auc_2 == auc_3
+        assert np.array_equal(auc_1, auc_2)
+        assert np.array_equal(auc_1, auc_3)
+
+        print('test hist ...')
+        param['tree_method'] = 'hist'
+
+        param['nthread'] = 1
+        auc_1, pred_1 = consist_test('hist_thread_1', 100)
+
+        param['nthread'] = 2
+        auc_2, pred_2 = consist_test('hist_thread_2', 100)
+
+        param['nthread'] = 3
+        auc_3, pred_3 = consist_test('hist_thread_3', 100)
+
+        assert auc_1 == auc_2 == auc_3
+        assert np.array_equal(auc_1, auc_2)
+        assert np.array_equal(auc_1, auc_3)
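
With the hotfix applied, this consistency test can be run on its own, for example with python -m pytest tests/python/test_openmp.py (the tests/python/ location is an assumption; the run must start from the repository root because dpath = 'demo/data/' is a relative path).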