[BLOCKING] Handle empty rows in data iterators correctly (#5929)

* [jvm-packages] Handle empty rows in data iterators correctly

* Fix clang-tidy error

* Handle last empty row

* Add comments [skip ci]

Co-authored-by: Nan Zhu <nanzhu@uber.com>
This commit is contained in:
Philip Hyunsu Cho
2020-07-25 13:46:19 -07:00
committed by GitHub
parent a4de2f68e4
commit 487ab0ce73
5 changed files with 79 additions and 19 deletions

View File

@@ -833,9 +833,9 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
uint64_t max_columns = 0;
// First-pass over the batch counting valid elements
size_t num_lines = batch.Size();
size_t batch_size = batch.Size();
#pragma omp parallel for schedule(static)
for (omp_ulong i = 0; i < static_cast<omp_ulong>(num_lines);
for (omp_ulong i = 0; i < static_cast<omp_ulong>(batch_size);
++i) { // NOLINT(*)
int tid = omp_get_thread_num();
auto line = batch.GetLine(i);
@@ -847,7 +847,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
size_t key = element.row_idx - base_rowid;
// Adapter row index is absolute; here we want it relative to the
// current page
CHECK_GE(key, builder_base_row_offset);
CHECK_GE(key, builder_base_row_offset);
builder.AddBudget(key, tid);
}
}
@@ -856,7 +856,7 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
// Second pass over batch, placing elements in correct position
#pragma omp parallel for schedule(static)
for (omp_ulong i = 0; i < static_cast<omp_ulong>(num_lines);
for (omp_ulong i = 0; i < static_cast<omp_ulong>(batch_size);
++i) { // NOLINT(*)
int tid = omp_get_thread_num();
auto line = batch.GetLine(i);

View File

@@ -6,6 +6,7 @@
*/
#include <vector>
#include <limits>
#include <type_traits>
#include <algorithm>
#include "xgboost/data.h"
@@ -103,6 +104,8 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
auto& offset_vec = sparse_page_.offset.HostVector();
auto& data_vec = sparse_page_.data.HostVector();
uint64_t inferred_num_columns = 0;
uint64_t total_batch_size = 0;
// total_batch_size sums batch.Size() over all batches; batch size is either number of rows or cols, depending on data layout
adapter->BeforeFirst();
// Iterate over batches of input data
@@ -110,6 +113,7 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
auto& batch = adapter->Value();
auto batch_max_columns = sparse_page_.Push(batch, missing, nthread);
inferred_num_columns = std::max(batch_max_columns, inferred_num_columns);
total_batch_size += batch.Size();
// Append meta information if available
if (batch.Labels() != nullptr) {
auto& labels = info_.labels_.HostVector();
@@ -153,16 +157,30 @@ SimpleDMatrix::SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
info_.num_col_ = adapter->NumColumns();
}
// Synchronise worker columns
rabit::Allreduce<rabit::op::Max>(&info_.num_col_, 1);
if (adapter->NumRows() == kAdapterUnknownSize) {
info_.num_row_ = offset_vec.size() - 1;
using IteratorAdapterT
= IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>;
// If AdapterT is either IteratorAdapter or FileAdapter type, use the total batch size to
// determine the correct number of rows, as offset_vec may be too short
if (std::is_same<AdapterT, IteratorAdapterT>::value
|| std::is_same<AdapterT, FileAdapter>::value) {
info_.num_row_ = total_batch_size;
// Ensure offset_vec.size() - 1 == [number of rows]
while (offset_vec.size() - 1 < total_batch_size) {
offset_vec.emplace_back(offset_vec.back());
}
} else {
CHECK((std::is_same<AdapterT, CSCAdapter>::value)) << "Expecting CSCAdapter";
info_.num_row_ = offset_vec.size() - 1;
}
} else {
if (offset_vec.empty()) {
offset_vec.emplace_back(0);
}
while (offset_vec.size() - 1 < adapter->NumRows()) {
offset_vec.emplace_back(offset_vec.back());
}