Add number of columns to native data iterator. (#5202)

* Change native data iter into an adapter.
This commit is contained in:
Jiaming Yuan
2020-02-25 23:42:01 +08:00
committed by GitHub
parent e0509b3307
commit f2b8cd2922
11 changed files with 244 additions and 156 deletions

View File

@@ -11,7 +11,7 @@
#include <string>
#include <memory>
#include "xgboost/base.h"
#include "xgboost/data.h"
#include "xgboost/learner.h"
#include "xgboost/c_api.h"
@@ -24,116 +24,6 @@
#include "../data/adapter.h"
#include "../data/simple_dmatrix.h"
namespace xgboost {
// declare the data callback.
XGB_EXTERN_C int XGBoostNativeDataIterSetData(
void *handle, XGBoostBatchCSR batch);
/*! \brief Native data iterator that takes callback to return data */
class NativeDataIter : public dmlc::Parser<uint32_t> {
public:
NativeDataIter(DataIterHandle data_handle,
XGBCallbackDataIterNext* next_callback)
: at_first_(true), bytes_read_(0),
data_handle_(data_handle), next_callback_(next_callback) {
}
// override functions
void BeforeFirst() override {
CHECK(at_first_) << "cannot reset NativeDataIter";
}
bool Next() override {
if ((*next_callback_)(data_handle_,
XGBoostNativeDataIterSetData,
this) != 0) {
at_first_ = false;
return true;
} else {
return false;
}
}
const dmlc::RowBlock<uint32_t>& Value() const override {
return block_;
}
size_t BytesRead() const override {
return bytes_read_;
}
// callback to set the data
void SetData(const XGBoostBatchCSR& batch) {
offset_.clear();
label_.clear();
weight_.clear();
index_.clear();
value_.clear();
offset_.insert(offset_.end(), batch.offset, batch.offset + batch.size + 1);
if (batch.label != nullptr) {
label_.insert(label_.end(), batch.label, batch.label + batch.size);
}
if (batch.weight != nullptr) {
weight_.insert(weight_.end(), batch.weight, batch.weight + batch.size);
}
if (batch.index != nullptr) {
index_.insert(index_.end(), batch.index + offset_[0], batch.index + offset_.back());
}
if (batch.value != nullptr) {
value_.insert(value_.end(), batch.value + offset_[0], batch.value + offset_.back());
}
if (offset_[0] != 0) {
size_t base = offset_[0];
for (size_t& item : offset_) {
item -= base;
}
}
block_.size = batch.size;
block_.offset = dmlc::BeginPtr(offset_);
block_.label = dmlc::BeginPtr(label_);
block_.weight = dmlc::BeginPtr(weight_);
block_.qid = nullptr;
block_.field = nullptr;
block_.index = dmlc::BeginPtr(index_);
block_.value = dmlc::BeginPtr(value_);
bytes_read_ += offset_.size() * sizeof(size_t) +
label_.size() * sizeof(dmlc::real_t) +
weight_.size() * sizeof(dmlc::real_t) +
index_.size() * sizeof(uint32_t) +
value_.size() * sizeof(dmlc::real_t);
}
private:
// at the beinning.
bool at_first_;
// bytes that is read.
size_t bytes_read_;
// handle to the iterator,
DataIterHandle data_handle_;
// call back to get the data.
XGBCallbackDataIterNext* next_callback_;
// internal offset
std::vector<size_t> offset_;
// internal label data
std::vector<dmlc::real_t> label_;
// internal weight data
std::vector<dmlc::real_t> weight_;
// internal index.
std::vector<uint32_t> index_;
// internal value.
std::vector<dmlc::real_t> value_;
// internal Rowblock
dmlc::RowBlock<uint32_t> block_;
};
int XGBoostNativeDataIterSetData(
void *handle, XGBoostBatchCSR batch) {
API_BEGIN();
static_cast<xgboost::NativeDataIter*>(handle)->SetData(batch);
API_END();
}
} // namespace xgboost
using namespace xgboost; // NOLINT(*);
/*! \brief entry to to easily hold returning information */
@@ -186,21 +76,23 @@ int XGDMatrixCreateFromFile(const char *fname,
API_END();
}
int XGDMatrixCreateFromDataIter(
void* data_handle,
XGBCallbackDataIterNext* callback,
const char *cache_info,
DMatrixHandle *out) {
XGB_DLL int XGDMatrixCreateFromDataIter(
void *data_handle, // a Java interator
XGBCallbackDataIterNext *callback, // C++ callback defined in xgboost4j.cpp
const char *cache_info, DMatrixHandle *out) {
API_BEGIN();
std::string scache;
if (cache_info != nullptr) {
scache = cache_info;
}
NativeDataIter parser(data_handle, callback);
data::FileAdapter adapter(&parser);
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(
&adapter, std::numeric_limits<float>::quiet_NaN(), 1, scache));
xgboost::data::IteratorAdapter adapter(data_handle, callback);
*out = new std::shared_ptr<DMatrix> {
DMatrix::Create(
&adapter, std::numeric_limits<float>::quiet_NaN(),
1, scache
)
};
API_END();
}

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2014 by Contributors
* Copyright 2014-2020 by Contributors
* \file group_data.h
* \brief this file defines utils to group data by integer keys
* Input: given input sequence (key,value), (k1,v1), (k2,v2)
@@ -14,6 +14,7 @@
#ifndef XGBOOST_COMMON_GROUP_DATA_H_
#define XGBOOST_COMMON_GROUP_DATA_H_
#include <cstddef>
#include <vector>
#include <algorithm>
@@ -44,15 +45,6 @@ class ParallelGroupBuilder {
size_t base_row_offset = 0)
: rptr_(*p_rptr),
data_(*p_data),
thread_rptr_(tmp_thread_rptr_),
base_row_offset_(base_row_offset) {}
ParallelGroupBuilder(std::vector<SizeType> *p_rptr,
std::vector<ValueType> *p_data,
std::vector<std::vector<SizeType> > *p_thread_rptr,
size_t base_row_offset = 0)
: rptr_(*p_rptr),
data_(*p_data),
thread_rptr_(*p_thread_rptr),
base_row_offset_(base_row_offset) {}
/*!
@@ -61,7 +53,7 @@ class ParallelGroupBuilder {
* \param max_key number of keys in the matrix, can be smaller than expected
* \param nthread number of thread that will be used in construction
*/
inline void InitBudget(std::size_t max_key, int nthread) {
void InitBudget(std::size_t max_key, int nthread) {
thread_rptr_.resize(nthread);
for (std::size_t i = 0; i < thread_rptr_.size(); ++i) {
thread_rptr_[i].resize(max_key - std::min(base_row_offset_, max_key));
@@ -74,7 +66,7 @@ class ParallelGroupBuilder {
* \param threadid the id of thread that calls this function
* \param nelem number of element budget add to this row
*/
inline void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) {
void AddBudget(std::size_t key, int threadid, SizeType nelem = 1) {
std::vector<SizeType> &trptr = thread_rptr_[threadid];
size_t offset_key = key - base_row_offset_;
if (trptr.size() < offset_key + 1) {
@@ -129,9 +121,7 @@ class ParallelGroupBuilder {
/*! \brief index of nonzero entries in each row */
std::vector<ValueType> &data_;
/*! \brief thread local data structure */
std::vector<std::vector<SizeType> > &thread_rptr_;
/*! \brief local temp thread ptr, use this if not specified by the constructor */
std::vector<std::vector<SizeType> > tmp_thread_rptr_;
std::vector<std::vector<SizeType> > thread_rptr_;
/** \brief Used when rows being pushed into the builder are strictly above some number. */
size_t base_row_offset_;
};

View File

@@ -1,18 +1,26 @@
/*!
* Copyright (c) 2019 by Contributors
* Copyright (c) 2019~2020 by Contributors
* \file adapter.h
*/
#ifndef XGBOOST_DATA_ADAPTER_H_
#define XGBOOST_DATA_ADAPTER_H_
#include <dmlc/data.h>
#include <cstddef>
#include <functional>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "xgboost/logging.h"
#include "xgboost/base.h"
#include "xgboost/data.h"
#include "xgboost/span.h"
#include "xgboost/c_api.h"
#include "../c_api/c_api_error.h"
namespace xgboost {
namespace data {
@@ -418,7 +426,7 @@ class FileAdapterBatch {
public:
class Line {
public:
Line(size_t row_idx, const uint32_t* feature_idx, const float* value,
Line(size_t row_idx, const uint32_t *feature_idx, const float *value,
size_t size)
: row_idx_(row_idx),
feature_idx_(feature_idx),
@@ -485,6 +493,112 @@ class FileAdapter : dmlc::DataIter<FileAdapterBatch> {
dmlc::Parser<uint32_t>* parser_;
};
/*! \brief Data iterator that takes callback to return data, used in JVM package for
* accepting data iterator. */
class IteratorAdapter : public dmlc::DataIter<FileAdapterBatch> {
public:
IteratorAdapter(DataIterHandle data_handle,
XGBCallbackDataIterNext* next_callback)
: columns_{data::kAdapterUnknownSize}, row_offset_{0},
at_first_(true),
data_handle_(data_handle), next_callback_(next_callback) {}
// override functions
void BeforeFirst() override {
CHECK(at_first_) << "Cannot reset IteratorAdapter";
}
bool Next() override {
if ((*next_callback_)(
data_handle_,
[](void *handle, XGBoostBatchCSR batch) -> int {
API_BEGIN();
static_cast<IteratorAdapter *>(handle)->SetData(batch);
API_END();
},
this) != 0) {
at_first_ = false;
return true;
} else {
return false;
}
}
FileAdapterBatch const& Value() const override {
return *batch_.get();
}
// callback to set the data
void SetData(const XGBoostBatchCSR& batch) {
offset_.clear();
label_.clear();
weight_.clear();
index_.clear();
value_.clear();
offset_.insert(offset_.end(), batch.offset, batch.offset + batch.size + 1);
if (batch.label != nullptr) {
label_.insert(label_.end(), batch.label, batch.label + batch.size);
}
if (batch.weight != nullptr) {
weight_.insert(weight_.end(), batch.weight, batch.weight + batch.size);
}
if (batch.index != nullptr) {
index_.insert(index_.end(), batch.index + offset_[0],
batch.index + offset_.back());
}
if (batch.value != nullptr) {
value_.insert(value_.end(), batch.value + offset_[0],
batch.value + offset_.back());
}
if (offset_[0] != 0) {
size_t base = offset_[0];
for (size_t &item : offset_) {
item -= base;
}
}
CHECK(columns_ == data::kAdapterUnknownSize || columns_ == batch.columns)
<< "Number of columns between batches changed from " << columns_
<< " to " << batch.columns;
columns_ = batch.columns;
block_.size = batch.size;
block_.offset = dmlc::BeginPtr(offset_);
block_.label = dmlc::BeginPtr(label_);
block_.weight = dmlc::BeginPtr(weight_);
block_.qid = nullptr;
block_.field = nullptr;
block_.index = dmlc::BeginPtr(index_);
block_.value = dmlc::BeginPtr(value_);
batch_.reset(new FileAdapterBatch(&block_, row_offset_));
row_offset_ += offset_.size() - 1;
}
size_t NumColumns() const { return columns_; }
size_t NumRows() const { return kAdapterUnknownSize; }
private:
std::vector<size_t> offset_;
std::vector<dmlc::real_t> label_;
std::vector<dmlc::real_t> weight_;
std::vector<uint32_t> index_;
std::vector<dmlc::real_t> value_;
size_t columns_;
size_t row_offset_;
// at the beinning.
bool at_first_;
// handle to the iterator,
DataIterHandle data_handle_;
// call back to get the data.
XGBCallbackDataIterNext *next_callback_;
// internal Rowblock
dmlc::RowBlock<uint32_t> block_;
std::unique_ptr<FileAdapterBatch> batch_;
};
class DMatrixSliceAdapterBatch {
public:
// Fetch metainfo values according to sliced rows

View File

@@ -463,6 +463,9 @@ template DMatrix* DMatrix::Create<data::FileAdapter>(
template DMatrix* DMatrix::Create<data::DMatrixSliceAdapter>(
data::DMatrixSliceAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
template DMatrix* DMatrix::Create<data::IteratorAdapter>(
data::IteratorAdapter* adapter, float missing, int nthread,
const std::string& cache_prefix, size_t page_size);
SparsePage SparsePage::GetTranspose(int num_columns) const {
SparsePage transpose;
@@ -544,15 +547,15 @@ uint64_t SparsePage::Push(const AdapterBatchT& batch, float missing, int nthread
int tid = omp_get_thread_num();
auto line = batch.GetLine(i);
for (auto j = 0ull; j < line.Size(); j++) {
auto element = line.GetElement(j);
data::COOTuple element = line.GetElement(j);
max_columns =
std::max(max_columns, static_cast<uint64_t>(element.column_idx + 1));
if (!common::CheckNAN(element.value) && element.value != missing) {
size_t key = element.row_idx -
base_rowid; // Adapter row index is absolute, here we want
// it relative to current page
size_t key = element.row_idx - base_rowid;
// Adapter row index is absolute, here we want it relative to
// current page
CHECK_GE(key, builder_base_row_offset);
builder.AddBudget(element.row_idx - base_rowid, tid);
builder.AddBudget(key, tid);
}
}
}

View File

@@ -1,5 +1,5 @@
/*!
* Copyright 2014 by Contributors
* Copyright 2014~2020 by Contributors
* \file simple_dmatrix.cc
* \brief the input data structure for gradient boosting
* \author Tianqi Chen
@@ -8,7 +8,7 @@
#include <xgboost/data.h>
#include "./simple_batch_iterator.h"
#include "../common/random.h"
#include "../data/adapter.h"
#include "adapter.h"
namespace xgboost {
namespace data {
@@ -175,5 +175,7 @@ template SimpleDMatrix::SimpleDMatrix(FileAdapter* adapter, float missing,
int nthread);
template SimpleDMatrix::SimpleDMatrix(DMatrixSliceAdapter* adapter, float missing,
int nthread);
template SimpleDMatrix::SimpleDMatrix(IteratorAdapter* adapter, float missing,
int nthread);
} // namespace data
} // namespace xgboost

View File

@@ -58,8 +58,9 @@ class RegLossObj : public ObjFunction {
LOG(WARNING) << "Label set is empty.";
}
CHECK_EQ(preds.Size(), info.labels_.Size())
<< "labels are not correctly provided"
<< "preds.size=" << preds.Size() << ", label.size=" << info.labels_.Size();
<< " " << "labels are not correctly provided"
<< "preds.size=" << preds.Size() << ", label.size=" << info.labels_.Size() << ", "
<< "Loss: " << Loss::Name();
size_t const ndata = preds.Size();
out_gpair->Resize(ndata);
auto device = tparam_->gpu_id;