Support building SimpleDMatrix from Arrow data format (#7512)

* Integrate with Arrow C data API.
* Support Arrow dataset.
* Support Arrow table.

Co-authored-by: Xiaochang Wu <xiaochang.wu@intel.com>
Co-authored-by: Jiaming Yuan <jm.yuan@outlook.com>
Co-authored-by: Zhang Zhang <zhang.zhang@intel.com>
This commit is contained in:
Xiaochang Wu
2022-03-14 22:25:19 -07:00
committed by GitHub
parent 6b6849b001
commit 613ec36c5a
14 changed files with 732 additions and 10 deletions

View File

@@ -249,5 +249,70 @@ template SimpleDMatrix::SimpleDMatrix(
IteratorAdapter<DataIterHandle, XGBCallbackDataIterNext, XGBoostBatchCSR>
*adapter,
float missing, int nthread);
template <>
SimpleDMatrix::SimpleDMatrix(RecordBatchesIterAdapter* adapter, float missing, int nthread) {
auto& offset_vec = sparse_page_->offset.HostVector();
auto& data_vec = sparse_page_->data.HostVector();
uint64_t total_batch_size = 0;
uint64_t total_elements = 0;
adapter->BeforeFirst();
// Iterate over batches of input data
while (adapter->Next()) {
auto& batches = adapter->Value();
size_t num_elements = 0;
size_t num_rows = 0;
// Import Arrow RecordBatches
#pragma omp parallel for reduction(+ : num_elements, num_rows) num_threads(nthread)
for (int i = 0; i < static_cast<int>(batches.size()); ++i) { // NOLINT
num_elements += batches[i]->Import(missing);
num_rows += batches[i]->Size();
}
total_elements += num_elements;
total_batch_size += num_rows;
// Compute global offset for every row and starting row for every batch
std::vector<uint64_t> batch_offsets(batches.size());
for (size_t i = 0; i < batches.size(); ++i) {
if (i == 0) {
batch_offsets[i] = total_batch_size - num_rows;
batches[i]->ShiftRowOffsets(total_elements - num_elements);
} else {
batch_offsets[i] = batch_offsets[i - 1] + batches[i - 1]->Size();
batches[i]->ShiftRowOffsets(batches[i - 1]->RowOffsets().back());
}
}
// Pre-allocate DMatrix memory
data_vec.resize(total_elements);
offset_vec.resize(total_batch_size + 1);
// Copy data into DMatrix
#pragma omp parallel num_threads(nthread)
{
#pragma omp for nowait
for (int i = 0; i < static_cast<int>(batches.size()); ++i) { // NOLINT
size_t begin = batches[i]->RowOffsets()[0];
for (size_t k = 0; k < batches[i]->Size(); ++k) {
for (size_t j = 0; j < batches[i]->NumColumns(); ++j) {
auto element = batches[i]->GetColumn(j).GetElement(k);
if (!std::isnan(element.value)) {
data_vec[begin++] = Entry(element.column_idx, element.value);
}
}
}
}
#pragma omp for nowait
for (int i = 0; i < static_cast<int>(batches.size()); ++i) {
auto& offsets = batches[i]->RowOffsets();
std::copy(offsets.begin() + 1, offsets.end(), offset_vec.begin() + batch_offsets[i] + 1);
}
}
}
// Synchronise worker columns
info_.num_col_ = adapter->NumColumns();
rabit::Allreduce<rabit::op::Max>(&info_.num_col_, 1);
info_.num_row_ = total_batch_size;
info_.num_nonzero_ = data_vec.size();
CHECK_EQ(offset_vec.back(), info_.num_nonzero_);
}
} // namespace data
} // namespace xgboost