Partial rewrite EllpackPage (#5352)

This commit is contained in:
Rory Mitchell
2020-03-11 10:15:53 +13:00
committed by GitHub
parent 7a99f8f27f
commit 3ad4333b0e
23 changed files with 496 additions and 733 deletions

View File

@@ -4,9 +4,9 @@
#include <xgboost/data.h>
#include "./ellpack_page.cuh"
#include "../common/hist_util.h"
#include "../common/random.h"
#include "./ellpack_page.cuh"
namespace xgboost {
@@ -17,13 +17,9 @@ EllpackPage::EllpackPage(DMatrix* dmat, const BatchParam& param)
EllpackPage::~EllpackPage() = default;
size_t EllpackPage::Size() const {
return impl_->Size();
}
size_t EllpackPage::Size() const { return impl_->Size(); }
void EllpackPage::SetBaseRowId(size_t row_id) {
impl_->SetBaseRowId(row_id);
}
void EllpackPage::SetBaseRowId(size_t row_id) { impl_->SetBaseRowId(row_id); }
// Bin each input data entry, store the bin indices in compressed form.
__global__ void CompressBinEllpackKernel(
@@ -65,16 +61,18 @@ __global__ void CompressBinEllpackKernel(
}
// Construct an ELLPACK matrix with the given number of empty rows.
EllpackPageImpl::EllpackPageImpl(int device, EllpackInfo info, size_t n_rows) {
EllpackPageImpl::EllpackPageImpl(int device, common::HistogramCuts cuts,
bool is_dense, size_t row_stride,
size_t n_rows)
: is_dense(is_dense),
cuts_(std::move(cuts)),
row_stride(row_stride),
n_rows(n_rows) {
monitor_.Init("ellpack_page");
dh::safe_cuda(cudaSetDevice(device));
matrix.info = info;
matrix.base_rowid = 0;
matrix.n_rows = n_rows;
monitor_.StartCuda("InitCompressedData");
InitCompressedData(device, n_rows);
InitCompressedData(device);
monitor_.StopCuda("InitCompressedData");
}
@@ -93,33 +91,27 @@ size_t GetRowStride(DMatrix* dmat) {
}
// Construct an ELLPACK matrix in memory.
EllpackPageImpl::EllpackPageImpl(DMatrix* dmat, const BatchParam& param) {
EllpackPageImpl::EllpackPageImpl(DMatrix* dmat, const BatchParam& param)
: is_dense(dmat->IsDense()) {
monitor_.Init("ellpack_page");
dh::safe_cuda(cudaSetDevice(param.gpu_id));
matrix.n_rows = dmat->Info().num_row_;
n_rows = dmat->Info().num_row_;
monitor_.StartCuda("Quantiles");
// Create the quantile sketches for the dmatrix and initialize HistogramCuts.
size_t row_stride = GetRowStride(dmat);
auto cuts = common::DeviceSketch(param.gpu_id, dmat, param.max_bin,
row_stride = GetRowStride(dmat);
cuts_ = common::DeviceSketch(param.gpu_id, dmat, param.max_bin,
param.gpu_batch_nrows);
monitor_.StopCuda("Quantiles");
monitor_.StartCuda("InitEllpackInfo");
InitInfo(param.gpu_id, dmat->IsDense(), row_stride, cuts);
monitor_.StopCuda("InitEllpackInfo");
monitor_.StartCuda("InitCompressedData");
InitCompressedData(param.gpu_id, dmat->Info().num_row_);
InitCompressedData(param.gpu_id);
monitor_.StopCuda("InitCompressedData");
monitor_.StartCuda("BinningCompression");
DeviceHistogramBuilderState hist_builder_row_state(dmat->Info().num_row_);
for (const auto& batch : dmat->GetBatches<SparsePage>()) {
hist_builder_row_state.BeginBatch(batch);
CreateHistIndices(param.gpu_id, batch, hist_builder_row_state.GetRowStateOnDevice());
hist_builder_row_state.EndBatch();
CreateHistIndices(param.gpu_id, batch);
}
monitor_.StopCuda("BinningCompression");
}
@@ -133,23 +125,26 @@ struct CopyPage {
size_t offset;
CopyPage(EllpackPageImpl* dst, EllpackPageImpl* src, size_t offset)
: cbw{dst->matrix.info.NumSymbols()},
dst_data_d{dst->gidx_buffer.data()},
src_iterator_d{src->gidx_buffer.data(), src->matrix.info.NumSymbols()},
: cbw{dst->NumSymbols()},
dst_data_d{dst->gidx_buffer.DevicePointer()},
src_iterator_d{src->gidx_buffer.DevicePointer(), src->NumSymbols()},
offset(offset) {}
__device__ void operator()(size_t element_id) {
cbw.AtomicWriteSymbol(dst_data_d, src_iterator_d[element_id], element_id + offset);
cbw.AtomicWriteSymbol(dst_data_d, src_iterator_d[element_id],
element_id + offset);
}
};
// Copy the data from the given EllpackPage to the current page.
size_t EllpackPageImpl::Copy(int device, EllpackPageImpl* page, size_t offset) {
monitor_.StartCuda("Copy");
size_t num_elements = page->matrix.n_rows * page->matrix.info.row_stride;
CHECK_EQ(matrix.info.row_stride, page->matrix.info.row_stride);
CHECK_EQ(matrix.info.NumSymbols(), page->matrix.info.NumSymbols());
CHECK_GE(matrix.n_rows * matrix.info.row_stride, offset + num_elements);
size_t num_elements = page->n_rows * page->row_stride;
CHECK_EQ(row_stride, page->row_stride);
CHECK_EQ(NumSymbols(), page->NumSymbols());
CHECK_GE(n_rows * row_stride, offset + num_elements);
gidx_buffer.SetDevice(device);
page->gidx_buffer.SetDevice(device);
dh::LaunchN(device, num_elements, CopyPage(this, page, offset));
monitor_.StopCuda("Copy");
return num_elements;
@@ -160,26 +155,29 @@ struct CompactPage {
common::CompressedBufferWriter cbw;
common::CompressedByteT* dst_data_d;
common::CompressedIterator<uint32_t> src_iterator_d;
/*! \brief An array that maps the rows from the full DMatrix to the compacted page.
/*! \brief An array that maps the rows from the full DMatrix to the compacted
* page.
*
* The total size is the number of rows in the original, uncompacted DMatrix. Elements are the
* row ids in the compacted page. Rows not needed are set to SIZE_MAX.
* The total size is the number of rows in the original, uncompacted DMatrix.
* Elements are the row ids in the compacted page. Rows not needed are set to
* SIZE_MAX.
*
* An example compacting 16 rows to 8 rows:
* [SIZE_MAX, 0, 1, SIZE_MAX, SIZE_MAX, 2, SIZE_MAX, 3, 4, 5, SIZE_MAX, 6, SIZE_MAX, 7, SIZE_MAX,
* SIZE_MAX]
* [SIZE_MAX, 0, 1, SIZE_MAX, SIZE_MAX, 2, SIZE_MAX, 3, 4, 5, SIZE_MAX, 6,
* SIZE_MAX, 7, SIZE_MAX, SIZE_MAX]
*/
common::Span<size_t> row_indexes;
size_t base_rowid;
size_t row_stride;
CompactPage(EllpackPageImpl* dst, EllpackPageImpl* src, common::Span<size_t> row_indexes)
: cbw{dst->matrix.info.NumSymbols()},
dst_data_d{dst->gidx_buffer.data()},
src_iterator_d{src->gidx_buffer.data(), src->matrix.info.NumSymbols()},
CompactPage(EllpackPageImpl* dst, EllpackPageImpl* src,
common::Span<size_t> row_indexes)
: cbw{dst->NumSymbols()},
dst_data_d{dst->gidx_buffer.DevicePointer()},
src_iterator_d{src->gidx_buffer.DevicePointer(), src->NumSymbols()},
row_indexes(row_indexes),
base_rowid{src->matrix.base_rowid},
row_stride{src->matrix.info.row_stride} {}
base_rowid{src->base_rowid},
row_stride{src->row_stride} {}
__device__ void operator()(size_t row_id) {
size_t src_row = base_rowid + row_id;
@@ -188,100 +186,72 @@ struct CompactPage {
size_t dst_offset = dst_row * row_stride;
size_t src_offset = row_id * row_stride;
for (size_t j = 0; j < row_stride; j++) {
cbw.AtomicWriteSymbol(dst_data_d, src_iterator_d[src_offset + j], dst_offset + j);
cbw.AtomicWriteSymbol(dst_data_d, src_iterator_d[src_offset + j],
dst_offset + j);
}
}
};
// Compacts the data from the given EllpackPage into the current page.
void EllpackPageImpl::Compact(int device, EllpackPageImpl* page, common::Span<size_t> row_indexes) {
void EllpackPageImpl::Compact(int device, EllpackPageImpl* page,
common::Span<size_t> row_indexes) {
monitor_.StartCuda("Compact");
CHECK_EQ(matrix.info.row_stride, page->matrix.info.row_stride);
CHECK_EQ(matrix.info.NumSymbols(), page->matrix.info.NumSymbols());
CHECK_LE(page->matrix.base_rowid + page->matrix.n_rows, row_indexes.size());
dh::LaunchN(device, page->matrix.n_rows, CompactPage(this, page, row_indexes));
CHECK_EQ(row_stride, page->row_stride);
CHECK_EQ(NumSymbols(), page->NumSymbols());
CHECK_LE(page->base_rowid + page->n_rows, row_indexes.size());
gidx_buffer.SetDevice(device);
page->gidx_buffer.SetDevice(device);
dh::LaunchN(device, page->n_rows, CompactPage(this, page, row_indexes));
monitor_.StopCuda("Compact");
}
// Construct an EllpackInfo based on histogram cuts of features.
EllpackInfo::EllpackInfo(int device,
bool is_dense,
size_t row_stride,
const common::HistogramCuts& hmat,
dh::BulkAllocator* ba)
: is_dense(is_dense), row_stride(row_stride), n_bins(hmat.Ptrs().back()) {
ba->Allocate(device,
&feature_segments, hmat.Ptrs().size(),
&gidx_fvalue_map, hmat.Values().size(),
&min_fvalue, hmat.MinValues().size());
dh::CopyVectorToDeviceSpan(gidx_fvalue_map, hmat.Values());
dh::CopyVectorToDeviceSpan(min_fvalue, hmat.MinValues());
dh::CopyVectorToDeviceSpan(feature_segments, hmat.Ptrs());
}
// Initialize the EllpackInfo for this page.
void EllpackPageImpl::InitInfo(int device,
bool is_dense,
size_t row_stride,
const common::HistogramCuts& hmat) {
matrix.info = EllpackInfo(device, is_dense, row_stride, hmat, &ba_);
}
// Initialize the buffer to stored compressed features.
void EllpackPageImpl::InitCompressedData(int device, size_t num_rows) {
size_t num_symbols = matrix.info.NumSymbols();
void EllpackPageImpl::InitCompressedData(int device) {
size_t num_symbols = NumSymbols();
// Required buffer size for storing data matrix in ELLPack format.
size_t compressed_size_bytes = common::CompressedBufferWriter::CalculateBufferSize(
matrix.info.row_stride * num_rows, num_symbols);
ba_.Allocate(device, &gidx_buffer, compressed_size_bytes);
thrust::fill(dh::tbegin(gidx_buffer), dh::tend(gidx_buffer), 0);
matrix.gidx_iter = common::CompressedIterator<uint32_t>(gidx_buffer.data(), num_symbols);
size_t compressed_size_bytes =
common::CompressedBufferWriter::CalculateBufferSize(row_stride * n_rows,
num_symbols);
gidx_buffer.SetDevice(device);
// Don't call fill unnecessarily
if (gidx_buffer.Size() == 0) {
gidx_buffer.Resize(compressed_size_bytes, 0);
} else {
gidx_buffer.Resize(compressed_size_bytes, 0);
thrust::fill(dh::tbegin(gidx_buffer), dh::tend(gidx_buffer), 0);
}
}
// Compress a CSR page into ELLPACK.
void EllpackPageImpl::CreateHistIndices(int device,
const SparsePage& row_batch,
const RowStateOnDevice& device_row_state) {
// Has any been allocated for me in this batch?
if (!device_row_state.rows_to_process_from_batch) return;
unsigned int null_gidx_value = matrix.info.n_bins;
size_t row_stride = matrix.info.row_stride;
const SparsePage& row_batch) {
if (row_batch.Size() == 0) return;
unsigned int null_gidx_value = NumSymbols() - 1;
const auto& offset_vec = row_batch.offset.ConstHostVector();
// bin and compress entries in batches of rows
size_t gpu_batch_nrows = std::min(
dh::TotalMemory(device) / (16 * row_stride * sizeof(Entry)),
static_cast<size_t>(device_row_state.rows_to_process_from_batch));
size_t gpu_batch_nrows =
std::min(dh::TotalMemory(device) / (16 * row_stride * sizeof(Entry)),
static_cast<size_t>(row_batch.Size()));
const std::vector<Entry>& data_vec = row_batch.data.ConstHostVector();
size_t gpu_nbatches = common::DivRoundUp(device_row_state.rows_to_process_from_batch,
gpu_batch_nrows);
size_t gpu_nbatches = common::DivRoundUp(row_batch.Size(), gpu_batch_nrows);
for (size_t gpu_batch = 0; gpu_batch < gpu_nbatches; ++gpu_batch) {
size_t batch_row_begin = gpu_batch * gpu_batch_nrows;
size_t batch_row_end = (gpu_batch + 1) * gpu_batch_nrows;
if (batch_row_end > device_row_state.rows_to_process_from_batch) {
batch_row_end = device_row_state.rows_to_process_from_batch;
}
size_t batch_row_end =
std::min((gpu_batch + 1) * gpu_batch_nrows, row_batch.Size());
size_t batch_nrows = batch_row_end - batch_row_begin;
const auto ent_cnt_begin =
offset_vec[device_row_state.row_offset_in_current_batch + batch_row_begin];
const auto ent_cnt_end =
offset_vec[device_row_state.row_offset_in_current_batch + batch_row_end];
const auto ent_cnt_begin = offset_vec[batch_row_begin];
const auto ent_cnt_end = offset_vec[batch_row_end];
/*! \brief row offset in SparsePage (the input data). */
dh::device_vector<size_t> row_ptrs(batch_nrows + 1);
thrust::copy(
offset_vec.data() + device_row_state.row_offset_in_current_batch + batch_row_begin,
offset_vec.data() + device_row_state.row_offset_in_current_batch + batch_row_end + 1,
row_ptrs.begin());
thrust::copy(offset_vec.data() + batch_row_begin,
offset_vec.data() + batch_row_end + 1, row_ptrs.begin());
// number of entries in this batch.
size_t n_entries = ent_cnt_end - ent_cnt_begin;
@@ -289,97 +259,50 @@ void EllpackPageImpl::CreateHistIndices(int device,
// copy data entries to device.
dh::safe_cuda(cudaMemcpy(entries_d.data().get(),
data_vec.data() + ent_cnt_begin,
n_entries * sizeof(Entry),
cudaMemcpyDefault));
n_entries * sizeof(Entry), cudaMemcpyDefault));
const dim3 block3(32, 8, 1); // 256 threads
const dim3 grid3(common::DivRoundUp(batch_nrows, block3.x),
common::DivRoundUp(row_stride, block3.y),
1);
dh::LaunchKernel {grid3, block3} (
CompressBinEllpackKernel,
common::CompressedBufferWriter(matrix.info.NumSymbols()),
gidx_buffer.data(),
row_ptrs.data().get(),
entries_d.data().get(),
matrix.info.gidx_fvalue_map.data(),
matrix.info.feature_segments.data(),
device_row_state.total_rows_processed + batch_row_begin,
batch_nrows,
row_stride,
common::DivRoundUp(row_stride, block3.y), 1);
auto device_accessor = GetDeviceAccessor(device);
dh::LaunchKernel {grid3, block3}(
CompressBinEllpackKernel, common::CompressedBufferWriter(NumSymbols()),
gidx_buffer.DevicePointer(), row_ptrs.data().get(),
entries_d.data().get(), device_accessor.gidx_fvalue_map.data(),
device_accessor.feature_segments.data(),
row_batch.base_rowid + batch_row_begin, batch_nrows, row_stride,
null_gidx_value);
}
}
// Return the number of rows contained in this page.
size_t EllpackPageImpl::Size() const {
return matrix.n_rows;
}
// Clear the current page.
void EllpackPageImpl::Clear() {
ba_.Clear();
gidx_buffer = {};
idx_buffer.clear();
sparse_page_.Clear();
matrix.base_rowid = 0;
matrix.n_rows = 0;
device_initialized_ = false;
}
// Push a CSR page to the current page.
//
// The CSR pages are accumulated in memory until they reach a certain size, then written out as
// compressed ELLPACK.
void EllpackPageImpl::Push(int device, const SparsePage& batch) {
sparse_page_.Push(batch);
matrix.n_rows += batch.Size();
}
// Compress the accumulated SparsePage.
void EllpackPageImpl::CompressSparsePage(int device) {
monitor_.StartCuda("InitCompressedData");
InitCompressedData(device, matrix.n_rows);
monitor_.StopCuda("InitCompressedData");
monitor_.StartCuda("BinningCompression");
DeviceHistogramBuilderState hist_builder_row_state(matrix.n_rows);
hist_builder_row_state.BeginBatch(sparse_page_);
CreateHistIndices(device, sparse_page_, hist_builder_row_state.GetRowStateOnDevice());
hist_builder_row_state.EndBatch();
monitor_.StopCuda("BinningCompression");
monitor_.StartCuda("CopyDeviceToHost");
idx_buffer.resize(gidx_buffer.size());
dh::CopyDeviceSpanToVector(&idx_buffer, gidx_buffer);
ba_.Clear();
gidx_buffer = {};
monitor_.StopCuda("CopyDeviceToHost");
}
size_t EllpackPageImpl::Size() const { return n_rows; }
// Return the memory cost for storing the compressed features.
size_t EllpackPageImpl::MemCostBytes() const {
// Required buffer size for storing data matrix in ELLPack format.
size_t compressed_size_bytes = common::CompressedBufferWriter::CalculateBufferSize(
matrix.info.row_stride * matrix.n_rows, matrix.info.NumSymbols());
size_t EllpackPageImpl::MemCostBytes(size_t num_rows, size_t row_stride,
const common::HistogramCuts& cuts) {
// Required buffer size for storing data matrix in EtoLLPack format.
size_t compressed_size_bytes =
common::CompressedBufferWriter::CalculateBufferSize(row_stride * num_rows,
cuts.TotalBins() + 1);
return compressed_size_bytes;
}
// Copy the compressed features to GPU.
void EllpackPageImpl::InitDevice(int device, EllpackInfo info) {
if (device_initialized_) return;
EllpackDeviceAccessor EllpackPageImpl::GetDeviceAccessor(int device) const {
gidx_buffer.SetDevice(device);
return EllpackDeviceAccessor(
device, cuts_, is_dense, row_stride, base_rowid, n_rows,
common::CompressedIterator<uint32_t>(gidx_buffer.ConstDevicePointer(),
NumSymbols()));
}
monitor_.StartCuda("CopyPageToDevice");
dh::safe_cuda(cudaSetDevice(device));
gidx_buffer = {};
ba_.Allocate(device, &gidx_buffer, idx_buffer.size());
dh::CopyVectorToDeviceSpan(gidx_buffer, idx_buffer);
matrix.info = info;
matrix.gidx_iter = common::CompressedIterator<uint32_t>(gidx_buffer.data(), info.n_bins + 1);
monitor_.StopCuda("CopyPageToDevice");
device_initialized_ = true;
EllpackPageImpl::EllpackPageImpl(int device, common::HistogramCuts cuts,
const SparsePage& page, bool is_dense,
size_t row_stride)
: cuts_(std::move(cuts)),
is_dense(is_dense),
n_rows(page.Size()),
row_stride(row_stride) {
this->InitCompressedData(device);
this->CreateHistIndices(device, page);
}
} // namespace xgboost