- fix issues with training with external memory on cpu (#4487)

* - fix issues with training with external memory on CPU
  - use the batch size to determine the correct number of rows in a batch
  - use the right number of threads for OMP parallelization when the batch size
    is less than the default OMP max thread count (applicable to the last batch)

* - handle the scenario where the last batch size is smaller than the available number of threads
  - augment the tests so that all scenarios are covered (batch size <, >, and = the number of threads);
    a sketch of the thread-capping idea follows the commit metadata below
sriramch 2019-05-28 17:31:30 -07:00 committed by Rory Mitchell
parent 972f693eaf
commit a3fedbeaa8
2 changed files with 34 additions and 14 deletions
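
The core of the fix below is to cap the number of OMP workers at the number of rows in the current batch, so the short last batch of an external-memory DMatrix still splits into non-empty, non-overlapping blocks. The following standalone sketch is only an illustration of that capping idea under assumed values (the 5-row batch, the processed vector, and the whole program are illustrative, not code from this commit):

#include <omp.h>
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
  const size_t batch_size = 5;  // assumed: a short last batch
  // Cap the worker count at the batch size, mirroring the diff below.
  const size_t batch_threads =
      std::min(batch_size, static_cast<size_t>(omp_get_max_threads()));
  const size_t block_size = batch_size / batch_threads;

  std::vector<int> processed(batch_size, 0);
#pragma omp parallel for num_threads(batch_threads) schedule(static)
  for (int tid = 0; tid < static_cast<int>(batch_threads); ++tid) {
    const size_t ibegin = block_size * tid;
    // The last thread absorbs the remainder rows.
    const size_t iend = (tid == static_cast<int>(batch_threads) - 1)
                            ? batch_size
                            : block_size * (tid + 1);
    for (size_t i = ibegin; i < iend; ++i) {
      processed[i] = 1;
    }
  }

  size_t covered = 0;
  for (int v : processed) covered += v;
  std::printf("rows covered: %zu of %zu\n", covered, batch_size);
  return 0;
}

With the cap in place, block_size is never zero and every block stays non-empty, even when the batch has fewer rows than the OMP thread pool.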

@@ -178,8 +178,7 @@ uint32_t HistCutMatrix::GetBinIdx(const Entry& e) {
 void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
   cut.Init(p_fmat, max_num_bins);
-  const int32_t nthread = omp_get_max_threads();
-  // const int nthread = 1;
+  const size_t nthread = omp_get_max_threads();
   const uint32_t nbins = cut.row_ptr.back();
   hit_count.resize(nbins, 0);
   hit_count_tloc_.resize(nthread * nbins, 0);
@@ -197,17 +196,21 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
   size_t prev_sum = 0;
   for (const auto &batch : p_fmat->GetRowBatches()) {
-    MemStackAllocator<size_t, 128> partial_sums(nthread);
+    // The number of threads is pegged to the batch size. If the OMP
+    // block is parallelized on anything other than the batch/block size,
+    // it should be reassigned
+    const size_t batch_threads = std::min(batch.Size(), static_cast<size_t>(omp_get_max_threads()));
+    MemStackAllocator<size_t, 128> partial_sums(batch_threads);
     size_t* p_part = partial_sums.Get();
-    size_t block_size = batch.Size() / nthread;
-    #pragma omp parallel num_threads(nthread)
+    size_t block_size = batch.Size() / batch_threads;
+    #pragma omp parallel num_threads(batch_threads)
     {
       #pragma omp for
-      for (int32_t tid = 0; tid < nthread; ++tid) {
+      for (int32_t tid = 0; tid < batch_threads; ++tid) {
         size_t ibegin = block_size * tid;
-        size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1)));
+        size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1)));
         size_t sum = 0;
         for (size_t i = ibegin; i < iend; ++i) {
@@ -219,15 +222,15 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
       #pragma omp single
       {
         p_part[0] = prev_sum;
-        for (int32_t i = 1; i < nthread; ++i) {
+        for (int32_t i = 1; i < batch_threads; ++i) {
           p_part[i] = p_part[i - 1] + row_ptr[rbegin + i*block_size];
         }
       }
       #pragma omp for
-      for (int32_t tid = 0; tid < nthread; ++tid) {
+      for (int32_t tid = 0; tid < batch_threads; ++tid) {
         size_t ibegin = block_size * tid;
-        size_t iend = (tid == (nthread-1) ? batch.Size() : (block_size * (tid+1)));
+        size_t iend = (tid == (batch_threads-1) ? batch.Size() : (block_size * (tid+1)));
         for (size_t i = ibegin; i < iend; ++i) {
           row_ptr[rbegin + 1 + i] += p_part[tid];
@@ -235,13 +238,12 @@ void GHistIndexMatrix::Init(DMatrix* p_fmat, int max_num_bins) {
       }
     }
-    index.resize(row_ptr.back());
+    index.resize(row_ptr[rbegin + batch.Size()]);
     CHECK_GT(cut.cut.size(), 0U);
-    auto bsize = static_cast<omp_ulong>(batch.Size());
-    #pragma omp parallel for num_threads(nthread) schedule(static)
-    for (omp_ulong i = 0; i < bsize; ++i) { // NOLINT(*)
+    #pragma omp parallel for num_threads(batch_threads) schedule(static)
+    for (omp_ulong i = 0; i < batch.Size(); ++i) { // NOLINT(*)
       const int tid = omp_get_thread_num();
       size_t ibegin = row_ptr[rbegin + i];
       size_t iend = row_ptr[rbegin + i + 1];
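
For context on the hunks above: row_ptr is built with a two-pass, block-partitioned prefix sum over per-row entry counts. The sketch below is an illustrative re-implementation with made-up row sizes and without the rbegin/prev_sum bookkeeping that carries state across batches; it is not the project's code, but it shows why the per-block totals read at row_ptr[i * block_size] are enough to stitch the blocks together:

#include <omp.h>
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <vector>

int main() {
  const std::vector<size_t> row_sizes = {3, 1, 4, 1, 5, 9, 2};  // made-up entries per row
  const size_t n = row_sizes.size();
  const size_t batch_threads =
      std::min(n, static_cast<size_t>(omp_get_max_threads()));
  const size_t block_size = n / batch_threads;

  std::vector<size_t> row_ptr(n + 1, 0);
  std::vector<size_t> part(batch_threads, 0);

#pragma omp parallel num_threads(batch_threads)
  {
#pragma omp for
    for (int tid = 0; tid < static_cast<int>(batch_threads); ++tid) {
      const size_t ibegin = block_size * tid;
      const size_t iend = (tid == static_cast<int>(batch_threads) - 1)
                              ? n : block_size * (tid + 1);
      size_t sum = 0;
      for (size_t i = ibegin; i < iend; ++i) {
        sum += row_sizes[i];
        row_ptr[i + 1] = sum;  // prefix sum within this block only
      }
    }
#pragma omp single
    {
      part[0] = 0;
      for (size_t i = 1; i < batch_threads; ++i) {
        // Offset of block i = offset of block i-1 plus block i-1's total.
        part[i] = part[i - 1] + row_ptr[i * block_size];
      }
    }
#pragma omp for
    for (int tid = 0; tid < static_cast<int>(batch_threads); ++tid) {
      const size_t ibegin = block_size * tid;
      const size_t iend = (tid == static_cast<int>(batch_threads) - 1)
                              ? n : block_size * (tid + 1);
      for (size_t i = ibegin; i < iend; ++i) {
        row_ptr[i + 1] += part[tid];  // shift the block by its global offset
      }
    }
  }

  for (size_t v : row_ptr) std::printf("%zu ", v);
  std::printf("\n");  // expected: 0 3 4 8 9 14 23 25
  return 0;
}

Each omp for and the single construct end in an implicit barrier, so the offset computation always sees the finished per-block sums.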

@@ -50,5 +50,23 @@ TEST(DenseColumnWithMissing, Test) {
   }
   delete dmat;
 }
+
+void
+TestGHistIndexMatrixCreation(size_t nthreads) {
+  /* This should create multiple sparse pages */
+  std::unique_ptr<DMatrix> dmat = CreateSparsePageDMatrix(1024, 1024);
+  omp_set_num_threads(nthreads);
+  GHistIndexMatrix gmat;
+  gmat.Init(dmat.get(), 256);
+}
+
+TEST(HistIndexCreationWithExternalMemory, Test) {
+  // Vary the number of threads to make sure that the last batch
+  // is distributed properly to the available number of threads
+  // in the thread pool
+  TestGHistIndexMatrixCreation(20);
+  TestGHistIndexMatrixCreation(30);
+  TestGHistIndexMatrixCreation(40);
+}
 } // namespace common
 } // namespace xgboost
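
The thread counts 20, 30, and 40 are varied so that, per the commit message, the batch size ends up below, equal to, and above the number of threads, which is what exercises the capping logic. A hypothetical back-of-the-envelope check (24 rows is an assumed size for a short last page, not a value taken from the test or from CreateSparsePageDMatrix):

#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <initializer_list>

int main() {
  const std::size_t last_batch_rows = 24;  // assumed size of a short last page
  for (std::size_t nthreads : {20u, 30u, 40u}) {  // thread counts used by the tests above
    const std::size_t uncapped_block = last_batch_rows / nthreads;
    const std::size_t batch_threads = std::min(last_batch_rows, nthreads);
    const std::size_t capped_block = last_batch_rows / batch_threads;
    std::printf("nthreads=%zu: uncapped block_size=%zu, capped threads=%zu, block_size=%zu\n",
                nthreads, uncapped_block, batch_threads, capped_block);
  }
  return 0;
}

With 40 threads and 24 rows the uncapped split yields block_size == 0; the batch_threads cap keeps every block non-empty and leaves the remainder rows to the last thread.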