From e3c34c79be05f7833f8fb0228f28745d224d61bc Mon Sep 17 00:00:00 2001
From: Rory Mitchell
Date: Wed, 4 Dec 2019 10:56:17 +1300
Subject: [PATCH] External data adapters (#5044)

* Use external data adapters as a lightweight intermediate layer between
  external data and DMatrix
---
 include/xgboost/data.h                |  14 +
 src/c_api/c_api.cc                    | 375 +-------------------
 src/common/group_data.h               |  15 +-
 src/data/adapter.h                    | 488 ++++++++++++++++++++++++++
 src/data/data.cc                      |  30 +-
 src/data/simple_csr_source.cc         |  64 ----
 src/data/simple_csr_source.h          |   7 +-
 src/data/simple_dmatrix.h             | 120 ++++++-
 tests/cpp/common/test_group_data.cc   |  54 +++
 tests/cpp/data/test_adapter.cc        | 104 ++++++
 tests/cpp/data/test_data.cc           |   1 -
 tests/cpp/data/test_simple_dmatrix.cc |  63 ++++
 tests/python/test_basic.py            | 112 ------
 tests/python/test_dmatrix.py          | 171 +++++++++
 tests/python/test_sparse_dmatrix.py   |  33 --
 15 files changed, 1058 insertions(+), 593 deletions(-)
 create mode 100644 src/data/adapter.h
 create mode 100644 tests/cpp/common/test_group_data.cc
 create mode 100644 tests/cpp/data/test_adapter.cc
 create mode 100644 tests/python/test_dmatrix.py
 delete mode 100644 tests/python/test_sparse_dmatrix.py

diff --git a/include/xgboost/data.h b/include/xgboost/data.h
index c663d8f3b..23318a685 100644
--- a/include/xgboost/data.h
+++ b/include/xgboost/data.h
@@ -465,6 +465,20 @@ class DMatrix {
    */
   static DMatrix* Create(std::unique_ptr<DataSource<SparsePage>>&& source,
                          const std::string& cache_prefix = "");
+
+  /**
+   * \brief Creates a new DMatrix from an external data adapter.
+   *
+   * \tparam AdapterT Type of the adapter.
+   * \param adapter View onto the external data.
+   * \param missing Value to count as missing.
+   * \param nthread Number of threads used for construction.
+   *
+   * \return The created DMatrix.
+   */
+  template <typename AdapterT>
+  static DMatrix* Create(AdapterT* adapter, float missing, int nthread);
+
   /*!
    * \brief Create a DMatrix by loading data from parser.
    * Parser can later be deleted after the DMatrix is created.
diff --git a/src/c_api/c_api.cc b/src/c_api/c_api.cc
index 1f8965320..51da22bee 100644
--- a/src/c_api/c_api.cc
+++ b/src/c_api/c_api.cc
@@ -18,9 +18,8 @@
 #include "c_api_error.h"
 #include "../data/simple_csr_source.h"
-#include "../common/math.h"
 #include "../common/io.h"
-#include "../common/group_data.h"
+#include "../data/adapter.h"
 
 namespace xgboost {
 
@@ -218,37 +217,9 @@ XGB_DLL int XGDMatrixCreateFromCSREx(const size_t* indptr,
                                      size_t nelem,
                                      size_t num_col,
                                      DMatrixHandle* out) {
-  std::unique_ptr<data::SimpleCSRSource> source(new data::SimpleCSRSource());
-
   API_BEGIN();
-  data::SimpleCSRSource& mat = *source;
-  auto& offset_vec = mat.page_.offset.HostVector();
-  auto& data_vec = mat.page_.data.HostVector();
-  offset_vec.reserve(nindptr);
-  data_vec.reserve(nelem);
-  offset_vec.resize(1);
-  offset_vec[0] = 0;
-  size_t num_column = 0;
-  for (size_t i = 1; i < nindptr; ++i) {
-    for (size_t j = indptr[i - 1]; j < indptr[i]; ++j) {
-      if (!common::CheckNAN(data[j])) {
-        // automatically skip nan.
- data_vec.emplace_back(Entry(indices[j], data[j])); - num_column = std::max(num_column, static_cast(indices[j] + 1)); - } - } - offset_vec.push_back(mat.page_.data.Size()); - } - - mat.info.num_col_ = num_column; - if (num_col > 0) { - CHECK_LE(mat.info.num_col_, num_col) - << "num_col=" << num_col << " vs " << mat.info.num_col_; - mat.info.num_col_ = num_col; - } - mat.info.num_row_ = nindptr - 1; - mat.info.num_nonzero_ = mat.page_.data.Size(); - *out = new std::shared_ptr(DMatrix::Create(std::move(source))); + data::CSRAdapter adapter(indptr, indices, data, nindptr - 1, nelem, num_col); + *out = new std::shared_ptr(DMatrix::Create(&adapter, std::nan(""), 1)); API_END(); } @@ -259,361 +230,41 @@ XGB_DLL int XGDMatrixCreateFromCSCEx(const size_t* col_ptr, size_t nelem, size_t num_row, DMatrixHandle* out) { - std::unique_ptr source(new data::SimpleCSRSource()); - API_BEGIN(); - // FIXME: User should be able to control number of threads - const int nthread = omp_get_max_threads(); - data::SimpleCSRSource& mat = *source; - auto& offset_vec = mat.page_.offset.HostVector(); - auto& data_vec = mat.page_.data.HostVector(); - common::ParallelGroupBuilder< - Entry, std::remove_reference::type::value_type> - builder(&offset_vec, &data_vec); - builder.InitBudget(0, nthread); - size_t ncol = nindptr - 1; // NOLINT(*) - #pragma omp parallel for schedule(static) - for (omp_ulong i = 0; i < static_cast(ncol); ++i) { // NOLINT(*) - int tid = omp_get_thread_num(); - for (size_t j = col_ptr[i]; j < col_ptr[i+1]; ++j) { - if (!common::CheckNAN(data[j])) { - builder.AddBudget(indices[j], tid); - } - } - } - builder.InitStorage(); - #pragma omp parallel for schedule(static) - for (omp_ulong i = 0; i < static_cast(ncol); ++i) { // NOLINT(*) - int tid = omp_get_thread_num(); - for (size_t j = col_ptr[i]; j < col_ptr[i+1]; ++j) { - if (!common::CheckNAN(data[j])) { - builder.Push(indices[j], - Entry(static_cast(i), data[j]), - tid); - } - } - } - mat.info.num_row_ = mat.page_.offset.Size() - 1; - if (num_row > 0) { - CHECK_LE(mat.info.num_row_, num_row); - // provision for empty rows at the bottom of matrix - auto& offset_vec = mat.page_.offset.HostVector(); - for (uint64_t i = mat.info.num_row_; i < static_cast(num_row); ++i) { - offset_vec.push_back(offset_vec.back()); - } - mat.info.num_row_ = num_row; - CHECK_EQ(mat.info.num_row_, offset_vec.size() - 1); // sanity check - } - mat.info.num_col_ = ncol; - mat.info.num_nonzero_ = nelem; - *out = new std::shared_ptr(DMatrix::Create(std::move(source))); + data::CSCAdapter adapter(col_ptr, indices, data, nindptr - 1, num_row); + *out = new std::shared_ptr(DMatrix::Create(&adapter, std::nan(""), 1)); API_END(); } XGB_DLL int XGDMatrixCreateFromMat(const bst_float* data, xgboost::bst_ulong nrow, - xgboost::bst_ulong ncol, - bst_float missing, + xgboost::bst_ulong ncol, bst_float missing, DMatrixHandle* out) { - std::unique_ptr source(new data::SimpleCSRSource()); - API_BEGIN(); - data::SimpleCSRSource& mat = *source; - auto& offset_vec = mat.page_.offset.HostVector(); - auto& data_vec = mat.page_.data.HostVector(); - offset_vec.resize(1+nrow); - bool nan_missing = common::CheckNAN(missing); - mat.info.num_row_ = nrow; - mat.info.num_col_ = ncol; - const bst_float* data0 = data; - - // count elements for sizing data - data = data0; - for (xgboost::bst_ulong i = 0; i < nrow; ++i, data += ncol) { - xgboost::bst_ulong nelem = 0; - for (xgboost::bst_ulong j = 0; j < ncol; ++j) { - if (common::CheckNAN(data[j])) { - CHECK(nan_missing) - << "There are NAN in the matrix, 
however, you did not set missing=NAN"; - } else { - if (nan_missing || data[j] != missing) { - ++nelem; - } - } - } - offset_vec[i+1] = offset_vec[i] + nelem; - } - data_vec.resize(mat.page_.data.Size() + offset_vec.back()); - - data = data0; - for (xgboost::bst_ulong i = 0; i < nrow; ++i, data += ncol) { - xgboost::bst_ulong matj = 0; - for (xgboost::bst_ulong j = 0; j < ncol; ++j) { - if (common::CheckNAN(data[j])) { - } else { - if (nan_missing || data[j] != missing) { - data_vec[offset_vec[i] + matj] = Entry(j, data[j]); - ++matj; - } - } - } - } - - mat.info.num_nonzero_ = mat.page_.data.Size(); - *out = new std::shared_ptr(DMatrix::Create(std::move(source))); + data::DenseAdapter adapter(data, nrow, nrow * ncol, ncol); + *out = new std::shared_ptr(DMatrix::Create(&adapter, missing, 1)); API_END(); } -template -void PrefixSum(T *x, size_t N) { - std::vector suma; -#pragma omp parallel - { - const int ithread = omp_get_thread_num(); - const int nthreads = omp_get_num_threads(); -#pragma omp single - { - suma.resize(nthreads+1); - suma[0] = 0; - } - T sum = 0; - T offset = 0; -#pragma omp for schedule(static) - for (omp_ulong i = 0; i < N; i++) { - sum += x[i]; - x[i] = sum; - } - suma[ithread+1] = sum; -#pragma omp barrier - for (omp_ulong i = 0; i < static_cast(ithread+1); i++) { - offset += suma[i]; - } -#pragma omp for schedule(static) - for (omp_ulong i = 0; i < N; i++) { - x[i] += offset; - } - } -} - XGB_DLL int XGDMatrixCreateFromMat_omp(const bst_float* data, // NOLINT xgboost::bst_ulong nrow, xgboost::bst_ulong ncol, bst_float missing, DMatrixHandle* out, int nthread) { - // avoid openmp unless enough data to be worth it to avoid overhead costs - if (nrow*ncol <= 10000*50) { - return(XGDMatrixCreateFromMat(data, nrow, ncol, missing, out)); - } - API_BEGIN(); - const int nthreadmax = std::max(omp_get_num_procs() / 2 - 1, 1); - // const int nthreadmax = omp_get_max_threads(); - if (nthread <= 0) nthread=nthreadmax; - int nthread_orig = omp_get_max_threads(); - omp_set_num_threads(nthread); - - std::unique_ptr source(new data::SimpleCSRSource()); - data::SimpleCSRSource& mat = *source; - auto& offset_vec = mat.page_.offset.HostVector(); - auto& data_vec = mat.page_.data.HostVector(); - offset_vec.resize(1+nrow); - mat.info.num_row_ = nrow; - mat.info.num_col_ = ncol; - - // Check for errors in missing elements - // Count elements per row (to avoid otherwise need to copy) - bool nan_missing = common::CheckNAN(missing); - std::vector badnan; - badnan.resize(nthread, 0); - -#pragma omp parallel num_threads(nthread) - { - int ithread = omp_get_thread_num(); - - // Count elements per row -#pragma omp for schedule(static) - for (omp_ulong i = 0; i < nrow; ++i) { - xgboost::bst_ulong nelem = 0; - for (xgboost::bst_ulong j = 0; j < ncol; ++j) { - if (common::CheckNAN(data[ncol*i + j]) && !nan_missing) { - badnan[ithread] = 1; - } else if (common::CheckNAN(data[ncol * i + j])) { - } else if (nan_missing || data[ncol * i + j] != missing) { - ++nelem; - } - } - offset_vec[i+1] = nelem; - } - } - // Inform about any NaNs and resize data matrix - for (int i = 0; i < nthread; i++) { - CHECK(!badnan[i]) << "There are NAN in the matrix, however, you did not set missing=NAN"; - } - - // do cumulative sum (to avoid otherwise need to copy) - PrefixSum(&offset_vec[0], offset_vec.size()); - data_vec.resize(mat.page_.data.Size() + offset_vec.back()); - - // Fill data matrix (now that know size, no need for slow push_back()) -#pragma omp parallel num_threads(nthread) - { -#pragma omp for 
schedule(static) - for (omp_ulong i = 0; i < nrow; ++i) { - xgboost::bst_ulong matj = 0; - for (xgboost::bst_ulong j = 0; j < ncol; ++j) { - if (common::CheckNAN(data[ncol * i + j])) { - } else if (nan_missing || data[ncol * i + j] != missing) { - data_vec[offset_vec[i] + matj] = - Entry(j, data[ncol * i + j]); - ++matj; - } - } - } - } - // restore omp state - omp_set_num_threads(nthread_orig); - - mat.info.num_nonzero_ = mat.page_.data.Size(); - *out = new std::shared_ptr(DMatrix::Create(std::move(source))); + data::DenseAdapter adapter(data, nrow, nrow * ncol, ncol); + *out = new std::shared_ptr(DMatrix::Create(&adapter, missing, nthread)); API_END(); } -enum class DTType : uint8_t { - kFloat32 = 0, - kFloat64 = 1, - kBool8 = 2, - kInt32 = 3, - kInt8 = 4, - kInt16 = 5, - kInt64 = 6, - kUnknown = 7 -}; - -DTType DTGetType(std::string type_string) { - if (type_string == "float32") { - return DTType::kFloat32; - } else if (type_string == "float64") { - return DTType::kFloat64; - } else if (type_string == "bool8") { - return DTType::kBool8; - } else if (type_string == "int32") { - return DTType::kInt32; - } else if (type_string == "int8") { - return DTType::kInt8; - } else if (type_string == "int16") { - return DTType::kInt16; - } else if (type_string == "int64") { - return DTType::kInt64; - } else { - LOG(FATAL) << "Unknown data table type."; - return DTType::kUnknown; - } -} - -float DTGetValue(void* column, DTType dt_type, size_t ridx) { - float missing = std::numeric_limits::quiet_NaN(); - switch (dt_type) { - case DTType::kFloat32: { - float val = reinterpret_cast(column)[ridx]; - return std::isfinite(val) ? val : missing; - } - case DTType::kFloat64: { - double val = reinterpret_cast(column)[ridx]; - return std::isfinite(val) ? static_cast(val) : missing; - } - case DTType::kBool8: { - bool val = reinterpret_cast(column)[ridx]; - return static_cast(val); - } - case DTType::kInt32: { - int32_t val = reinterpret_cast(column)[ridx]; - return val != (-2147483647 - 1) ? static_cast(val) : missing; - } - case DTType::kInt8: { - int8_t val = reinterpret_cast(column)[ridx]; - return val != -128 ? static_cast(val) : missing; - } - case DTType::kInt16: { - int16_t val = reinterpret_cast(column)[ridx]; - return val != -32768 ? static_cast(val) : missing; - } - case DTType::kInt64: { - int64_t val = reinterpret_cast(column)[ridx]; - return val != -9223372036854775807 - 1 ? 
static_cast(val) - : missing; - } - default: { - LOG(FATAL) << "Unknown data table type."; - return 0.0f; - } - } -} - XGB_DLL int XGDMatrixCreateFromDT(void** data, const char** feature_stypes, xgboost::bst_ulong nrow, xgboost::bst_ulong ncol, DMatrixHandle* out, int nthread) { - // avoid openmp unless enough data to be worth it to avoid overhead costs - if (nrow * ncol <= 10000 * 50) { - nthread = 1; - } - API_BEGIN(); - const int nthreadmax = std::max(omp_get_num_procs() / 2 - 1, 1); - if (nthread <= 0) nthread = nthreadmax; - int nthread_orig = omp_get_max_threads(); - omp_set_num_threads(nthread); - - std::unique_ptr source(new data::SimpleCSRSource()); - data::SimpleCSRSource& mat = *source; - mat.page_.offset.Resize(1 + nrow); - mat.info.num_row_ = nrow; - mat.info.num_col_ = ncol; - - auto& page_offset = mat.page_.offset.HostVector(); -#pragma omp parallel num_threads(nthread) - { - // Count elements per row, column by column - for (auto j = 0u; j < ncol; ++j) { - DTType dtype = DTGetType(feature_stypes[j]); -#pragma omp for schedule(static) - for (omp_ulong i = 0; i < nrow; ++i) { - float val = DTGetValue(data[j], dtype, i); - if (!std::isnan(val)) { - page_offset[i + 1]++; - } - } - } - } - // do cumulative sum (to avoid otherwise need to copy) - PrefixSum(&page_offset[0], page_offset.size()); - - mat.page_.data.Resize(mat.page_.data.Size() + page_offset.back()); - - auto& page_data = mat.page_.data.HostVector(); - - // Fill data matrix (now that know size, no need for slow push_back()) - std::vector position(nrow); -#pragma omp parallel num_threads(nthread) - { - for (xgboost::bst_ulong j = 0; j < ncol; ++j) { - DTType dtype = DTGetType(feature_stypes[j]); -#pragma omp for schedule(static) - for (omp_ulong i = 0; i < nrow; ++i) { - float val = DTGetValue(data[j], dtype, i); - if (!std::isnan(val)) { - page_data[page_offset[i] + position[i]] = Entry(j, val); - position[i]++; - } - } - } - } - - // restore omp state - omp_set_num_threads(nthread_orig); - - mat.info.num_nonzero_ = mat.page_.data.Size(); - *out = new std::shared_ptr(DMatrix::Create(std::move(source))); + data::DataTableAdapter adapter(data, feature_stypes, nrow, ncol); + *out = new std::shared_ptr( + DMatrix::Create(&adapter, std::nan(""), nthread)); API_END(); } diff --git a/src/common/group_data.h b/src/common/group_data.h index 81a2d999e..0b7af3b2c 100644 --- a/src/common/group_data.h +++ b/src/common/group_data.h @@ -69,25 +69,26 @@ struct ParallelGroupBuilder { /*! \brief step 3: initialize the necessary storage */ inline void InitStorage() { // set rptr to correct size + SizeType rptr_fill_value = rptr_.empty() ? 
0 : rptr_.back();
     for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
       if (rptr_.size() <= thread_rptr_[tid].size()) {
-        rptr_.resize(thread_rptr_[tid].size() + 1);  // key + 1
+        rptr_.resize(thread_rptr_[tid].size() + 1, rptr_fill_value);  // key + 1
       }
     }
     // initialize rptr to be beginning of each segment
-    std::size_t start = 0;
+    std::size_t count = 0;
     for (std::size_t i = 0; i + 1 < rptr_.size(); ++i) {
       for (std::size_t tid = 0; tid < thread_rptr_.size(); ++tid) {
         std::vector<SizeType> &trptr = thread_rptr_[tid];
         if (i < trptr.size()) {  // i^th row is assigned for this thread
-          std::size_t ncnt = trptr[i];  // how many entries in this row
-          trptr[i] = start;
-          start += ncnt;
+          std::size_t thread_count = trptr[i];  // how many entries in this row
+          trptr[i] = count + rptr_.back();
+          count += thread_count;
         }
       }
-      rptr_[i + 1] = start;  // pointer accumulated from all thread
+      rptr_[i + 1] += count;  // pointer accumulated from all threads
     }
-    data_.resize(start);
+    data_.resize(rptr_.back());
   }
   /*!
    * \brief step 4: add data to the allocated space,
diff --git a/src/data/adapter.h b/src/data/adapter.h
new file mode 100644
index 000000000..df94e070e
--- /dev/null
+++ b/src/data/adapter.h
@@ -0,0 +1,488 @@
+/*!
+ *  Copyright (c) 2019 by Contributors
+ * \file adapter.h
+ */
+#ifndef XGBOOST_DATA_ADAPTER_H_
+#define XGBOOST_DATA_ADAPTER_H_
+#include <limits>
+#include <memory>
+#include <string>
+namespace xgboost {
+namespace data {
+
+/** External data formats should implement an adapter as below. The adapter
+ * provides uniform access to data outside xgboost, allowing construction of
+ * DMatrix objects from a range of sources without duplicating code.
+ *
+ * The adapter object is an iterator that returns batches of data. Each batch
+ * contains a number of "lines". A line represents a set of elements from a
+ * sparse input matrix, normally a row in the case of a CSR matrix or a column
+ * for a CSC matrix. Typically in sparse matrix formats we can efficiently
+ * access subsets of elements at a time, but cannot efficiently look up
+ * individual elements by random access, hence the "line" abstraction, which
+ * allows the sparse matrix to return subsets of elements efficiently.
+ * Individual elements are described by a COO tuple (row index, column index,
+ * value).
+ *
+ * This abstraction allows us to read through different sparse matrix formats
+ * using the same interface. In particular we can write a DMatrix constructor
+ * that uses the same code to construct itself from a CSR matrix, CSC matrix,
+ * dense matrix, csv, libsvm file, or potentially other formats. To see why
+ * this is necessary, imagine we have 5 external matrix formats and 5 internal
+ * DMatrix types, where each DMatrix needs a custom constructor for each
+ * possible input: the number of constructors is 5*5=25. Using an abstraction
+ * over the input data types, the number of constructors is reduced to 5, as
+ * each DMatrix is oblivious to the external data format. Adding a new input
+ * source is simply a case of implementing an adapter.
+ *
+ * Most of the below adapters do not need more than one batch, as the data
+ * originates from an in-memory source. The file adapter does require batches
+ * to avoid loading the entire file into memory.
+ *
+ * An important detail is empty row/column handling. Files loaded from disk do
+ * not provide meta information about the number of rows/columns to expect;
+ * this must be inferred during construction. 
Other sparse formats may specify a + * number of rows/columns, but we can encounter entirely sparse rows or columns, + * leading to disagreement between the inferred number and the meta-info + * provided. To resolve this, adapters have methods specifying the number of + * rows/columns expected, these methods may return zero where these values must + * be inferred from data. A constructed DMatrix should agree with the input + * source on numbers of rows/columns, appending empty rows if necessary. + * */ + +/** \brief An adapter can return this value for number of rows or columns + * indicating that this value is currently unknown and should be inferred while + * passing over the data. */ +constexpr size_t kAdapterUnknownSize = std::numeric_limits::max(); + +struct COOTuple { + COOTuple(size_t row_idx, size_t column_idx, float value) + : row_idx(row_idx), column_idx(column_idx), value(value) {} + + size_t row_idx{0}; + size_t column_idx{0}; + float value{0}; +}; + +namespace detail { + +/** + * \brief Simplifies the use of DataIter when there is only one batch. + */ +template +class SingleBatchDataIter : dmlc::DataIter { + public: + void BeforeFirst() override { counter = 0; } + bool Next() override { + if (counter == 0) { + counter++; + return true; + } + return false; + } + + private: + int counter{0}; +}; + +/** \brief Indicates this data source cannot contain meta-info such as labels, + * weights or qid. */ +class NoMetaInfo { + public: + const float* Labels() const { return nullptr; } + const float* Weights() const { return nullptr; } + const uint64_t* Qid() const { return nullptr; } +}; + +}; // namespace detail + +class CSRAdapterBatch : public detail::NoMetaInfo { + public: + class Line { + public: + Line(size_t row_idx, size_t size, const unsigned* feature_idx, + const float* values) + : row_idx(row_idx), + size(size), + feature_idx(feature_idx), + values(values) {} + + size_t Size() const { return size; } + COOTuple GetElement(size_t idx) const { + return COOTuple(row_idx, feature_idx[idx], values[idx]); + } + + private: + size_t row_idx; + size_t size; + const unsigned* feature_idx; + const float* values; + }; + CSRAdapterBatch(const size_t* row_ptr, const unsigned* feature_idx, + const float* values, size_t num_rows, size_t num_elements, + size_t num_features) + : row_ptr(row_ptr), + feature_idx(feature_idx), + values(values), + num_rows(num_rows), + num_elements(num_elements), + num_features(num_features) {} + const Line GetLine(size_t idx) const { + size_t begin_offset = row_ptr[idx]; + size_t end_offset = row_ptr[idx + 1]; + return Line(idx, end_offset - begin_offset, &feature_idx[begin_offset], + &values[begin_offset]); + } + size_t Size() const { return num_rows; } + + private: + const size_t* row_ptr; + const unsigned* feature_idx; + const float* values; + size_t num_elements; + size_t num_rows; + size_t num_features; +}; + +class CSRAdapter : public detail::SingleBatchDataIter { + public: + CSRAdapter(const size_t* row_ptr, const unsigned* feature_idx, + const float* values, size_t num_rows, size_t num_elements, + size_t num_features) + : batch(row_ptr, feature_idx, values, num_rows, num_elements, + num_features), + num_rows(num_rows), + num_columns(num_features) {} + const CSRAdapterBatch& Value() const override { return batch; } + size_t NumRows() const { return num_rows; } + size_t NumColumns() const { return num_columns; } + + private: + CSRAdapterBatch batch; + size_t num_rows; + size_t num_columns; +}; + +class DenseAdapterBatch : public detail::NoMetaInfo { + 
public: + DenseAdapterBatch(const float* values, size_t num_rows, size_t num_elements, + size_t num_features) + : num_features(num_features), + num_rows(num_rows), + num_elements(num_elements), + values(values) {} + + private: + class Line { + public: + Line(const float* values, size_t size, size_t row_idx) + : row_idx(row_idx), size(size), values(values) {} + + size_t Size() const { return size; } + COOTuple GetElement(size_t idx) const { + return COOTuple(row_idx, idx, values[idx]); + } + + private: + size_t row_idx; + size_t size; + const float* values; + }; + + public: + size_t Size() const { return num_rows; } + const Line GetLine(size_t idx) const { + return Line(values + idx * num_features, num_features, idx); + } + + private: + const float* values; + size_t num_elements; + size_t num_rows; + size_t num_features; +}; + +class DenseAdapter : public detail::SingleBatchDataIter { + public: + DenseAdapter(const float* values, size_t num_rows, size_t num_elements, + size_t num_features) + : batch(values, num_rows, num_elements, num_features), + num_rows(num_rows), + num_columns(num_features) {} + const DenseAdapterBatch& Value() const override { return batch; } + + size_t NumRows() const { return num_rows; } + size_t NumColumns() const { return num_columns; } + + private: + DenseAdapterBatch batch; + size_t num_rows; + size_t num_columns; +}; + +class CSCAdapterBatch : public detail::NoMetaInfo { + public: + CSCAdapterBatch(const size_t* col_ptr, const unsigned* row_idx, + const float* values, size_t num_features) + : col_ptr(col_ptr), + row_idx(row_idx), + values(values), + num_features(num_features) {} + + private: + class Line { + public: + Line(size_t col_idx, size_t size, const unsigned* row_idx, + const float* values) + : col_idx(col_idx), size(size), row_idx(row_idx), values(values) {} + + size_t Size() const { return size; } + COOTuple GetElement(size_t idx) const { + return COOTuple(row_idx[idx], col_idx, values[idx]); + } + + private: + size_t col_idx; + size_t size; + const unsigned* row_idx; + const float* values; + }; + + public: + size_t Size() const { return num_features; } + const Line GetLine(size_t idx) const { + size_t begin_offset = col_ptr[idx]; + size_t end_offset = col_ptr[idx + 1]; + return Line(idx, end_offset - begin_offset, &row_idx[begin_offset], + &values[begin_offset]); + } + + private: + const size_t* col_ptr; + const unsigned* row_idx; + const float* values; + size_t num_features; +}; + +class CSCAdapter : public detail::SingleBatchDataIter { + public: + CSCAdapter(const size_t* col_ptr, const unsigned* row_idx, + const float* values, size_t num_features, size_t num_rows) + : batch(col_ptr, row_idx, values, num_features), + num_rows(num_rows), + num_columns(num_features) {} + const CSCAdapterBatch& Value() const override { return batch; } + + // JVM package sends 0 as unknown + size_t NumRows() const { + return num_rows == 0 ? 
kAdapterUnknownSize : num_rows; + } + size_t NumColumns() const { return num_columns; } + + private: + CSCAdapterBatch batch; + size_t num_rows; + size_t num_columns; +}; + +class DataTableAdapterBatch : public detail::NoMetaInfo { + public: + DataTableAdapterBatch(void** data, const char** feature_stypes, + size_t num_rows, size_t num_features) + : data(data), + feature_stypes(feature_stypes), + num_features(num_features), + num_rows(num_rows) {} + + private: + enum class DTType : uint8_t { + kFloat32 = 0, + kFloat64 = 1, + kBool8 = 2, + kInt32 = 3, + kInt8 = 4, + kInt16 = 5, + kInt64 = 6, + kUnknown = 7 + }; + + DTType DTGetType(std::string type_string) const { + if (type_string == "float32") { + return DTType::kFloat32; + } else if (type_string == "float64") { + return DTType::kFloat64; + } else if (type_string == "bool8") { + return DTType::kBool8; + } else if (type_string == "int32") { + return DTType::kInt32; + } else if (type_string == "int8") { + return DTType::kInt8; + } else if (type_string == "int16") { + return DTType::kInt16; + } else if (type_string == "int64") { + return DTType::kInt64; + } else { + LOG(FATAL) << "Unknown data table type."; + return DTType::kUnknown; + } + } + + class Line { + float DTGetValue(const void* column, DTType dt_type, size_t ridx) const { + float missing = std::numeric_limits::quiet_NaN(); + switch (dt_type) { + case DTType::kFloat32: { + float val = reinterpret_cast(column)[ridx]; + return std::isfinite(val) ? val : missing; + } + case DTType::kFloat64: { + double val = reinterpret_cast(column)[ridx]; + return std::isfinite(val) ? static_cast(val) : missing; + } + case DTType::kBool8: { + bool val = reinterpret_cast(column)[ridx]; + return static_cast(val); + } + case DTType::kInt32: { + int32_t val = reinterpret_cast(column)[ridx]; + return val != (-2147483647 - 1) ? static_cast(val) : missing; + } + case DTType::kInt8: { + int8_t val = reinterpret_cast(column)[ridx]; + return val != -128 ? static_cast(val) : missing; + } + case DTType::kInt16: { + int16_t val = reinterpret_cast(column)[ridx]; + return val != -32768 ? static_cast(val) : missing; + } + case DTType::kInt64: { + int64_t val = reinterpret_cast(column)[ridx]; + return val != -9223372036854775807 - 1 ? 
static_cast(val) + : missing; + } + default: { + LOG(FATAL) << "Unknown data table type."; + return 0.0f; + } + } + } + + public: + Line(DTType type, size_t size, size_t column_idx, const void* column) + : type(type), size(size), column_idx(column_idx), column(column) {} + + size_t Size() const { return size; } + COOTuple GetElement(size_t idx) const { + return COOTuple(idx, column_idx, DTGetValue(column, type, idx)); + } + + private: + DTType type; + size_t size; + size_t column_idx; + const void* column; + }; + + public: + size_t Size() const { return num_features; } + const Line GetLine(size_t idx) const { + return Line(DTGetType(feature_stypes[idx]), num_rows, idx, data[idx]); + } + + private: + void** data; + const char** feature_stypes; + size_t num_features; + size_t num_rows; +}; + +class DataTableAdapter + : public detail::SingleBatchDataIter { + public: + DataTableAdapter(void** data, const char** feature_stypes, size_t num_rows, + size_t num_features) + : batch(data, feature_stypes, num_rows, num_features), + num_rows(num_rows), + num_columns(num_features) {} + const DataTableAdapterBatch& Value() const override { return batch; } + size_t NumRows() const { return num_rows; } + size_t NumColumns() const { return num_columns; } + + private: + DataTableAdapterBatch batch; + size_t num_rows; + size_t num_columns; +}; + +class FileAdapterBatch { + public: + class Line { + public: + Line(size_t row_idx, const uint32_t* feature_idx, const float* value, + size_t size) + : row_idx(row_idx), + feature_idx(feature_idx), + value(value), + size(size) {} + + size_t Size() { return size; } + COOTuple GetElement(size_t idx) { + float fvalue = value == nullptr ? 1.0f : value[idx]; + return COOTuple(row_idx, feature_idx[idx], fvalue); + } + + private: + size_t row_idx; + const uint32_t* feature_idx; + const float* value; + size_t size; + }; + FileAdapterBatch(const dmlc::RowBlock* block, size_t row_offset) + : block(block), row_offset(row_offset) {} + Line GetLine(size_t idx) const { + auto begin = block->offset[idx]; + auto end = block->offset[idx + 1]; + return Line(idx + row_offset, &block->index[begin], &block->value[begin], + end - begin); + } + const float* Labels() const { return block->label; } + const float* Weights() const { return block->weight; } + const uint64_t* Qid() const { return block->qid; } + + size_t Size() const { return block->size; } + + private: + const dmlc::RowBlock* block; + size_t row_offset; +}; + +/** \brief FileAdapter wraps dmlc::parser to read files and provide access in a + * common interface. 
*/ +class FileAdapter : dmlc::DataIter { + public: + explicit FileAdapter(dmlc::Parser* parser) : parser(parser) {} + + const FileAdapterBatch& Value() const override { return *batch.get(); } + void BeforeFirst() override { + batch.reset(); + parser->BeforeFirst(); + row_offset = 0; + } + bool Next() override { + bool next = parser->Next(); + batch.reset(new FileAdapterBatch(&parser->Value(), row_offset)); + row_offset += parser->Value().size; + return next; + } + // Indicates a number of rows/columns must be inferred + size_t NumRows() const { return kAdapterUnknownSize; } + size_t NumColumns() const { return kAdapterUnknownSize; } + + private: + size_t row_offset{0}; + std::unique_ptr batch; + dmlc::Parser* parser; +}; +}; // namespace data +} // namespace xgboost +#endif // XGBOOST_DATA_ADAPTER_H_ diff --git a/src/data/data.cc b/src/data/data.cc index 0bf181ca9..03d3887c8 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -15,6 +15,7 @@ #include "../common/io.h" #include "../common/version.h" #include "../common/group_data.h" +#include "../data/adapter.h" #if DMLC_ENABLE_STD_THREAD #include "./sparse_page_source.h" @@ -207,6 +208,7 @@ DMatrix* DMatrix::Load(const std::string& uri, LOG(CONSOLE) << "Load part of data " << partid << " of " << npart << " parts"; } + // legacy handling of binary data loading if (file_format == "auto" && npart == 1) { int magic; @@ -214,13 +216,13 @@ DMatrix* DMatrix::Load(const std::string& uri, if (fi != nullptr) { common::PeekableInStream is(fi.get()); if (is.PeekRead(&magic, sizeof(magic)) == sizeof(magic) && - magic == data::SimpleCSRSource::kMagic) { + magic == data::SimpleCSRSource::kMagic) { std::unique_ptr source(new data::SimpleCSRSource()); source->LoadBinary(&is); DMatrix* dmat = DMatrix::Create(std::move(source), cache_file); if (!silent) { LOG(CONSOLE) << dmat->Info().num_row_ << 'x' << dmat->Info().num_col_ << " matrix with " - << dmat->Info().num_nonzero_ << " entries loaded from " << uri; + << dmat->Info().num_nonzero_ << " entries loaded from " << uri; } return dmat; } @@ -291,9 +293,9 @@ DMatrix* DMatrix::Create(dmlc::Parser* parser, const std::string& cache_prefix, const size_t page_size) { if (cache_prefix.length() == 0) { - std::unique_ptr source(new data::SimpleCSRSource()); - source->CopyFrom(parser); - return DMatrix::Create(std::move(source), cache_prefix); + data::FileAdapter adapter(parser); + return DMatrix::Create(&adapter, std::numeric_limits::quiet_NaN(), + 1); } else { #if DMLC_ENABLE_STD_THREAD if (!data::SparsePageSource::CacheExist(cache_prefix, ".row.page")) { @@ -355,9 +357,23 @@ DMatrix* DMatrix::Create(std::unique_ptr>&& source, #endif // DMLC_ENABLE_STD_THREAD } } -} // namespace xgboost -namespace xgboost { +template +DMatrix* DMatrix::Create(AdapterT* adapter, float missing, int nthread) { + return new data::SimpleDMatrix(adapter, missing, nthread); +} + +template DMatrix* DMatrix::Create(data::DenseAdapter* adapter, + float missing, int nthread); +template DMatrix* DMatrix::Create(data::CSRAdapter* adapter, + float missing, int nthread); +template DMatrix* DMatrix::Create(data::CSCAdapter* adapter, + float missing, int nthread); +template DMatrix* DMatrix::Create( + data::DataTableAdapter* adapter, float missing, int nthread); +template DMatrix* DMatrix::Create(data::FileAdapter* adapter, + float missing, int nthread); + SparsePage SparsePage::GetTranspose(int num_columns) const { SparsePage transpose; common::ParallelGroupBuilder builder(&transpose.offset.HostVector(), diff --git 
a/src/data/simple_csr_source.cc b/src/data/simple_csr_source.cc index 2723e9a2f..91c7a1e38 100644 --- a/src/data/simple_csr_source.cc +++ b/src/data/simple_csr_source.cc @@ -6,7 +6,6 @@ #include #include -#include #include "simple_csr_source.h" #include "columnar.h" @@ -26,69 +25,6 @@ void SimpleCSRSource::CopyFrom(DMatrix* src) { } } -void SimpleCSRSource::CopyFrom(dmlc::Parser* parser) { - // use qid to get group info - const uint64_t default_max = std::numeric_limits::max(); - uint64_t last_group_id = default_max; - bst_uint group_size = 0; - std::vector qids; - this->Clear(); - while (parser->Next()) { - const dmlc::RowBlock& batch = parser->Value(); - if (batch.label != nullptr) { - auto& labels = info.labels_.HostVector(); - labels.insert(labels.end(), batch.label, batch.label + batch.size); - } - if (batch.weight != nullptr) { - auto& weights = info.weights_.HostVector(); - weights.insert(weights.end(), batch.weight, batch.weight + batch.size); - } - if (batch.qid != nullptr) { - qids.insert(qids.end(), batch.qid, batch.qid + batch.size); - // get group - for (size_t i = 0; i < batch.size; ++i) { - const uint64_t cur_group_id = batch.qid[i]; - if (last_group_id == default_max || last_group_id != cur_group_id) { - info.group_ptr_.push_back(group_size); - } - last_group_id = cur_group_id; - ++group_size; - } - } - - // Remove the assertion on batch.index, which can be null in the case that the data in this - // batch is entirely sparse. Although it's true that this indicates a likely issue with the - // user's data workflows, passing XGBoost entirely sparse data should not cause it to fail. - // See https://github.com/dmlc/xgboost/issues/1827 for complete detail. - // CHECK(batch.index != nullptr); - - // update information - this->info.num_row_ += batch.size; - // copy the data over - auto& data_vec = page_.data.HostVector(); - auto& offset_vec = page_.offset.HostVector(); - for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) { - uint32_t index = batch.index[i]; - bst_float fvalue = batch.value == nullptr ? 1.0f : batch.value[i]; - data_vec.emplace_back(index, fvalue); - this->info.num_col_ = std::max(this->info.num_col_, - static_cast(index + 1)); - } - size_t top = page_.offset.Size(); - for (size_t i = 0; i < batch.size; ++i) { - offset_vec.push_back(offset_vec[top - 1] + batch.offset[i + 1] - batch.offset[0]); - } - } - if (last_group_id != default_max) { - if (group_size > info.group_ptr_.back()) { - info.group_ptr_.push_back(group_size); - } - } - this->info.num_nonzero_ = static_cast(page_.data.Size()); - // Either every row has query ID or none at all - CHECK(qids.empty() || qids.size() == info.num_row_); -} - void SimpleCSRSource::LoadBinary(dmlc::Stream* fi) { int tmagic; CHECK(fi->Read(&tmagic, sizeof(tmagic)) == sizeof(tmagic)) << "invalid input file format"; diff --git a/src/data/simple_csr_source.h b/src/data/simple_csr_source.h index 1ae0d1896..200922e1d 100644 --- a/src/data/simple_csr_source.h +++ b/src/data/simple_csr_source.h @@ -45,12 +45,7 @@ class SimpleCSRSource : public DataSource { * \param src source data iter. */ void CopyFrom(DMatrix* src); - /*! - * \brief copy content of data from parser, also set the additional information. - * \param src source data iter. - * \param info The additional information reflected in the parser. - */ - void CopyFrom(dmlc::Parser* src); + /*! * \brief copy content of data from foreign **GPU** columnar buffer. * \param interfaces_str JSON representation of cuda array interfaces. 
diff --git a/src/data/simple_dmatrix.h b/src/data/simple_dmatrix.h
index 0479a3577..6ea85bc0d 100644
--- a/src/data/simple_dmatrix.h
+++ b/src/data/simple_dmatrix.h
@@ -11,12 +11,15 @@
 #include 
 #include 
-#include 
 #include 
+#include 
 #include 
 #include 
 
 #include "simple_csr_source.h"
+#include "../common/group_data.h"
+#include "../common/math.h"
+#include "adapter.h"
 
 namespace xgboost {
 namespace data {
@@ -26,6 +29,121 @@ class SimpleDMatrix : public DMatrix {
   explicit SimpleDMatrix(std::unique_ptr<DataSource<SparsePage>>&& source)
       : source_(std::move(source)) {}
 
+  template <typename AdapterT>
+  explicit SimpleDMatrix(AdapterT* adapter, float missing, int nthread) {
+    // Set number of threads but keep old value so we can reset it after
+    const int nthreadmax = omp_get_max_threads();
+    if (nthread <= 0) nthread = nthreadmax;
+    int nthread_original = omp_get_max_threads();
+    omp_set_num_threads(nthread);
+
+    source_.reset(new SimpleCSRSource());
+    SimpleCSRSource& mat = *reinterpret_cast<SimpleCSRSource*>(source_.get());
+    std::vector<uint64_t> qids;
+    uint64_t default_max = std::numeric_limits<uint64_t>::max();
+    uint64_t last_group_id = default_max;
+    bst_uint group_size = 0;
+    auto& offset_vec = mat.page_.offset.HostVector();
+    auto& data_vec = mat.page_.data.HostVector();
+    uint64_t inferred_num_columns = 0;
+
+    adapter->BeforeFirst();
+    // Iterate over batches of input data
+    while (adapter->Next()) {
+      auto& batch = adapter->Value();
+      common::ParallelGroupBuilder<
+          Entry, std::remove_reference<decltype(offset_vec)>::type::value_type>
+          builder(&offset_vec, &data_vec);
+      builder.InitBudget(0, nthread);
+
+      // First pass over the batch, counting valid elements
+      size_t num_lines = batch.Size();
+#pragma omp parallel for schedule(static)
+      for (omp_ulong i = 0; i < static_cast<omp_ulong>(num_lines);
+           ++i) {  // NOLINT(*)
+        int tid = omp_get_thread_num();
+        auto line = batch.GetLine(i);
+        for (auto j = 0ull; j < line.Size(); j++) {
+          auto element = line.GetElement(j);
+          inferred_num_columns =
+              std::max(inferred_num_columns,
+                       static_cast<uint64_t>(element.column_idx + 1));
+          if (!common::CheckNAN(element.value) && element.value != missing) {
+            builder.AddBudget(element.row_idx, tid);
+          }
+        }
+      }
+      builder.InitStorage();
+
+      // Second pass over the batch, placing elements in their correct
+      // positions
+#pragma omp parallel for schedule(static)
+      for (omp_ulong i = 0; i < static_cast<omp_ulong>(num_lines);
+           ++i) {  // NOLINT(*)
+        int tid = omp_get_thread_num();
+        auto line = batch.GetLine(i);
+        for (auto j = 0ull; j < line.Size(); j++) {
+          auto element = line.GetElement(j);
+          if (!common::CheckNAN(element.value) && element.value != missing) {
+            builder.Push(element.row_idx,
+                         Entry(element.column_idx, element.value), tid);
+          }
+        }
+      }
+
+      // Append meta information if available
+      if (batch.Labels() != nullptr) {
+        auto& labels = mat.info.labels_.HostVector();
+        labels.insert(labels.end(), batch.Labels(),
+                      batch.Labels() + batch.Size());
+      }
+      if (batch.Weights() != nullptr) {
+        auto& weights = mat.info.weights_.HostVector();
+        weights.insert(weights.end(), batch.Weights(),
+                       batch.Weights() + batch.Size());
+      }
+      if (batch.Qid() != nullptr) {
+        qids.insert(qids.end(), batch.Qid(), batch.Qid() + batch.Size());
+        // get group
+        for (size_t i = 0; i < batch.Size(); ++i) {
+          const uint64_t cur_group_id = batch.Qid()[i];
+          if (last_group_id == default_max || last_group_id != cur_group_id) {
+            mat.info.group_ptr_.push_back(group_size);
+          }
+          last_group_id = cur_group_id;
+          ++group_size;
+        }
+      }
+    }
+
+    if (last_group_id != default_max) {
+      if (group_size > mat.info.group_ptr_.back()) {
+        mat.info.group_ptr_.push_back(group_size);
+      }
+    }
+
+    // 
Deal with empty rows/columns if necessary
+    if (adapter->NumColumns() == kAdapterUnknownSize) {
+      mat.info.num_col_ = inferred_num_columns;
+    } else {
+      mat.info.num_col_ = adapter->NumColumns();
+    }
+    // Synchronise worker columns
+    rabit::Allreduce<rabit::op::Max>(&mat.info.num_col_, 1);
+
+    if (adapter->NumRows() == kAdapterUnknownSize) {
+      mat.info.num_row_ = offset_vec.size() - 1;
+    } else {
+      if (offset_vec.empty()) {
+        offset_vec.emplace_back(0);
+      }
+
+      while (offset_vec.size() - 1 < adapter->NumRows()) {
+        offset_vec.emplace_back(offset_vec.back());
+      }
+      mat.info.num_row_ = adapter->NumRows();
+    }
+    mat.info.num_nonzero_ = data_vec.size();
+    omp_set_num_threads(nthread_original);
+  }
+
   MetaInfo& Info() override;
   const MetaInfo& Info() const override;
diff --git a/tests/cpp/common/test_group_data.cc b/tests/cpp/common/test_group_data.cc
new file mode 100644
index 000000000..095f10d75
--- /dev/null
+++ b/tests/cpp/common/test_group_data.cc
@@ -0,0 +1,54 @@
+/*!
+ * Copyright 2019 by Contributors
+ */
+#include 
+#include 
+#include "../../../src/common/group_data.h"
+
+namespace xgboost {
+namespace common {
+
+TEST(group_data, ParallelGroupBuilder) {
+  std::vector<size_t> offsets;
+  std::vector<Entry> data;
+  ParallelGroupBuilder<Entry, size_t> builder(&offsets, &data);
+  builder.InitBudget(0, 1);
+  // Add two rows with two elements each
+  builder.AddBudget(0, 0, 2);
+  builder.AddBudget(1, 0, 2);
+
+  builder.InitStorage();
+  builder.Push(0, Entry(0, 0), 0);
+  builder.Push(0, Entry(1, 1), 0);
+  builder.Push(1, Entry(0, 2), 0);
+  builder.Push(1, Entry(1, 3), 0);
+
+  std::vector<Entry> expected_data{
+      Entry(0, 0),
+      Entry(1, 1),
+      Entry(0, 2),
+      Entry(1, 3),
+  };
+  std::vector<size_t> expected_offsets{0, 2, 4};
+
+  EXPECT_EQ(data, expected_data);
+  EXPECT_EQ(offsets, expected_offsets);
+
+  // Create a new builder and add one more row, given the already populated
+  // offsets/data
+  ParallelGroupBuilder<Entry, size_t> builder2(&offsets, &data);
+  builder2.InitBudget(0, 1);
+  builder2.AddBudget(2, 0, 2);
+  builder2.InitStorage();
+  builder2.Push(2, Entry(0, 4), 0);
+  builder2.Push(2, Entry(1, 5), 0);
+
+  expected_data.emplace_back(Entry(0, 4));
+  expected_data.emplace_back(Entry(1, 5));
+  expected_offsets.emplace_back(6);
+
+  EXPECT_EQ(data, expected_data);
+  EXPECT_EQ(offsets, expected_offsets);
+}
+
+}  // namespace common
+}  // namespace xgboost
diff --git a/tests/cpp/data/test_adapter.cc b/tests/cpp/data/test_adapter.cc
new file mode 100644
index 000000000..3e0607362
--- /dev/null
+++ b/tests/cpp/data/test_adapter.cc
@@ -0,0 +1,104 @@
+// Copyright (c) 2019 by Contributors
+#include 
+#include 
+#include 
+#include 
+#include "../../../src/data/adapter.h"
+#include "../../../src/data/simple_dmatrix.h"
+#include "../../../src/common/timer.h"
+#include "../helpers.h"
+using namespace xgboost;  // NOLINT
+TEST(c_api, CSRAdapter) {
+  int m = 3;
+  int n = 2;
+  std::vector<float> data = {1, 2, 3, 4, 5};
+  std::vector<unsigned> feature_idx = {0, 1, 0, 1, 1};
+  std::vector<size_t> row_ptr = {0, 2, 4, 5};
+  data::CSRAdapter adapter(row_ptr.data(), feature_idx.data(), data.data(),
+                           row_ptr.size() - 1, data.size(), n);
+  adapter.Next();
+  auto &batch = adapter.Value();
+  auto line0 = batch.GetLine(0);
+  EXPECT_EQ(line0.GetElement(0).value, 1);
+  EXPECT_EQ(line0.GetElement(1).value, 2);
+
+  auto line1 = batch.GetLine(1);
+  EXPECT_EQ(line1.GetElement(0).value, 3);
+  EXPECT_EQ(line1.GetElement(1).value, 4);
+  auto line2 = batch.GetLine(2);
+  EXPECT_EQ(line2.GetElement(0).value, 5);
+  EXPECT_EQ(line2.GetElement(0).row_idx, 2);
+  EXPECT_EQ(line2.GetElement(0).column_idx, 1);
+
+  data::SimpleDMatrix dmat(&adapter,
+                           std::numeric_limits<float>::quiet_NaN(), -1);
+  EXPECT_EQ(dmat.Info().num_col_, 2);
+  EXPECT_EQ(dmat.Info().num_row_, 3);
+  EXPECT_EQ(dmat.Info().num_nonzero_, 5);
+
+  for (auto &batch : dmat.GetBatches<SparsePage>()) {
+    for (auto i = 0ull; i < batch.Size(); i++) {
+      auto inst = batch[i];
+      for (auto j = 0ull; j < inst.size(); j++) {
+        EXPECT_EQ(inst[j].fvalue, data[row_ptr[i] + j]);
+        EXPECT_EQ(inst[j].index, feature_idx[row_ptr[i] + j]);
+      }
+    }
+  }
+}
+
+TEST(c_api, DenseAdapter) {
+  int m = 3;
+  int n = 2;
+  std::vector<float> data = {1, 2, 3, 4, 5, 6};
+  data::DenseAdapter adapter(data.data(), m, m * n, n);
+  data::SimpleDMatrix dmat(&adapter,
+                           std::numeric_limits<float>::quiet_NaN(), -1);
+  EXPECT_EQ(dmat.Info().num_col_, 2);
+  EXPECT_EQ(dmat.Info().num_row_, 3);
+  EXPECT_EQ(dmat.Info().num_nonzero_, 6);
+
+  for (auto &batch : dmat.GetBatches<SparsePage>()) {
+    for (auto i = 0ull; i < batch.Size(); i++) {
+      auto inst = batch[i];
+      for (auto j = 0ull; j < inst.size(); j++) {
+        EXPECT_EQ(inst[j].fvalue, data[i * n + j]);
+        EXPECT_EQ(inst[j].index, j);
+      }
+    }
+  }
+}
+
+TEST(c_api, CSCAdapter) {
+  std::vector<float> data = {1, 3, 2, 4, 5};
+  std::vector<unsigned> row_idx = {0, 1, 0, 1, 2};
+  std::vector<size_t> col_ptr = {0, 2, 5};
+  data::CSCAdapter adapter(col_ptr.data(), row_idx.data(), data.data(), 2, 3);
+  data::SimpleDMatrix dmat(&adapter,
+                           std::numeric_limits<float>::quiet_NaN(), -1);
+  EXPECT_EQ(dmat.Info().num_col_, 2);
+  EXPECT_EQ(dmat.Info().num_row_, 3);
+  EXPECT_EQ(dmat.Info().num_nonzero_, 5);
+
+  auto &batch = *dmat.GetBatches<SparsePage>().begin();
+  auto inst = batch[0];
+  EXPECT_EQ(inst[0].fvalue, 1);
+  EXPECT_EQ(inst[0].index, 0);
+  EXPECT_EQ(inst[1].fvalue, 2);
+  EXPECT_EQ(inst[1].index, 1);
+
+  inst = batch[1];
+  EXPECT_EQ(inst[0].fvalue, 3);
+  EXPECT_EQ(inst[0].index, 0);
+  EXPECT_EQ(inst[1].fvalue, 4);
+  EXPECT_EQ(inst[1].index, 1);
+
+  inst = batch[2];
+  EXPECT_EQ(inst[0].fvalue, 5);
+  EXPECT_EQ(inst[0].index, 1);
+}
+
+TEST(c_api, FileAdapter) {
+  std::string filename = "test.libsvm";
+  CreateBigTestData(filename, 10);
+  std::unique_ptr<dmlc::Parser<uint32_t>> parser(
+      dmlc::Parser<uint32_t>::Create(filename.c_str(), 0, 1, "auto"));
+  data::FileAdapter adapter(parser.get());
+}
diff --git a/tests/cpp/data/test_data.cc b/tests/cpp/data/test_data.cc
index 65eb61977..041cd9dbc 100644
--- a/tests/cpp/data/test_data.cc
+++ b/tests/cpp/data/test_data.cc
@@ -101,7 +101,6 @@ TEST(DMatrix, Uri) {
   std::string path = tmpdir.path + "/small.csv";
   std::ofstream fout(path);
-  ASSERT_TRUE(fout);
   size_t i = 0;
   for (size_t r = 0; r < kRows; ++r) {
     for (size_t c = 0; c < kCols; ++c) {
diff --git a/tests/cpp/data/test_simple_dmatrix.cc b/tests/cpp/data/test_simple_dmatrix.cc
index 1f3d602ba..24858f0e1 100644
--- a/tests/cpp/data/test_simple_dmatrix.cc
+++ b/tests/cpp/data/test_simple_dmatrix.cc
@@ -4,6 +4,9 @@
 #include "../../../src/data/simple_dmatrix.h"
 #include "../helpers.h"
+#include "../../../src/data/adapter.h"
+
+using namespace xgboost;  // NOLINT
 
 TEST(SimpleDMatrix, MetaInfo) {
   dmlc::TemporaryDirectory tempdir;
@@ -63,3 +66,63 @@ TEST(SimpleDMatrix, ColAccessWithoutBatches) {
   EXPECT_EQ(num_col_batch, 1) << "Expected number of batches to be 1";
   delete dmat;
 }
+
+TEST(SimpleDMatrix, Empty) {
+  std::vector<float> data{};
+  std::vector<unsigned> feature_idx = {};
+  std::vector<size_t> row_ptr = {};
+
+  data::CSRAdapter csr_adapter(row_ptr.data(), feature_idx.data(),
+                               data.data(), 0, 0, 0);
+  data::SimpleDMatrix dmat(&csr_adapter,
+                           std::numeric_limits<float>::quiet_NaN(), 1);
+  CHECK_EQ(dmat.Info().num_nonzero_, 0);
+  CHECK_EQ(dmat.Info().num_row_, 0);
+  CHECK_EQ(dmat.Info().num_col_, 0);
+  for (auto &batch : dmat.GetBatches<SparsePage>()) {
CHECK_EQ(batch.Size(), 0); + } + + data::DenseAdapter dense_adapter(nullptr, 0, 0, 0); + dmat = data::SimpleDMatrix(&dense_adapter, + std::numeric_limits::quiet_NaN(), 1); + CHECK_EQ(dmat.Info().num_nonzero_, 0); + CHECK_EQ(dmat.Info().num_row_, 0); + CHECK_EQ(dmat.Info().num_col_, 0); + for (auto &batch : dmat.GetBatches()) { + CHECK_EQ(batch.Size(), 0); + } + + data::CSCAdapter csc_adapter(nullptr, nullptr, nullptr, 0, 0); + dmat = data::SimpleDMatrix(&csc_adapter, + std::numeric_limits::quiet_NaN(), 1); + CHECK_EQ(dmat.Info().num_nonzero_, 0); + CHECK_EQ(dmat.Info().num_row_, 0); + CHECK_EQ(dmat.Info().num_col_, 0); + for (auto &batch : dmat.GetBatches()) { + CHECK_EQ(batch.Size(), 0); + } +} + +TEST(SimpleDMatrix, MissingData) { + std::vector data{0.0, std::nanf(""), 1.0}; + std::vector feature_idx = {0, 1, 0}; + std::vector row_ptr = {0, 2, 3}; + + data::CSRAdapter adapter(row_ptr.data(), feature_idx.data(), data.data(), 2, 3, 2); + data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), 1); + CHECK_EQ(dmat.Info().num_nonzero_, 2); + dmat = data::SimpleDMatrix(&adapter, 1.0, 1); + CHECK_EQ(dmat.Info().num_nonzero_, 1); +} + +TEST(SimpleDMatrix, EmptyRow) { + std::vector data{0.0, 1.0}; + std::vector feature_idx = {0, 1}; + std::vector row_ptr = {0, 2, 2}; + + data::CSRAdapter adapter(row_ptr.data(), feature_idx.data(), data.data(), 2, 2, 2); + data::SimpleDMatrix dmat(&adapter, std::numeric_limits::quiet_NaN(), 1); + CHECK_EQ(dmat.Info().num_nonzero_, 2); + CHECK_EQ(dmat.Info().num_row_, 2); + CHECK_EQ(dmat.Info().num_col_, 2); +} diff --git a/tests/python/test_basic.py b/tests/python/test_basic.py index 80053831d..6dbd12799 100644 --- a/tests/python/test_basic.py +++ b/tests/python/test_basic.py @@ -64,25 +64,6 @@ class TestBasic(unittest.TestCase): # assert they are the same assert np.sum(np.abs(preds2 - preds)) == 0 - def test_np_view(self): - # Sliced Float32 array - y = np.array([12, 34, 56], np.float32)[::2] - from_view = xgb.DMatrix(np.array([[]]), label=y).get_label() - from_array = xgb.DMatrix(np.array([[]]), label=y + 0).get_label() - assert (from_view.shape == from_array.shape) - assert (from_view == from_array).all() - - # Sliced UInt array - z = np.array([12, 34, 56], np.uint32)[::2] - dmat = xgb.DMatrix(np.array([[]])) - dmat.set_uint_info('root_index', z) - from_view = dmat.get_uint_info('root_index') - dmat = xgb.DMatrix(np.array([[]])) - dmat.set_uint_info('root_index', z + 0) - from_array = dmat.get_uint_info('root_index') - assert (from_view.shape == from_array.shape) - assert (from_view == from_array).all() - def test_record_results(self): dtrain = xgb.DMatrix(dpath + 'agaricus.txt.train') dtest = xgb.DMatrix(dpath + 'agaricus.txt.test') @@ -127,72 +108,6 @@ class TestBasic(unittest.TestCase): # assert they are the same assert np.sum(np.abs(preds2 - preds)) == 0 - def test_dmatrix_init(self): - data = np.random.randn(5, 5) - - # different length - self.assertRaises(ValueError, xgb.DMatrix, data, - feature_names=list('abcdef')) - # contains duplicates - self.assertRaises(ValueError, xgb.DMatrix, data, - feature_names=['a', 'b', 'c', 'd', 'd']) - # contains symbol - self.assertRaises(ValueError, xgb.DMatrix, data, - feature_names=['a', 'b', 'c', 'd', 'e<1']) - - dm = xgb.DMatrix(data) - dm.feature_names = list('abcde') - assert dm.feature_names == list('abcde') - - assert dm.slice([0, 1]).feature_names == dm.feature_names - - dm.feature_types = 'q' - assert dm.feature_types == list('qqqqq') - - dm.feature_types = list('qiqiq') - assert dm.feature_types 
== list('qiqiq') - - def incorrect_type_set(): - dm.feature_types = list('abcde') - - self.assertRaises(ValueError, incorrect_type_set) - - # reset - dm.feature_names = None - self.assertEqual(dm.feature_names, ['f0', 'f1', 'f2', 'f3', 'f4']) - assert dm.feature_types is None - - def test_feature_names(self): - data = np.random.randn(100, 5) - target = np.array([0, 1] * 50) - - cases = [['Feature1', 'Feature2', 'Feature3', 'Feature4', 'Feature5'], - [u'要因1', u'要因2', u'要因3', u'要因4', u'要因5']] - - for features in cases: - dm = xgb.DMatrix(data, label=target, - feature_names=features) - assert dm.feature_names == features - assert dm.num_row() == 100 - assert dm.num_col() == 5 - - params = {'objective': 'multi:softprob', - 'eval_metric': 'mlogloss', - 'eta': 0.3, - 'num_class': 3} - - bst = xgb.train(params, dm, num_boost_round=10) - scores = bst.get_fscore() - assert list(sorted(k for k in scores)) == features - - dummy = np.random.randn(5, 5) - dm = xgb.DMatrix(dummy, feature_names=features) - bst.predict(dm) - - # different feature name must raises error - dm = xgb.DMatrix(dummy, feature_names=list('abcde')) - self.assertRaises(ValueError, bst.predict, dm) - def test_dump(self): data = np.random.randn(100, 2) target = np.array([0, 1] * 50) @@ -250,27 +165,6 @@ class TestBasic(unittest.TestCase): assert dm.num_row() == row assert dm.num_col() == cols - def test_dmatrix_numpy_init(self): - data = np.random.randn(5, 5) - dm = xgb.DMatrix(data) - assert dm.num_row() == 5 - assert dm.num_col() == 5 - - data = np.array([[1, 2], [3, 4]]) - dm = xgb.DMatrix(data) - assert dm.num_row() == 2 - assert dm.num_col() == 2 - - # 0d array - self.assertRaises(ValueError, xgb.DMatrix, np.array(1)) - # 1d array - self.assertRaises(ValueError, xgb.DMatrix, np.array([1, 2, 3])) - # 3d array - data = np.random.randn(5, 5, 5) - self.assertRaises(ValueError, xgb.DMatrix, data) - # object dtype - data = np.array([['a', 'b'], ['c', 'd']]) - self.assertRaises(ValueError, xgb.DMatrix, data) def test_cv(self): dm = xgb.DMatrix(dpath + 'agaricus.txt.train') @@ -336,12 +230,6 @@ class TestBasic(unittest.TestCase): ' dtype=float32)]') assert output == solution - def test_get_info(self): - dtrain = xgb.DMatrix(dpath + 'agaricus.txt.train') - dtrain.get_float_info('label') - dtrain.get_float_info('weight') - dtrain.get_float_info('base_margin') - dtrain.get_uint_info('root_index') class TestBasicPathLike(unittest.TestCase): diff --git a/tests/python/test_dmatrix.py b/tests/python/test_dmatrix.py new file mode 100644 index 000000000..5877e04fd --- /dev/null +++ b/tests/python/test_dmatrix.py @@ -0,0 +1,171 @@ +# -*- coding: utf-8 -*- +import numpy as np +import xgboost as xgb +import unittest +import scipy.sparse +from scipy.sparse import rand + +rng = np.random.RandomState(1) + +dpath = 'demo/data/' +rng = np.random.RandomState(1994) + + +class TestDMatrix(unittest.TestCase): + def test_dmatrix_numpy_init(self): + data = np.random.randn(5, 5) + dm = xgb.DMatrix(data) + assert dm.num_row() == 5 + assert dm.num_col() == 5 + + data = np.array([[1, 2], [3, 4]]) + dm = xgb.DMatrix(data) + assert dm.num_row() == 2 + assert dm.num_col() == 2 + + # 0d array + self.assertRaises(ValueError, xgb.DMatrix, np.array(1)) + # 1d array + self.assertRaises(ValueError, xgb.DMatrix, np.array([1, 2, 3])) + # 3d array + data = np.random.randn(5, 5, 5) + self.assertRaises(ValueError, xgb.DMatrix, data) + # object dtype + data = np.array([['a', 'b'], ['c', 'd']]) + self.assertRaises(ValueError, xgb.DMatrix, data) + + def test_csr(self): + indptr = 
np.array([0, 2, 3, 6])
+        indices = np.array([0, 2, 2, 0, 1, 2])
+        data = np.array([1, 2, 3, 4, 5, 6])
+        X = scipy.sparse.csr_matrix((data, indices, indptr), shape=(3, 3))
+        dtrain = xgb.DMatrix(X)
+        assert dtrain.num_row() == 3
+        assert dtrain.num_col() == 3
+
+    def test_csc(self):
+        row = np.array([0, 2, 2, 0, 1, 2])
+        col = np.array([0, 0, 1, 2, 2, 2])
+        data = np.array([1, 2, 3, 4, 5, 6])
+        X = scipy.sparse.csc_matrix((data, (row, col)), shape=(3, 3))
+        dtrain = xgb.DMatrix(X)
+        assert dtrain.num_row() == 3
+        assert dtrain.num_col() == 3
+
+    def test_np_view(self):
+        # Sliced Float32 array
+        y = np.array([12, 34, 56], np.float32)[::2]
+        from_view = xgb.DMatrix(np.array([[]]), label=y).get_label()
+        from_array = xgb.DMatrix(np.array([[]]), label=y + 0).get_label()
+        assert (from_view.shape == from_array.shape)
+        assert (from_view == from_array).all()
+
+        # Sliced UInt array
+        z = np.array([12, 34, 56], np.uint32)[::2]
+        dmat = xgb.DMatrix(np.array([[]]))
+        dmat.set_uint_info('root_index', z)
+        from_view = dmat.get_uint_info('root_index')
+        dmat = xgb.DMatrix(np.array([[]]))
+        dmat.set_uint_info('root_index', z + 0)
+        from_array = dmat.get_uint_info('root_index')
+        assert (from_view.shape == from_array.shape)
+        assert (from_view == from_array).all()
+
+    def test_dmatrix_init(self):
+        data = np.random.randn(5, 5)
+
+        # different length
+        self.assertRaises(ValueError, xgb.DMatrix, data,
+                          feature_names=list('abcdef'))
+        # contains duplicates
+        self.assertRaises(ValueError, xgb.DMatrix, data,
+                          feature_names=['a', 'b', 'c', 'd', 'd'])
+        # contains symbol
+        self.assertRaises(ValueError, xgb.DMatrix, data,
+                          feature_names=['a', 'b', 'c', 'd', 'e<1'])
+
+        dm = xgb.DMatrix(data)
+        dm.feature_names = list('abcde')
+        assert dm.feature_names == list('abcde')
+
+        assert dm.slice([0, 1]).feature_names == dm.feature_names
+
+        dm.feature_types = 'q'
+        assert dm.feature_types == list('qqqqq')
+
+        dm.feature_types = list('qiqiq')
+        assert dm.feature_types == list('qiqiq')
+
+        def incorrect_type_set():
+            dm.feature_types = list('abcde')
+
+        self.assertRaises(ValueError, incorrect_type_set)
+
+        # reset
+        dm.feature_names = None
+        self.assertEqual(dm.feature_names, ['f0', 'f1', 'f2', 'f3', 'f4'])
+        assert dm.feature_types is None
+
+    def test_feature_names(self):
+        data = np.random.randn(100, 5)
+        target = np.array([0, 1] * 50)
+
+        cases = [['Feature1', 'Feature2', 'Feature3', 'Feature4', 'Feature5'],
+                 [u'要因1', u'要因2', u'要因3', u'要因4', u'要因5']]
+
+        for features in cases:
+            dm = xgb.DMatrix(data, label=target,
+                             feature_names=features)
+            assert dm.feature_names == features
+            assert dm.num_row() == 100
+            assert dm.num_col() == 5
+
+            params = {'objective': 'multi:softprob',
+                      'eval_metric': 'mlogloss',
+                      'eta': 0.3,
+                      'num_class': 3}
+
+            bst = xgb.train(params, dm, num_boost_round=10)
+            scores = bst.get_fscore()
+            assert list(sorted(k for k in scores)) == features
+
+            dummy = np.random.randn(5, 5)
+            dm = xgb.DMatrix(dummy, feature_names=features)
+            bst.predict(dm)
+
+            # a different feature name must raise an error
+            dm = xgb.DMatrix(dummy, feature_names=list('abcde'))
+            self.assertRaises(ValueError, bst.predict, dm)
+
+    def test_get_info(self):
+        dtrain = xgb.DMatrix(dpath + 'agaricus.txt.train')
+        dtrain.get_float_info('label')
+        dtrain.get_float_info('weight')
+        dtrain.get_float_info('base_margin')
+        dtrain.get_uint_info('root_index')
+
+    def test_sparse_dmatrix_csr(self):
+        nrow = 100
+        ncol = 1000
+        x = rand(nrow, ncol, density=0.0005, format='csr', random_state=rng)
+        assert x.indices.max() < ncol - 1
x.data[:] = 1 + dtrain = xgb.DMatrix(x, label=np.random.binomial(1, 0.3, nrow)) + assert (dtrain.num_row(), dtrain.num_col()) == (nrow, ncol) + watchlist = [(dtrain, 'train')] + param = {'max_depth': 3, 'objective': 'binary:logistic', 'verbosity': 0} + bst = xgb.train(param, dtrain, 5, watchlist) + bst.predict(dtrain) + + def test_sparse_dmatrix_csc(self): + nrow = 1000 + ncol = 100 + x = rand(nrow, ncol, density=0.0005, format='csc', random_state=rng) + assert x.indices.max() < nrow - 1 + x.data[:] = 1 + dtrain = xgb.DMatrix(x, label=np.random.binomial(1, 0.3, nrow)) + assert (dtrain.num_row(), dtrain.num_col()) == (nrow, ncol) + watchlist = [(dtrain, 'train')] + param = {'max_depth': 3, 'objective': 'binary:logistic', 'verbosity': 0} + bst = xgb.train(param, dtrain, 5, watchlist) + bst.predict(dtrain) diff --git a/tests/python/test_sparse_dmatrix.py b/tests/python/test_sparse_dmatrix.py deleted file mode 100644 index 03ea1f07d..000000000 --- a/tests/python/test_sparse_dmatrix.py +++ /dev/null @@ -1,33 +0,0 @@ -import numpy as np -import xgboost as xgb -from scipy.sparse import rand - -rng = np.random.RandomState(1) - -param = {'max_depth': 3, 'objective': 'binary:logistic', 'verbosity': 0} - - -def test_sparse_dmatrix_csr(): - nrow = 100 - ncol = 1000 - x = rand(nrow, ncol, density=0.0005, format='csr', random_state=rng) - assert x.indices.max() < ncol - 1 - x.data[:] = 1 - dtrain = xgb.DMatrix(x, label=np.random.binomial(1, 0.3, nrow)) - assert (dtrain.num_row(), dtrain.num_col()) == (nrow, ncol) - watchlist = [(dtrain, 'train')] - bst = xgb.train(param, dtrain, 5, watchlist) - bst.predict(dtrain) - - -def test_sparse_dmatrix_csc(): - nrow = 1000 - ncol = 100 - x = rand(nrow, ncol, density=0.0005, format='csc', random_state=rng) - assert x.indices.max() < nrow - 1 - x.data[:] = 1 - dtrain = xgb.DMatrix(x, label=np.random.binomial(1, 0.3, nrow)) - assert (dtrain.num_row(), dtrain.num_col()) == (nrow, ncol) - watchlist = [(dtrain, 'train')] - bst = xgb.train(param, dtrain, 5, watchlist) - bst.predict(dtrain)
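
Example: implementing a custom adapter. Below is a minimal sketch (not part of the patch) of what a third-party input source could look like under the contract described in src/data/adapter.h: a batch exposing Size()/GetLine(), lines exposing Size()/GetElement() returning COOTuple, and an adapter deriving from detail::SingleBatchDataIter. The VectorBatch/VectorAdapter names and the vector-of-rows input are hypothetical, purely illustrative. Note that DMatrix::Create<AdapterT> is explicitly instantiated only for the built-in adapters in src/data/data.cc, so an out-of-tree adapter would construct a data::SimpleDMatrix directly; its adapter constructor is defined in the header.

    // Hypothetical out-of-tree adapter: exposes a dense
    // std::vector<std::vector<float>> (one inner vector per row) through the
    // adapter interface added by this patch. Illustrative only.
    #include <limits>
    #include <vector>

    #include "src/data/adapter.h"         // include paths assumed; adjust to your build
    #include "src/data/simple_dmatrix.h"

    namespace xgboost {
    namespace data {

    class VectorBatch : public detail::NoMetaInfo {
     public:
      class Line {
       public:
        Line(size_t row_idx, const std::vector<float>& row)
            : row_idx_(row_idx), row_(&row) {}
        size_t Size() const { return row_->size(); }
        // Dense row: the position within the line is also the column index.
        COOTuple GetElement(size_t idx) const {
          return COOTuple(row_idx_, idx, (*row_)[idx]);
        }

       private:
        size_t row_idx_;
        const std::vector<float>* row_;
      };

      explicit VectorBatch(const std::vector<std::vector<float>>* rows)
          : rows_(rows) {}
      size_t Size() const { return rows_->size(); }
      const Line GetLine(size_t idx) const { return Line(idx, (*rows_)[idx]); }

     private:
      const std::vector<std::vector<float>>* rows_;
    };

    class VectorAdapter : public detail::SingleBatchDataIter<VectorBatch> {
     public:
      VectorAdapter(const std::vector<std::vector<float>>* rows,
                    size_t num_columns)
          : batch_(rows), num_rows_(rows->size()), num_columns_(num_columns) {}
      const VectorBatch& Value() const override { return batch_; }
      // Both dimensions are known up front, so nothing is left to be inferred.
      size_t NumRows() const { return num_rows_; }
      size_t NumColumns() const { return num_columns_; }

     private:
      VectorBatch batch_;
      size_t num_rows_;
      size_t num_columns_;
    };

    }  // namespace data
    }  // namespace xgboost

    int main() {
      std::vector<std::vector<float>> rows{{1.0f, 2.0f}, {3.0f, 4.0f}};
      xgboost::data::VectorAdapter adapter(&rows, 2);
      // The templated constructor runs the same two-pass ParallelGroupBuilder
      // code path as the built-in adapters.
      xgboost::data::SimpleDMatrix dmat(
          &adapter, std::numeric_limits<float>::quiet_NaN(), 1);
      return dmat.Info().num_row_ == 2 ? 0 : 1;
    }

This is exactly the payoff described in the adapter.h comment: the new source costs one adapter, not one constructor per internal DMatrix type.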