Write ELLPACK pages to disk (#4879)

* add ellpack source
* add batch param
* extract function to parse cache info
* construct ellpack info separately
* push batch to ellpack page
* write ellpack page.
* make sparse page source reusable
This commit is contained in:
Rong Ou
2019-10-22 20:44:32 -07:00
committed by Jiaming Yuan
parent 310fe60b35
commit 5b1715d97c
25 changed files with 935 additions and 408 deletions

View File

@@ -1,7 +1,5 @@
/*!
* Copyright 2019 XGBoost contributors
*
* \file ellpack_page.cu
*/
#include <xgboost/data.h>
@@ -12,14 +10,22 @@
namespace xgboost {
EllpackPage::EllpackPage(DMatrix* dmat) : impl_{new EllpackPageImpl(dmat)} {}
EllpackPage::EllpackPage() : impl_{new EllpackPageImpl()} {}
EllpackPage::EllpackPage(DMatrix* dmat, const BatchParam& param)
: impl_{new EllpackPageImpl(dmat, param)} {}
EllpackPage::~EllpackPage() = default;
EllpackPageImpl::EllpackPageImpl(DMatrix* dmat) : dmat_{dmat} {}
size_t EllpackPage::Size() const {
return impl_->Size();
}
void EllpackPage::SetBaseRowId(size_t row_id) {
impl_->SetBaseRowId(row_id);
}
// Bin each input data entry, store the bin indices in compressed form.
template<typename std::enable_if<true, int>::type = 0>
__global__ void CompressBinEllpackKernel(
common::CompressedBufferWriter wr,
common::CompressedByteT* __restrict__ buffer, // gidx_buffer
@@ -43,7 +49,7 @@ __global__ void CompressBinEllpackKernel(
int feature = entry.index;
float fvalue = entry.fvalue;
// {feature_cuts, ncuts} forms the array of cuts of `feature'.
const float *feature_cuts = &cuts[cut_rows[feature]];
const float* feature_cuts = &cuts[cut_rows[feature]];
int ncuts = cut_rows[feature + 1] - cut_rows[feature];
// Assigning the bin in current entry.
// S.t.: fvalue < feature_cuts[bin]
@@ -58,87 +64,90 @@ __global__ void CompressBinEllpackKernel(
wr.AtomicWriteSymbol(buffer, bin, (irow + base_row) * row_stride + ifeature);
}
void EllpackPageImpl::Init(int device, int max_bin, int gpu_batch_nrows) {
if (initialised_) return;
// Construct an ELLPACK matrix in memory.
EllpackPageImpl::EllpackPageImpl(DMatrix* dmat, const BatchParam& param) {
monitor_.Init("ellpack_page");
dh::safe_cuda(cudaSetDevice(device));
dh::safe_cuda(cudaSetDevice(param.gpu_id));
monitor_.StartCuda("Quantiles");
// Create the quantile sketches for the dmatrix and initialize HistogramCuts.
common::HistogramCuts hmat;
size_t row_stride = common::DeviceSketch(device, max_bin, gpu_batch_nrows, dmat_, &hmat);
size_t row_stride =
common::DeviceSketch(param.gpu_id, param.max_bin, param.gpu_batch_nrows, dmat, &hmat);
monitor_.StopCuda("Quantiles");
const auto& info = dmat_->Info();
auto is_dense = info.num_nonzero_ == info.num_row_ * info.num_col_;
monitor_.StartCuda("InitEllpackInfo");
InitInfo(param.gpu_id, dmat->IsDense(), row_stride, hmat);
monitor_.StopCuda("InitEllpackInfo");
// Init global data
monitor_.StartCuda("InitCompressedData");
InitCompressedData(device, hmat, row_stride, is_dense);
InitCompressedData(param.gpu_id, dmat->Info().num_row_);
monitor_.StopCuda("InitCompressedData");
monitor_.StartCuda("BinningCompression");
DeviceHistogramBuilderState hist_builder_row_state(info.num_row_);
for (const auto& batch : dmat_->GetBatches<SparsePage>()) {
DeviceHistogramBuilderState hist_builder_row_state(dmat->Info().num_row_);
for (const auto& batch : dmat->GetBatches<SparsePage>()) {
hist_builder_row_state.BeginBatch(batch);
CreateHistIndices(device, batch, hist_builder_row_state.GetRowStateOnDevice());
CreateHistIndices(param.gpu_id, batch, hist_builder_row_state.GetRowStateOnDevice());
hist_builder_row_state.EndBatch();
}
monitor_.StopCuda("BinningCompression");
initialised_ = true;
}
void EllpackPageImpl::InitCompressedData(int device,
const common::HistogramCuts& hmat,
size_t row_stride,
bool is_dense) {
n_bins = hmat.Ptrs().back();
int null_gidx_value = hmat.Ptrs().back();
int num_symbols = n_bins + 1;
// minimum value for each feature.
common::Span<bst_float> min_fvalue;
// Required buffer size for storing data matrix in ELLPack format.
size_t compressed_size_bytes = common::CompressedBufferWriter::CalculateBufferSize(
row_stride * dmat_->Info().num_row_, num_symbols);
// Construct an EllpackInfo based on histogram cuts of features.
EllpackInfo::EllpackInfo(int device,
bool is_dense,
size_t row_stride,
const common::HistogramCuts& hmat,
dh::BulkAllocator& ba)
: is_dense(is_dense), row_stride(row_stride), n_bins(hmat.Ptrs().back()) {
ba.Allocate(device,
&feature_segments, hmat.Ptrs().size(),
&gidx_fvalue_map, hmat.Values().size(),
&min_fvalue, hmat.MinValues().size(),
&gidx_buffer, compressed_size_bytes);
&min_fvalue, hmat.MinValues().size());
dh::CopyVectorToDeviceSpan(gidx_fvalue_map, hmat.Values());
dh::CopyVectorToDeviceSpan(min_fvalue, hmat.MinValues());
dh::CopyVectorToDeviceSpan(feature_segments, hmat.Ptrs());
}
// Initialize the EllpackInfo for this page.
void EllpackPageImpl::InitInfo(int device,
bool is_dense,
size_t row_stride,
const common::HistogramCuts& hmat) {
matrix.info = EllpackInfo(device, is_dense, row_stride, hmat, ba_);
}
// Initialize the buffer to stored compressed features.
void EllpackPageImpl::InitCompressedData(int device, size_t num_rows) {
int num_symbols = matrix.info.n_bins + 1;
// Required buffer size for storing data matrix in ELLPack format.
size_t compressed_size_bytes = common::CompressedBufferWriter::CalculateBufferSize(
matrix.info.row_stride * num_rows, num_symbols);
ba_.Allocate(device, &gidx_buffer, compressed_size_bytes);
thrust::fill(
thrust::device_pointer_cast(gidx_buffer.data()),
thrust::device_pointer_cast(gidx_buffer.data() + gidx_buffer.size()), 0);
ellpack_matrix.Init(feature_segments,
min_fvalue,
gidx_fvalue_map,
row_stride,
common::CompressedIterator<uint32_t>(gidx_buffer.data(), num_symbols),
is_dense,
null_gidx_value);
matrix.gidx_iter = common::CompressedIterator<uint32_t>(gidx_buffer.data(), num_symbols);
}
// Compress a CSR page into ELLPACK.
void EllpackPageImpl::CreateHistIndices(int device,
const SparsePage& row_batch,
const RowStateOnDevice& device_row_state) {
// Has any been allocated for me in this batch?
if (!device_row_state.rows_to_process_from_batch) return;
unsigned int null_gidx_value = n_bins;
size_t row_stride = this->ellpack_matrix.row_stride;
unsigned int null_gidx_value = matrix.info.n_bins;
size_t row_stride = matrix.info.row_stride;
const auto &offset_vec = row_batch.offset.ConstHostVector();
const auto& offset_vec = row_batch.offset.ConstHostVector();
int num_symbols = n_bins + 1;
int num_symbols = matrix.info.n_bins + 1;
// bin and compress entries in batches of rows
size_t gpu_batch_nrows = std::min(
dh::TotalMemory(device) / (16 * row_stride * sizeof(Entry)),
@@ -162,7 +171,7 @@ void EllpackPageImpl::CreateHistIndices(int device,
offset_vec[device_row_state.row_offset_in_current_batch + batch_row_end];
/*! \brief row offset in SparsePage (the input data). */
dh::device_vector<size_t> row_ptrs(batch_nrows+1);
dh::device_vector<size_t> row_ptrs(batch_nrows + 1);
thrust::copy(
offset_vec.data() + device_row_state.row_offset_in_current_batch + batch_row_begin,
offset_vec.data() + device_row_state.row_offset_in_current_batch + batch_row_end + 1,
@@ -185,8 +194,8 @@ void EllpackPageImpl::CreateHistIndices(int device,
gidx_buffer.data(),
row_ptrs.data().get(),
entries_d.data().get(),
gidx_fvalue_map.data(),
feature_segments.data(),
matrix.info.gidx_fvalue_map.data(),
matrix.info.feature_segments.data(),
device_row_state.total_rows_processed + batch_row_begin,
batch_nrows,
row_stride,
@@ -194,4 +203,73 @@ void EllpackPageImpl::CreateHistIndices(int device,
}
}
// Return the number of rows contained in this page.
size_t EllpackPageImpl::Size() const {
return n_rows;
}
// Clear the current page.
void EllpackPageImpl::Clear() {
ba_.Clear();
gidx_buffer = {};
idx_buffer.clear();
n_rows = 0;
}
// Push a CSR page to the current page.
//
// First compress the CSR page into ELLPACK, then the compressed buffer is copied to host and
// appended to the existing host vector.
void EllpackPageImpl::Push(int device, const SparsePage& batch) {
monitor_.StartCuda("InitCompressedData");
InitCompressedData(device, batch.Size());
monitor_.StopCuda("InitCompressedData");
monitor_.StartCuda("BinningCompression");
DeviceHistogramBuilderState hist_builder_row_state(batch.Size());
hist_builder_row_state.BeginBatch(batch);
CreateHistIndices(device, batch, hist_builder_row_state.GetRowStateOnDevice());
hist_builder_row_state.EndBatch();
monitor_.StopCuda("BinningCompression");
monitor_.StartCuda("CopyDeviceToHost");
std::vector<common::CompressedByteT> buffer(gidx_buffer.size());
dh::CopyDeviceSpanToVector(&buffer, gidx_buffer);
int offset = 0;
if (!idx_buffer.empty()) {
offset = ::xgboost::common::detail::kPadding;
}
idx_buffer.reserve(idx_buffer.size() + buffer.size() - offset);
idx_buffer.insert(idx_buffer.end(), buffer.begin() + offset, buffer.end());
ba_.Clear();
gidx_buffer = {};
monitor_.StopCuda("CopyDeviceToHost");
n_rows += batch.Size();
}
// Return the memory cost for storing the compressed features.
size_t EllpackPageImpl::MemCostBytes() const {
return idx_buffer.size() * sizeof(common::CompressedByteT);
}
// Copy the compressed features to GPU.
void EllpackPageImpl::InitDevice(int device, EllpackInfo info) {
if (device_initialized_) return;
monitor_.StartCuda("CopyPageToDevice");
dh::safe_cuda(cudaSetDevice(device));
gidx_buffer = {};
ba_.Allocate(device, &gidx_buffer, idx_buffer.size());
dh::CopyVectorToDeviceSpan(gidx_buffer, idx_buffer);
matrix.info = info;
matrix.gidx_iter = common::CompressedIterator<uint32_t>(gidx_buffer.data(), info.n_bins + 1);
monitor_.StopCuda("CopyPageToDevice");
device_initialized_ = true;
}
} // namespace xgboost