[LZ4] enable 16 bit index
This commit is contained in:
parent
c4d389c5df
commit
6de1c86d18
@ -22,9 +22,6 @@ class CompressArray {
|
||||
public:
|
||||
// the data content.
|
||||
std::vector<DType> data;
|
||||
CompressArray() {
|
||||
use_deep_compress_ = dmlc::GetEnv("XGBOOST_LZ4_COMPRESS_DEEP", true);
|
||||
}
|
||||
// Decompression helper
|
||||
// number of chunks
|
||||
inline int num_chunk() const {
|
||||
@ -49,8 +46,8 @@ class CompressArray {
|
||||
inline void InitCompressChunks(const std::vector<bst_uint>& chunk_ptr);
|
||||
// initialize the compression chunks
|
||||
inline void InitCompressChunks(size_t chunk_size, size_t max_nchunk);
|
||||
// run decode on chunk_id
|
||||
inline void Compress(int chunk_id);
|
||||
// run decode on chunk_id, level = -1 means default.
|
||||
inline void Compress(int chunk_id, bool use_hc);
|
||||
// save the output buffer into file.
|
||||
inline void Write(dmlc::Stream* fo);
|
||||
|
||||
@ -63,8 +60,6 @@ class CompressArray {
|
||||
std::vector<std::string> out_buffer_;
|
||||
// input buffer of data.
|
||||
std::string in_buffer_;
|
||||
// use deep compression.
|
||||
bool use_deep_compress_;
|
||||
};
|
||||
|
||||
template<typename DType>
|
||||
@ -123,7 +118,7 @@ inline void CompressArray<DType>::InitCompressChunks(size_t chunk_size, size_t m
|
||||
}
|
||||
|
||||
template<typename DType>
|
||||
inline void CompressArray<DType>::Compress(int chunk_id) {
|
||||
inline void CompressArray<DType>::Compress(int chunk_id, bool use_hc) {
|
||||
CHECK_LT(static_cast<size_t>(chunk_id + 1), raw_chunks_.size());
|
||||
std::string& buf = out_buffer_[chunk_id];
|
||||
size_t raw_chunk_size = (raw_chunks_[chunk_id + 1] - raw_chunks_[chunk_id]) * sizeof(DType);
|
||||
@ -131,11 +126,11 @@ inline void CompressArray<DType>::Compress(int chunk_id) {
|
||||
CHECK_NE(bound, 0);
|
||||
buf.resize(bound);
|
||||
int encoded_size;
|
||||
if (use_deep_compress_) {
|
||||
if (use_hc) {
|
||||
encoded_size = LZ4_compress_HC(
|
||||
reinterpret_cast<char*>(dmlc::BeginPtr(data) + raw_chunks_[chunk_id]),
|
||||
dmlc::BeginPtr(buf), raw_chunk_size, buf.length(), 0);
|
||||
} else{
|
||||
dmlc::BeginPtr(buf), raw_chunk_size, buf.length(), 9);
|
||||
} else {
|
||||
encoded_size = LZ4_compress_default(
|
||||
reinterpret_cast<char*>(dmlc::BeginPtr(data) + raw_chunks_[chunk_id]),
|
||||
dmlc::BeginPtr(buf), raw_chunk_size, buf.length());
|
||||
@ -159,23 +154,25 @@ inline void CompressArray<DType>::Write(dmlc::Stream* fo) {
|
||||
}
|
||||
}
|
||||
|
||||
template<typename StorageIndex>
|
||||
class SparsePageLZ4Format : public SparsePage::Format {
|
||||
public:
|
||||
SparsePageLZ4Format() {
|
||||
explicit SparsePageLZ4Format(bool use_lz4_hc)
|
||||
: use_lz4_hc_(use_lz4_hc) {
|
||||
raw_bytes_ = raw_bytes_value_ = raw_bytes_index_ = 0;
|
||||
encoded_bytes_value_ = encoded_bytes_index_ = 0;
|
||||
nthread_ = 4;
|
||||
nthread_ = dmlc::GetEnv("XGBOOST_LZ4_DECODE_NTHREAD", 4);
|
||||
nthread_write_ = dmlc::GetEnv("XGBOOST_LZ4_COMPRESS_NTHREAD", 12);
|
||||
}
|
||||
~SparsePageLZ4Format() {
|
||||
virtual ~SparsePageLZ4Format() {
|
||||
size_t encoded_bytes = raw_bytes_ + encoded_bytes_value_ + encoded_bytes_index_;
|
||||
raw_bytes_ += raw_bytes_value_ + raw_bytes_index_;
|
||||
if (raw_bytes_ != 0) {
|
||||
LOG(CONSOLE) << "raw_bytes=" << raw_bytes_
|
||||
<< ", encoded_bytes=" << encoded_bytes
|
||||
<< ", ratio=" << double(encoded_bytes) / raw_bytes_
|
||||
<< ",ratio-index=" << double(encoded_bytes_index_) /raw_bytes_index_
|
||||
<< ",ratio-value=" << double(encoded_bytes_value_) /raw_bytes_value_;
|
||||
<< ", ratio-index=" << double(encoded_bytes_index_) /raw_bytes_index_
|
||||
<< ", ratio-value=" << double(encoded_bytes_value_) /raw_bytes_value_;
|
||||
}
|
||||
}
|
||||
|
||||
@ -188,7 +185,7 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
CHECK_EQ(index_.data.size(), value_.data.size());
|
||||
CHECK_EQ(index_.data.size(), page->data.size());
|
||||
for (size_t i = 0; i < page->data.size(); ++i) {
|
||||
page->data[i] = SparseBatch::Entry(index_.data[i], value_.data[i]);
|
||||
page->data[i] = SparseBatch::Entry(index_.data[i] + min_index_, value_.data[i]);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
@ -216,7 +213,7 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
size_t num = disk_offset_[cid + 1] - disk_offset_[cid];
|
||||
for (size_t j = 0; j < num; ++j) {
|
||||
page->data[dst_begin + j] = SparseBatch::Entry(
|
||||
index_.data[src_begin + j], value_.data[src_begin + j]);
|
||||
index_.data[src_begin + j] + min_index_, value_.data[src_begin + j]);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
@ -226,11 +223,18 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
CHECK(page.offset.size() != 0 && page.offset[0] == 0);
|
||||
CHECK_EQ(page.offset.back(), page.data.size());
|
||||
fo->Write(page.offset);
|
||||
min_index_ = page.min_index;
|
||||
fo->Write(&min_index_, sizeof(min_index_));
|
||||
index_.data.resize(page.data.size());
|
||||
value_.data.resize(page.data.size());
|
||||
|
||||
for (size_t i = 0; i < page.data.size(); ++i) {
|
||||
index_.data[i] = page.data[i].index;
|
||||
bst_uint idx = page.data[i].index - min_index_;
|
||||
CHECK_LE(idx, static_cast<bst_uint>(std::numeric_limits<StorageIndex>::max()))
|
||||
<< "The storage index is chosen to limited to smaller equal than "
|
||||
<< std::numeric_limits<StorageIndex>::max()
|
||||
<< "min_index=" << min_index_;
|
||||
index_.data[i] = static_cast<StorageIndex>(idx);
|
||||
value_.data[i] = page.data[i].fvalue;
|
||||
}
|
||||
|
||||
@ -243,15 +247,15 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_write_)
|
||||
for (int i = 0; i < ntotal; ++i) {
|
||||
if (i < nindex) {
|
||||
index_.Compress(i);
|
||||
index_.Compress(i, use_lz4_hc_);
|
||||
} else {
|
||||
value_.Compress(i - nindex);
|
||||
value_.Compress(i - nindex, use_lz4_hc_);
|
||||
}
|
||||
}
|
||||
index_.Write(fo);
|
||||
value_.Write(fo);
|
||||
// statistics
|
||||
raw_bytes_index_ += index_.RawBytes();
|
||||
raw_bytes_index_ += index_.RawBytes() * sizeof(bst_uint) / sizeof(StorageIndex);
|
||||
raw_bytes_value_ += value_.RawBytes();
|
||||
encoded_bytes_index_ += index_.EncodedBytes();
|
||||
encoded_bytes_value_ += value_.EncodedBytes();
|
||||
@ -259,6 +263,7 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
}
|
||||
|
||||
inline void LoadIndexValue(dmlc::SeekStream* fi) {
|
||||
fi->Read(&min_index_, sizeof(min_index_));
|
||||
index_.Read(fi);
|
||||
value_.Read(fi);
|
||||
|
||||
@ -280,6 +285,8 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
static const size_t kChunkSize = 64 << 10UL;
|
||||
// maximum chunk size.
|
||||
static const size_t kMaxChunk = 128;
|
||||
// bool whether use hc
|
||||
bool use_lz4_hc_;
|
||||
// number of threads
|
||||
int nthread_;
|
||||
// number of writing threads
|
||||
@ -288,10 +295,12 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
size_t raw_bytes_, raw_bytes_index_, raw_bytes_value_;
|
||||
// encoded bytes
|
||||
size_t encoded_bytes_index_, encoded_bytes_value_;
|
||||
/*! \brief minimum index value */
|
||||
uint32_t min_index_;
|
||||
/*! \brief external memory column offset */
|
||||
std::vector<size_t> disk_offset_;
|
||||
// internal index
|
||||
CompressArray<bst_uint> index_;
|
||||
CompressArray<StorageIndex> index_;
|
||||
// value set.
|
||||
CompressArray<bst_float> value_;
|
||||
};
|
||||
@ -299,7 +308,20 @@ class SparsePageLZ4Format : public SparsePage::Format {
|
||||
XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4)
|
||||
.describe("Apply LZ4 binary data compression for ext memory.")
|
||||
.set_body([]() {
|
||||
return new SparsePageLZ4Format();
|
||||
return new SparsePageLZ4Format<bst_uint>(false);
|
||||
});
|
||||
|
||||
XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4hc)
|
||||
.describe("Apply LZ4 binary data compression(high compression ratio) for ext memory.")
|
||||
.set_body([]() {
|
||||
return new SparsePageLZ4Format<bst_uint>(true);
|
||||
});
|
||||
|
||||
XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4i16hc)
|
||||
.describe("Apply LZ4 binary data compression(16 bit index mode) for ext memory.")
|
||||
.set_body([]() {
|
||||
return new SparsePageLZ4Format<uint16_t>(true);
|
||||
});
|
||||
|
||||
} // namespace data
|
||||
} // namespace xgboost
|
||||
|
||||
@ -247,12 +247,21 @@ SparsePage::Format* SparsePage::Format::Create(const std::string& name) {
|
||||
return (e->body)();
|
||||
}
|
||||
|
||||
std::string SparsePage::Format::DecideFormat(const std::string& cache_prefix) {
|
||||
std::pair<std::string, std::string>
|
||||
SparsePage::Format::DecideFormat(const std::string& cache_prefix) {
|
||||
size_t pos = cache_prefix.rfind(".fmt-");
|
||||
|
||||
if (pos != std::string::npos) {
|
||||
return cache_prefix.substr(pos + 5, cache_prefix.length());
|
||||
std::string fmt = cache_prefix.substr(pos + 5, cache_prefix.length());
|
||||
size_t cpos = fmt.rfind('-');
|
||||
if (cpos != std::string::npos) {
|
||||
return std::make_pair(fmt.substr(0, cpos), fmt.substr(cpos + 1, fmt.length()));
|
||||
} else {
|
||||
return "raw";
|
||||
return std::make_pair(fmt, fmt);
|
||||
}
|
||||
} else {
|
||||
std::string raw = "raw";
|
||||
return std::make_pair(raw, raw);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
#include <algorithm>
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
|
||||
namespace xgboost {
|
||||
namespace data {
|
||||
@ -25,7 +26,8 @@ class SparsePage {
|
||||
public:
|
||||
/*! \brief Format of the sparse page. */
|
||||
class Format;
|
||||
|
||||
/*! \brief minimum index of all index, used as hint for compression. */
|
||||
bst_uint min_index;
|
||||
/*! \brief offset of the segments */
|
||||
std::vector<size_t> offset;
|
||||
/*! \brief the data of the segments */
|
||||
@ -45,6 +47,7 @@ class SparsePage {
|
||||
}
|
||||
/*! \brief clear the page */
|
||||
inline void Clear(void) {
|
||||
min_index = 0;
|
||||
offset.clear();
|
||||
offset.push_back(0);
|
||||
data.clear();
|
||||
@ -163,9 +166,9 @@ class SparsePage::Format {
|
||||
static Format* Create(const std::string& name);
|
||||
/*!
|
||||
* \brief decide the format from cache prefix.
|
||||
* \return format type of the cache prefix.
|
||||
* \return pair of row format, column format type of the cache prefix.
|
||||
*/
|
||||
static std::string DecideFormat(const std::string& cache_prefix);
|
||||
static std::pair<std::string, std::string> DecideFormat(const std::string& cache_prefix);
|
||||
};
|
||||
|
||||
/*!
|
||||
|
||||
@ -136,6 +136,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
|
||||
// make the sparse page.
|
||||
dmlc::ThreadedIter<SparsePage> cmaker;
|
||||
SparsePage tmp;
|
||||
size_t batch_ptr = 0, batch_top = 0;
|
||||
dmlc::DataIter<RowBatch>* iter = this->RowIterator();
|
||||
std::bernoulli_distribution coin_flip(pkeep);
|
||||
|
||||
@ -151,13 +152,13 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
|
||||
}
|
||||
SparsePage* pcol = *dptr;
|
||||
pcol->Clear();
|
||||
pcol->min_index = ridx[0];
|
||||
int nthread;
|
||||
#pragma omp parallel
|
||||
{
|
||||
nthread = omp_get_num_threads();
|
||||
nthread = std::max(nthread, std::max(omp_get_num_procs() / 2 - 1, 1));
|
||||
}
|
||||
pcol->Clear();
|
||||
common::ParallelGroupBuilder<SparseBatch::Entry>
|
||||
builder(&pcol->offset, &pcol->data);
|
||||
builder.InitBudget(info.num_col, nthread);
|
||||
@ -199,21 +200,32 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
|
||||
auto make_next_col = [&] (SparsePage** dptr) {
|
||||
tmp.Clear();
|
||||
size_t btop = buffered_rowset_.size();
|
||||
while (iter->Next()) {
|
||||
|
||||
while (true) {
|
||||
if (batch_ptr != batch_top) {
|
||||
const RowBatch& batch = iter->Value();
|
||||
for (size_t i = 0; i < batch.size; ++i) {
|
||||
CHECK_EQ(batch_top, batch.size);
|
||||
for (size_t i = batch_ptr; i < batch_top; ++i) {
|
||||
bst_uint ridx = static_cast<bst_uint>(batch.base_rowid + i);
|
||||
if (pkeep == 1.0f || coin_flip(rnd)) {
|
||||
buffered_rowset_.push_back(ridx);
|
||||
tmp.Push(batch[i]);
|
||||
}
|
||||
}
|
||||
if (tmp.MemCostBytes() >= kPageSize ||
|
||||
tmp.Size() >= max_row_perbatch) {
|
||||
|
||||
if (tmp.Size() >= max_row_perbatch ||
|
||||
tmp.MemCostBytes() >= kPageSize) {
|
||||
make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr);
|
||||
batch_ptr = i + 1;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
batch_ptr = batch_top;
|
||||
}
|
||||
if (!iter->Next()) break;
|
||||
batch_ptr = 0;
|
||||
batch_top = iter->Value().size;
|
||||
}
|
||||
|
||||
if (tmp.Size() != 0) {
|
||||
make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr);
|
||||
return true;
|
||||
@ -227,12 +239,15 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
|
||||
std::string col_data_name = cache_prefix_ + ".col.page";
|
||||
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(col_data_name.c_str(), "w"));
|
||||
// find format.
|
||||
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix_);
|
||||
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix_).second;
|
||||
fo->Write(name_format);
|
||||
std::unique_ptr<SparsePage::Format> format(SparsePage::Format::Create(name_format));
|
||||
|
||||
double tstart = dmlc::GetTime();
|
||||
size_t bytes_write = 0;
|
||||
// print every 4 sec.
|
||||
const double kStep = 4.0;
|
||||
size_t tick_expected = kStep;
|
||||
SparsePage* pcol = nullptr;
|
||||
|
||||
while (cmaker.Next(&pcol)) {
|
||||
@ -243,9 +258,12 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
|
||||
size_t spage = pcol->MemCostBytes();
|
||||
bytes_write += spage;
|
||||
double tdiff = dmlc::GetTime() - tstart;
|
||||
if (tdiff >= tick_expected) {
|
||||
LOG(CONSOLE) << "Writing to " << col_data_name
|
||||
<< " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, "
|
||||
<< (bytes_write >> 20UL) << " MB writen";
|
||||
tick_expected += kStep;
|
||||
}
|
||||
cmaker.Recycle(&pcol);
|
||||
}
|
||||
// save meta data
|
||||
|
||||
@ -78,7 +78,7 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
|
||||
std::string name_info = cache_prefix;
|
||||
std::string name_row = cache_prefix + ".row.page";
|
||||
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(name_row.c_str(), "w"));
|
||||
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix);
|
||||
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first;
|
||||
fo->Write(name_format);
|
||||
std::unique_ptr<SparsePage::Format> format(SparsePage::Format::Create(name_format));
|
||||
|
||||
@ -86,6 +86,9 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
|
||||
SparsePage page;
|
||||
size_t bytes_write = 0;
|
||||
double tstart = dmlc::GetTime();
|
||||
// print every 4 sec.
|
||||
const double kStep = 4.0;
|
||||
size_t tick_expected = kStep;
|
||||
|
||||
while (src->Next()) {
|
||||
const dmlc::RowBlock<uint32_t>& batch = src->Value();
|
||||
@ -108,9 +111,12 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
|
||||
format->Write(page, fo.get());
|
||||
page.Clear();
|
||||
double tdiff = dmlc::GetTime() - tstart;
|
||||
if (tdiff >= tick_expected) {
|
||||
LOG(CONSOLE) << "Writing to " << name_row << " in "
|
||||
<< ((bytes_write >> 20UL) / tdiff) << " MB/s, "
|
||||
<< (bytes_write >> 20UL) << " written";
|
||||
tick_expected += kStep;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -133,7 +139,7 @@ void SparsePageSource::Create(DMatrix* src,
|
||||
std::string name_row = cache_prefix + ".row.page";
|
||||
std::unique_ptr<dmlc::Stream> fo(dmlc::Stream::Create(name_row.c_str(), "w"));
|
||||
// find format.
|
||||
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix);
|
||||
std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first;
|
||||
fo->Write(name_format);
|
||||
std::unique_ptr<SparsePage::Format> format(SparsePage::Format::Create(name_format));
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user