External data adapters (#5044)
* Use external data adapters as lightweight intermediate layer between external data and DMatrix
This commit is contained in:
@@ -18,9 +18,8 @@
|
||||
|
||||
#include "c_api_error.h"
|
||||
#include "../data/simple_csr_source.h"
|
||||
#include "../common/math.h"
|
||||
#include "../common/io.h"
|
||||
#include "../common/group_data.h"
|
||||
#include "../data/adapter.h"
|
||||
|
||||
|
||||
namespace xgboost {
|
||||
@@ -218,37 +217,9 @@ XGB_DLL int XGDMatrixCreateFromCSREx(const size_t* indptr,
|
||||
size_t nelem,
|
||||
size_t num_col,
|
||||
DMatrixHandle* out) {
|
||||
std::unique_ptr<data::SimpleCSRSource> source(new data::SimpleCSRSource());
|
||||
|
||||
API_BEGIN();
|
||||
data::SimpleCSRSource& mat = *source;
|
||||
auto& offset_vec = mat.page_.offset.HostVector();
|
||||
auto& data_vec = mat.page_.data.HostVector();
|
||||
offset_vec.reserve(nindptr);
|
||||
data_vec.reserve(nelem);
|
||||
offset_vec.resize(1);
|
||||
offset_vec[0] = 0;
|
||||
size_t num_column = 0;
|
||||
for (size_t i = 1; i < nindptr; ++i) {
|
||||
for (size_t j = indptr[i - 1]; j < indptr[i]; ++j) {
|
||||
if (!common::CheckNAN(data[j])) {
|
||||
// automatically skip nan.
|
||||
data_vec.emplace_back(Entry(indices[j], data[j]));
|
||||
num_column = std::max(num_column, static_cast<size_t>(indices[j] + 1));
|
||||
}
|
||||
}
|
||||
offset_vec.push_back(mat.page_.data.Size());
|
||||
}
|
||||
|
||||
mat.info.num_col_ = num_column;
|
||||
if (num_col > 0) {
|
||||
CHECK_LE(mat.info.num_col_, num_col)
|
||||
<< "num_col=" << num_col << " vs " << mat.info.num_col_;
|
||||
mat.info.num_col_ = num_col;
|
||||
}
|
||||
mat.info.num_row_ = nindptr - 1;
|
||||
mat.info.num_nonzero_ = mat.page_.data.Size();
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(std::move(source)));
|
||||
data::CSRAdapter adapter(indptr, indices, data, nindptr - 1, nelem, num_col);
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(&adapter, std::nan(""), 1));
|
||||
API_END();
|
||||
}
|
||||
|
||||
@@ -259,361 +230,41 @@ XGB_DLL int XGDMatrixCreateFromCSCEx(const size_t* col_ptr,
|
||||
size_t nelem,
|
||||
size_t num_row,
|
||||
DMatrixHandle* out) {
|
||||
std::unique_ptr<data::SimpleCSRSource> source(new data::SimpleCSRSource());
|
||||
|
||||
API_BEGIN();
|
||||
// FIXME: User should be able to control number of threads
|
||||
const int nthread = omp_get_max_threads();
|
||||
data::SimpleCSRSource& mat = *source;
|
||||
auto& offset_vec = mat.page_.offset.HostVector();
|
||||
auto& data_vec = mat.page_.data.HostVector();
|
||||
common::ParallelGroupBuilder<
|
||||
Entry, std::remove_reference<decltype(offset_vec)>::type::value_type>
|
||||
builder(&offset_vec, &data_vec);
|
||||
builder.InitBudget(0, nthread);
|
||||
size_t ncol = nindptr - 1; // NOLINT(*)
|
||||
#pragma omp parallel for schedule(static)
|
||||
for (omp_ulong i = 0; i < static_cast<omp_ulong>(ncol); ++i) { // NOLINT(*)
|
||||
int tid = omp_get_thread_num();
|
||||
for (size_t j = col_ptr[i]; j < col_ptr[i+1]; ++j) {
|
||||
if (!common::CheckNAN(data[j])) {
|
||||
builder.AddBudget(indices[j], tid);
|
||||
}
|
||||
}
|
||||
}
|
||||
builder.InitStorage();
|
||||
#pragma omp parallel for schedule(static)
|
||||
for (omp_ulong i = 0; i < static_cast<omp_ulong>(ncol); ++i) { // NOLINT(*)
|
||||
int tid = omp_get_thread_num();
|
||||
for (size_t j = col_ptr[i]; j < col_ptr[i+1]; ++j) {
|
||||
if (!common::CheckNAN(data[j])) {
|
||||
builder.Push(indices[j],
|
||||
Entry(static_cast<bst_uint>(i), data[j]),
|
||||
tid);
|
||||
}
|
||||
}
|
||||
}
|
||||
mat.info.num_row_ = mat.page_.offset.Size() - 1;
|
||||
if (num_row > 0) {
|
||||
CHECK_LE(mat.info.num_row_, num_row);
|
||||
// provision for empty rows at the bottom of matrix
|
||||
auto& offset_vec = mat.page_.offset.HostVector();
|
||||
for (uint64_t i = mat.info.num_row_; i < static_cast<uint64_t>(num_row); ++i) {
|
||||
offset_vec.push_back(offset_vec.back());
|
||||
}
|
||||
mat.info.num_row_ = num_row;
|
||||
CHECK_EQ(mat.info.num_row_, offset_vec.size() - 1); // sanity check
|
||||
}
|
||||
mat.info.num_col_ = ncol;
|
||||
mat.info.num_nonzero_ = nelem;
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(std::move(source)));
|
||||
data::CSCAdapter adapter(col_ptr, indices, data, nindptr - 1, num_row);
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(&adapter, std::nan(""), 1));
|
||||
API_END();
|
||||
}
|
||||
|
||||
XGB_DLL int XGDMatrixCreateFromMat(const bst_float* data,
|
||||
xgboost::bst_ulong nrow,
|
||||
xgboost::bst_ulong ncol,
|
||||
bst_float missing,
|
||||
xgboost::bst_ulong ncol, bst_float missing,
|
||||
DMatrixHandle* out) {
|
||||
std::unique_ptr<data::SimpleCSRSource> source(new data::SimpleCSRSource());
|
||||
|
||||
API_BEGIN();
|
||||
data::SimpleCSRSource& mat = *source;
|
||||
auto& offset_vec = mat.page_.offset.HostVector();
|
||||
auto& data_vec = mat.page_.data.HostVector();
|
||||
offset_vec.resize(1+nrow);
|
||||
bool nan_missing = common::CheckNAN(missing);
|
||||
mat.info.num_row_ = nrow;
|
||||
mat.info.num_col_ = ncol;
|
||||
const bst_float* data0 = data;
|
||||
|
||||
// count elements for sizing data
|
||||
data = data0;
|
||||
for (xgboost::bst_ulong i = 0; i < nrow; ++i, data += ncol) {
|
||||
xgboost::bst_ulong nelem = 0;
|
||||
for (xgboost::bst_ulong j = 0; j < ncol; ++j) {
|
||||
if (common::CheckNAN(data[j])) {
|
||||
CHECK(nan_missing)
|
||||
<< "There are NAN in the matrix, however, you did not set missing=NAN";
|
||||
} else {
|
||||
if (nan_missing || data[j] != missing) {
|
||||
++nelem;
|
||||
}
|
||||
}
|
||||
}
|
||||
offset_vec[i+1] = offset_vec[i] + nelem;
|
||||
}
|
||||
data_vec.resize(mat.page_.data.Size() + offset_vec.back());
|
||||
|
||||
data = data0;
|
||||
for (xgboost::bst_ulong i = 0; i < nrow; ++i, data += ncol) {
|
||||
xgboost::bst_ulong matj = 0;
|
||||
for (xgboost::bst_ulong j = 0; j < ncol; ++j) {
|
||||
if (common::CheckNAN(data[j])) {
|
||||
} else {
|
||||
if (nan_missing || data[j] != missing) {
|
||||
data_vec[offset_vec[i] + matj] = Entry(j, data[j]);
|
||||
++matj;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
mat.info.num_nonzero_ = mat.page_.data.Size();
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(std::move(source)));
|
||||
data::DenseAdapter adapter(data, nrow, nrow * ncol, ncol);
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(&adapter, missing, 1));
|
||||
API_END();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
void PrefixSum(T *x, size_t N) {
|
||||
std::vector<T> suma;
|
||||
#pragma omp parallel
|
||||
{
|
||||
const int ithread = omp_get_thread_num();
|
||||
const int nthreads = omp_get_num_threads();
|
||||
#pragma omp single
|
||||
{
|
||||
suma.resize(nthreads+1);
|
||||
suma[0] = 0;
|
||||
}
|
||||
T sum = 0;
|
||||
T offset = 0;
|
||||
#pragma omp for schedule(static)
|
||||
for (omp_ulong i = 0; i < N; i++) {
|
||||
sum += x[i];
|
||||
x[i] = sum;
|
||||
}
|
||||
suma[ithread+1] = sum;
|
||||
#pragma omp barrier
|
||||
for (omp_ulong i = 0; i < static_cast<omp_ulong>(ithread+1); i++) {
|
||||
offset += suma[i];
|
||||
}
|
||||
#pragma omp for schedule(static)
|
||||
for (omp_ulong i = 0; i < N; i++) {
|
||||
x[i] += offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
XGB_DLL int XGDMatrixCreateFromMat_omp(const bst_float* data, // NOLINT
|
||||
xgboost::bst_ulong nrow,
|
||||
xgboost::bst_ulong ncol,
|
||||
bst_float missing, DMatrixHandle* out,
|
||||
int nthread) {
|
||||
// avoid openmp unless enough data to be worth it to avoid overhead costs
|
||||
if (nrow*ncol <= 10000*50) {
|
||||
return(XGDMatrixCreateFromMat(data, nrow, ncol, missing, out));
|
||||
}
|
||||
|
||||
API_BEGIN();
|
||||
const int nthreadmax = std::max(omp_get_num_procs() / 2 - 1, 1);
|
||||
// const int nthreadmax = omp_get_max_threads();
|
||||
if (nthread <= 0) nthread=nthreadmax;
|
||||
int nthread_orig = omp_get_max_threads();
|
||||
omp_set_num_threads(nthread);
|
||||
|
||||
std::unique_ptr<data::SimpleCSRSource> source(new data::SimpleCSRSource());
|
||||
data::SimpleCSRSource& mat = *source;
|
||||
auto& offset_vec = mat.page_.offset.HostVector();
|
||||
auto& data_vec = mat.page_.data.HostVector();
|
||||
offset_vec.resize(1+nrow);
|
||||
mat.info.num_row_ = nrow;
|
||||
mat.info.num_col_ = ncol;
|
||||
|
||||
// Check for errors in missing elements
|
||||
// Count elements per row (to avoid otherwise need to copy)
|
||||
bool nan_missing = common::CheckNAN(missing);
|
||||
std::vector<int> badnan;
|
||||
badnan.resize(nthread, 0);
|
||||
|
||||
#pragma omp parallel num_threads(nthread)
|
||||
{
|
||||
int ithread = omp_get_thread_num();
|
||||
|
||||
// Count elements per row
|
||||
#pragma omp for schedule(static)
|
||||
for (omp_ulong i = 0; i < nrow; ++i) {
|
||||
xgboost::bst_ulong nelem = 0;
|
||||
for (xgboost::bst_ulong j = 0; j < ncol; ++j) {
|
||||
if (common::CheckNAN(data[ncol*i + j]) && !nan_missing) {
|
||||
badnan[ithread] = 1;
|
||||
} else if (common::CheckNAN(data[ncol * i + j])) {
|
||||
} else if (nan_missing || data[ncol * i + j] != missing) {
|
||||
++nelem;
|
||||
}
|
||||
}
|
||||
offset_vec[i+1] = nelem;
|
||||
}
|
||||
}
|
||||
// Inform about any NaNs and resize data matrix
|
||||
for (int i = 0; i < nthread; i++) {
|
||||
CHECK(!badnan[i]) << "There are NAN in the matrix, however, you did not set missing=NAN";
|
||||
}
|
||||
|
||||
// do cumulative sum (to avoid otherwise need to copy)
|
||||
PrefixSum(&offset_vec[0], offset_vec.size());
|
||||
data_vec.resize(mat.page_.data.Size() + offset_vec.back());
|
||||
|
||||
// Fill data matrix (now that know size, no need for slow push_back())
|
||||
#pragma omp parallel num_threads(nthread)
|
||||
{
|
||||
#pragma omp for schedule(static)
|
||||
for (omp_ulong i = 0; i < nrow; ++i) {
|
||||
xgboost::bst_ulong matj = 0;
|
||||
for (xgboost::bst_ulong j = 0; j < ncol; ++j) {
|
||||
if (common::CheckNAN(data[ncol * i + j])) {
|
||||
} else if (nan_missing || data[ncol * i + j] != missing) {
|
||||
data_vec[offset_vec[i] + matj] =
|
||||
Entry(j, data[ncol * i + j]);
|
||||
++matj;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// restore omp state
|
||||
omp_set_num_threads(nthread_orig);
|
||||
|
||||
mat.info.num_nonzero_ = mat.page_.data.Size();
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(std::move(source)));
|
||||
data::DenseAdapter adapter(data, nrow, nrow * ncol, ncol);
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(&adapter, missing, nthread));
|
||||
API_END();
|
||||
}
|
||||
|
||||
enum class DTType : uint8_t {
|
||||
kFloat32 = 0,
|
||||
kFloat64 = 1,
|
||||
kBool8 = 2,
|
||||
kInt32 = 3,
|
||||
kInt8 = 4,
|
||||
kInt16 = 5,
|
||||
kInt64 = 6,
|
||||
kUnknown = 7
|
||||
};
|
||||
|
||||
DTType DTGetType(std::string type_string) {
|
||||
if (type_string == "float32") {
|
||||
return DTType::kFloat32;
|
||||
} else if (type_string == "float64") {
|
||||
return DTType::kFloat64;
|
||||
} else if (type_string == "bool8") {
|
||||
return DTType::kBool8;
|
||||
} else if (type_string == "int32") {
|
||||
return DTType::kInt32;
|
||||
} else if (type_string == "int8") {
|
||||
return DTType::kInt8;
|
||||
} else if (type_string == "int16") {
|
||||
return DTType::kInt16;
|
||||
} else if (type_string == "int64") {
|
||||
return DTType::kInt64;
|
||||
} else {
|
||||
LOG(FATAL) << "Unknown data table type.";
|
||||
return DTType::kUnknown;
|
||||
}
|
||||
}
|
||||
|
||||
float DTGetValue(void* column, DTType dt_type, size_t ridx) {
|
||||
float missing = std::numeric_limits<float>::quiet_NaN();
|
||||
switch (dt_type) {
|
||||
case DTType::kFloat32: {
|
||||
float val = reinterpret_cast<float*>(column)[ridx];
|
||||
return std::isfinite(val) ? val : missing;
|
||||
}
|
||||
case DTType::kFloat64: {
|
||||
double val = reinterpret_cast<double*>(column)[ridx];
|
||||
return std::isfinite(val) ? static_cast<float>(val) : missing;
|
||||
}
|
||||
case DTType::kBool8: {
|
||||
bool val = reinterpret_cast<bool*>(column)[ridx];
|
||||
return static_cast<float>(val);
|
||||
}
|
||||
case DTType::kInt32: {
|
||||
int32_t val = reinterpret_cast<int32_t*>(column)[ridx];
|
||||
return val != (-2147483647 - 1) ? static_cast<float>(val) : missing;
|
||||
}
|
||||
case DTType::kInt8: {
|
||||
int8_t val = reinterpret_cast<int8_t*>(column)[ridx];
|
||||
return val != -128 ? static_cast<float>(val) : missing;
|
||||
}
|
||||
case DTType::kInt16: {
|
||||
int16_t val = reinterpret_cast<int16_t*>(column)[ridx];
|
||||
return val != -32768 ? static_cast<float>(val) : missing;
|
||||
}
|
||||
case DTType::kInt64: {
|
||||
int64_t val = reinterpret_cast<int64_t*>(column)[ridx];
|
||||
return val != -9223372036854775807 - 1 ? static_cast<float>(val)
|
||||
: missing;
|
||||
}
|
||||
default: {
|
||||
LOG(FATAL) << "Unknown data table type.";
|
||||
return 0.0f;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
XGB_DLL int XGDMatrixCreateFromDT(void** data, const char** feature_stypes,
|
||||
xgboost::bst_ulong nrow,
|
||||
xgboost::bst_ulong ncol, DMatrixHandle* out,
|
||||
int nthread) {
|
||||
// avoid openmp unless enough data to be worth it to avoid overhead costs
|
||||
if (nrow * ncol <= 10000 * 50) {
|
||||
nthread = 1;
|
||||
}
|
||||
|
||||
API_BEGIN();
|
||||
const int nthreadmax = std::max(omp_get_num_procs() / 2 - 1, 1);
|
||||
if (nthread <= 0) nthread = nthreadmax;
|
||||
int nthread_orig = omp_get_max_threads();
|
||||
omp_set_num_threads(nthread);
|
||||
|
||||
std::unique_ptr<data::SimpleCSRSource> source(new data::SimpleCSRSource());
|
||||
data::SimpleCSRSource& mat = *source;
|
||||
mat.page_.offset.Resize(1 + nrow);
|
||||
mat.info.num_row_ = nrow;
|
||||
mat.info.num_col_ = ncol;
|
||||
|
||||
auto& page_offset = mat.page_.offset.HostVector();
|
||||
#pragma omp parallel num_threads(nthread)
|
||||
{
|
||||
// Count elements per row, column by column
|
||||
for (auto j = 0u; j < ncol; ++j) {
|
||||
DTType dtype = DTGetType(feature_stypes[j]);
|
||||
#pragma omp for schedule(static)
|
||||
for (omp_ulong i = 0; i < nrow; ++i) {
|
||||
float val = DTGetValue(data[j], dtype, i);
|
||||
if (!std::isnan(val)) {
|
||||
page_offset[i + 1]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// do cumulative sum (to avoid otherwise need to copy)
|
||||
PrefixSum(&page_offset[0], page_offset.size());
|
||||
|
||||
mat.page_.data.Resize(mat.page_.data.Size() + page_offset.back());
|
||||
|
||||
auto& page_data = mat.page_.data.HostVector();
|
||||
|
||||
// Fill data matrix (now that know size, no need for slow push_back())
|
||||
std::vector<size_t> position(nrow);
|
||||
#pragma omp parallel num_threads(nthread)
|
||||
{
|
||||
for (xgboost::bst_ulong j = 0; j < ncol; ++j) {
|
||||
DTType dtype = DTGetType(feature_stypes[j]);
|
||||
#pragma omp for schedule(static)
|
||||
for (omp_ulong i = 0; i < nrow; ++i) {
|
||||
float val = DTGetValue(data[j], dtype, i);
|
||||
if (!std::isnan(val)) {
|
||||
page_data[page_offset[i] + position[i]] = Entry(j, val);
|
||||
position[i]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// restore omp state
|
||||
omp_set_num_threads(nthread_orig);
|
||||
|
||||
mat.info.num_nonzero_ = mat.page_.data.Size();
|
||||
*out = new std::shared_ptr<DMatrix>(DMatrix::Create(std::move(source)));
|
||||
data::DataTableAdapter adapter(data, feature_stypes, nrow, ncol);
|
||||
*out = new std::shared_ptr<DMatrix>(
|
||||
DMatrix::Create(&adapter, std::nan(""), nthread));
|
||||
API_END();
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user