diff --git a/plugin/lz4/sparse_page_lz4_format.cc b/plugin/lz4/sparse_page_lz4_format.cc index b7c4742f5..cad2ceadd 100644 --- a/plugin/lz4/sparse_page_lz4_format.cc +++ b/plugin/lz4/sparse_page_lz4_format.cc @@ -22,9 +22,6 @@ class CompressArray { public: // the data content. std::vector data; - CompressArray() { - use_deep_compress_ = dmlc::GetEnv("XGBOOST_LZ4_COMPRESS_DEEP", true); - } // Decompression helper // number of chunks inline int num_chunk() const { @@ -49,8 +46,8 @@ class CompressArray { inline void InitCompressChunks(const std::vector& chunk_ptr); // initialize the compression chunks inline void InitCompressChunks(size_t chunk_size, size_t max_nchunk); - // run decode on chunk_id - inline void Compress(int chunk_id); + // run decode on chunk_id, level = -1 means default. + inline void Compress(int chunk_id, bool use_hc); // save the output buffer into file. inline void Write(dmlc::Stream* fo); @@ -63,8 +60,6 @@ class CompressArray { std::vector out_buffer_; // input buffer of data. std::string in_buffer_; - // use deep compression. - bool use_deep_compress_; }; template @@ -123,7 +118,7 @@ inline void CompressArray::InitCompressChunks(size_t chunk_size, size_t m } template -inline void CompressArray::Compress(int chunk_id) { +inline void CompressArray::Compress(int chunk_id, bool use_hc) { CHECK_LT(static_cast(chunk_id + 1), raw_chunks_.size()); std::string& buf = out_buffer_[chunk_id]; size_t raw_chunk_size = (raw_chunks_[chunk_id + 1] - raw_chunks_[chunk_id]) * sizeof(DType); @@ -131,11 +126,11 @@ inline void CompressArray::Compress(int chunk_id) { CHECK_NE(bound, 0); buf.resize(bound); int encoded_size; - if (use_deep_compress_) { + if (use_hc) { encoded_size = LZ4_compress_HC( reinterpret_cast(dmlc::BeginPtr(data) + raw_chunks_[chunk_id]), - dmlc::BeginPtr(buf), raw_chunk_size, buf.length(), 0); - } else{ + dmlc::BeginPtr(buf), raw_chunk_size, buf.length(), 9); + } else { encoded_size = LZ4_compress_default( reinterpret_cast(dmlc::BeginPtr(data) + raw_chunks_[chunk_id]), dmlc::BeginPtr(buf), raw_chunk_size, buf.length()); @@ -159,23 +154,25 @@ inline void CompressArray::Write(dmlc::Stream* fo) { } } +template class SparsePageLZ4Format : public SparsePage::Format { public: - SparsePageLZ4Format() { + explicit SparsePageLZ4Format(bool use_lz4_hc) + : use_lz4_hc_(use_lz4_hc) { raw_bytes_ = raw_bytes_value_ = raw_bytes_index_ = 0; encoded_bytes_value_ = encoded_bytes_index_ = 0; - nthread_ = 4; + nthread_ = dmlc::GetEnv("XGBOOST_LZ4_DECODE_NTHREAD", 4); nthread_write_ = dmlc::GetEnv("XGBOOST_LZ4_COMPRESS_NTHREAD", 12); } - ~SparsePageLZ4Format() { + virtual ~SparsePageLZ4Format() { size_t encoded_bytes = raw_bytes_ + encoded_bytes_value_ + encoded_bytes_index_; raw_bytes_ += raw_bytes_value_ + raw_bytes_index_; if (raw_bytes_ != 0) { LOG(CONSOLE) << "raw_bytes=" << raw_bytes_ << ", encoded_bytes=" << encoded_bytes << ", ratio=" << double(encoded_bytes) / raw_bytes_ - << ",ratio-index=" << double(encoded_bytes_index_) /raw_bytes_index_ - << ",ratio-value=" << double(encoded_bytes_value_) /raw_bytes_value_; + << ", ratio-index=" << double(encoded_bytes_index_) /raw_bytes_index_ + << ", ratio-value=" << double(encoded_bytes_value_) /raw_bytes_value_; } } @@ -188,7 +185,7 @@ class SparsePageLZ4Format : public SparsePage::Format { CHECK_EQ(index_.data.size(), value_.data.size()); CHECK_EQ(index_.data.size(), page->data.size()); for (size_t i = 0; i < page->data.size(); ++i) { - page->data[i] = SparseBatch::Entry(index_.data[i], value_.data[i]); + page->data[i] = SparseBatch::Entry(index_.data[i] + min_index_, value_.data[i]); } return true; } @@ -216,7 +213,7 @@ class SparsePageLZ4Format : public SparsePage::Format { size_t num = disk_offset_[cid + 1] - disk_offset_[cid]; for (size_t j = 0; j < num; ++j) { page->data[dst_begin + j] = SparseBatch::Entry( - index_.data[src_begin + j], value_.data[src_begin + j]); + index_.data[src_begin + j] + min_index_, value_.data[src_begin + j]); } } return true; @@ -226,11 +223,18 @@ class SparsePageLZ4Format : public SparsePage::Format { CHECK(page.offset.size() != 0 && page.offset[0] == 0); CHECK_EQ(page.offset.back(), page.data.size()); fo->Write(page.offset); + min_index_ = page.min_index; + fo->Write(&min_index_, sizeof(min_index_)); index_.data.resize(page.data.size()); value_.data.resize(page.data.size()); for (size_t i = 0; i < page.data.size(); ++i) { - index_.data[i] = page.data[i].index; + bst_uint idx = page.data[i].index - min_index_; + CHECK_LE(idx, static_cast(std::numeric_limits::max())) + << "The storage index is chosen to limited to smaller equal than " + << std::numeric_limits::max() + << "min_index=" << min_index_; + index_.data[i] = static_cast(idx); value_.data[i] = page.data[i].fvalue; } @@ -243,15 +247,15 @@ class SparsePageLZ4Format : public SparsePage::Format { #pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_write_) for (int i = 0; i < ntotal; ++i) { if (i < nindex) { - index_.Compress(i); + index_.Compress(i, use_lz4_hc_); } else { - value_.Compress(i - nindex); + value_.Compress(i - nindex, use_lz4_hc_); } } index_.Write(fo); value_.Write(fo); // statistics - raw_bytes_index_ += index_.RawBytes(); + raw_bytes_index_ += index_.RawBytes() * sizeof(bst_uint) / sizeof(StorageIndex); raw_bytes_value_ += value_.RawBytes(); encoded_bytes_index_ += index_.EncodedBytes(); encoded_bytes_value_ += value_.EncodedBytes(); @@ -259,6 +263,7 @@ class SparsePageLZ4Format : public SparsePage::Format { } inline void LoadIndexValue(dmlc::SeekStream* fi) { + fi->Read(&min_index_, sizeof(min_index_)); index_.Read(fi); value_.Read(fi); @@ -280,6 +285,8 @@ class SparsePageLZ4Format : public SparsePage::Format { static const size_t kChunkSize = 64 << 10UL; // maximum chunk size. static const size_t kMaxChunk = 128; + // bool whether use hc + bool use_lz4_hc_; // number of threads int nthread_; // number of writing threads @@ -288,10 +295,12 @@ class SparsePageLZ4Format : public SparsePage::Format { size_t raw_bytes_, raw_bytes_index_, raw_bytes_value_; // encoded bytes size_t encoded_bytes_index_, encoded_bytes_value_; + /*! \brief minimum index value */ + uint32_t min_index_; /*! \brief external memory column offset */ std::vector disk_offset_; // internal index - CompressArray index_; + CompressArray index_; // value set. CompressArray value_; }; @@ -299,7 +308,20 @@ class SparsePageLZ4Format : public SparsePage::Format { XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4) .describe("Apply LZ4 binary data compression for ext memory.") .set_body([]() { - return new SparsePageLZ4Format(); + return new SparsePageLZ4Format(false); }); + +XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4hc) +.describe("Apply LZ4 binary data compression(high compression ratio) for ext memory.") +.set_body([]() { + return new SparsePageLZ4Format(true); + }); + +XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4i16hc) +.describe("Apply LZ4 binary data compression(16 bit index mode) for ext memory.") +.set_body([]() { + return new SparsePageLZ4Format(true); + }); + } // namespace data } // namespace xgboost diff --git a/src/data/data.cc b/src/data/data.cc index d3c530c32..0b5f64270 100644 --- a/src/data/data.cc +++ b/src/data/data.cc @@ -247,12 +247,21 @@ SparsePage::Format* SparsePage::Format::Create(const std::string& name) { return (e->body)(); } -std::string SparsePage::Format::DecideFormat(const std::string& cache_prefix) { +std::pair +SparsePage::Format::DecideFormat(const std::string& cache_prefix) { size_t pos = cache_prefix.rfind(".fmt-"); + if (pos != std::string::npos) { - return cache_prefix.substr(pos + 5, cache_prefix.length()); + std::string fmt = cache_prefix.substr(pos + 5, cache_prefix.length()); + size_t cpos = fmt.rfind('-'); + if (cpos != std::string::npos) { + return std::make_pair(fmt.substr(0, cpos), fmt.substr(cpos + 1, fmt.length())); + } else { + return std::make_pair(fmt, fmt); + } } else { - return "raw"; + std::string raw = "raw"; + return std::make_pair(raw, raw); } } diff --git a/src/data/sparse_batch_page.h b/src/data/sparse_batch_page.h index 680b255c2..41893e6b5 100644 --- a/src/data/sparse_batch_page.h +++ b/src/data/sparse_batch_page.h @@ -15,6 +15,7 @@ #include #include #include +#include namespace xgboost { namespace data { @@ -25,7 +26,8 @@ class SparsePage { public: /*! \brief Format of the sparse page. */ class Format; - + /*! \brief minimum index of all index, used as hint for compression. */ + bst_uint min_index; /*! \brief offset of the segments */ std::vector offset; /*! \brief the data of the segments */ @@ -45,6 +47,7 @@ class SparsePage { } /*! \brief clear the page */ inline void Clear(void) { + min_index = 0; offset.clear(); offset.push_back(0); data.clear(); @@ -163,9 +166,9 @@ class SparsePage::Format { static Format* Create(const std::string& name); /*! * \brief decide the format from cache prefix. - * \return format type of the cache prefix. + * \return pair of row format, column format type of the cache prefix. */ - static std::string DecideFormat(const std::string& cache_prefix); + static std::pair DecideFormat(const std::string& cache_prefix); }; /*! diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc index e25ea6e25..4fe139910 100644 --- a/src/data/sparse_page_dmatrix.cc +++ b/src/data/sparse_page_dmatrix.cc @@ -136,6 +136,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, // make the sparse page. dmlc::ThreadedIter cmaker; SparsePage tmp; + size_t batch_ptr = 0, batch_top = 0; dmlc::DataIter* iter = this->RowIterator(); std::bernoulli_distribution coin_flip(pkeep); @@ -151,13 +152,13 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, } SparsePage* pcol = *dptr; pcol->Clear(); + pcol->min_index = ridx[0]; int nthread; #pragma omp parallel { nthread = omp_get_num_threads(); nthread = std::max(nthread, std::max(omp_get_num_procs() / 2 - 1, 1)); } - pcol->Clear(); common::ParallelGroupBuilder builder(&pcol->offset, &pcol->data); builder.InitBudget(info.num_col, nthread); @@ -199,21 +200,32 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, auto make_next_col = [&] (SparsePage** dptr) { tmp.Clear(); size_t btop = buffered_rowset_.size(); - while (iter->Next()) { - const RowBatch& batch = iter->Value(); - for (size_t i = 0; i < batch.size; ++i) { - bst_uint ridx = static_cast(batch.base_rowid + i); - if (pkeep == 1.0f || coin_flip(rnd)) { - buffered_rowset_.push_back(ridx); - tmp.Push(batch[i]); + + while (true) { + if (batch_ptr != batch_top) { + const RowBatch& batch = iter->Value(); + CHECK_EQ(batch_top, batch.size); + for (size_t i = batch_ptr; i < batch_top; ++i) { + bst_uint ridx = static_cast(batch.base_rowid + i); + if (pkeep == 1.0f || coin_flip(rnd)) { + buffered_rowset_.push_back(ridx); + tmp.Push(batch[i]); + } + + if (tmp.Size() >= max_row_perbatch || + tmp.MemCostBytes() >= kPageSize) { + make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr); + batch_ptr = i + 1; + return true; + } } + batch_ptr = batch_top; } - if (tmp.MemCostBytes() >= kPageSize || - tmp.Size() >= max_row_perbatch) { - make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr); - return true; - } + if (!iter->Next()) break; + batch_ptr = 0; + batch_top = iter->Value().size; } + if (tmp.Size() != 0) { make_col_batch(tmp, dmlc::BeginPtr(buffered_rowset_) + btop, dptr); return true; @@ -227,12 +239,15 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, std::string col_data_name = cache_prefix_ + ".col.page"; std::unique_ptr fo(dmlc::Stream::Create(col_data_name.c_str(), "w")); // find format. - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix_); + std::string name_format = SparsePage::Format::DecideFormat(cache_prefix_).second; fo->Write(name_format); std::unique_ptr format(SparsePage::Format::Create(name_format)); double tstart = dmlc::GetTime(); size_t bytes_write = 0; + // print every 4 sec. + const double kStep = 4.0; + size_t tick_expected = kStep; SparsePage* pcol = nullptr; while (cmaker.Next(&pcol)) { @@ -243,9 +258,12 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, size_t spage = pcol->MemCostBytes(); bytes_write += spage; double tdiff = dmlc::GetTime() - tstart; - LOG(CONSOLE) << "Writing to " << col_data_name - << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " - << (bytes_write >> 20UL) << " MB writen"; + if (tdiff >= tick_expected) { + LOG(CONSOLE) << "Writing to " << col_data_name + << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " MB writen"; + tick_expected += kStep; + } cmaker.Recycle(&pcol); } // save meta data diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc index 8159d3807..ed4b7853b 100644 --- a/src/data/sparse_page_source.cc +++ b/src/data/sparse_page_source.cc @@ -78,7 +78,7 @@ void SparsePageSource::Create(dmlc::Parser* src, std::string name_info = cache_prefix; std::string name_row = cache_prefix + ".row.page"; std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix); + std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; fo->Write(name_format); std::unique_ptr format(SparsePage::Format::Create(name_format)); @@ -86,6 +86,9 @@ void SparsePageSource::Create(dmlc::Parser* src, SparsePage page; size_t bytes_write = 0; double tstart = dmlc::GetTime(); + // print every 4 sec. + const double kStep = 4.0; + size_t tick_expected = kStep; while (src->Next()) { const dmlc::RowBlock& batch = src->Value(); @@ -108,9 +111,12 @@ void SparsePageSource::Create(dmlc::Parser* src, format->Write(page, fo.get()); page.Clear(); double tdiff = dmlc::GetTime() - tstart; - LOG(CONSOLE) << "Writing to " << name_row << " in " - << ((bytes_write >> 20UL) / tdiff) << " MB/s, " - << (bytes_write >> 20UL) << " written"; + if (tdiff >= tick_expected) { + LOG(CONSOLE) << "Writing to " << name_row << " in " + << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " written"; + tick_expected += kStep; + } } } @@ -133,7 +139,7 @@ void SparsePageSource::Create(DMatrix* src, std::string name_row = cache_prefix + ".row.page"; std::unique_ptr fo(dmlc::Stream::Create(name_row.c_str(), "w")); // find format. - std::string name_format = SparsePage::Format::DecideFormat(cache_prefix); + std::string name_format = SparsePage::Format::DecideFormat(cache_prefix).first; fo->Write(name_format); std::unique_ptr format(SparsePage::Format::Create(name_format));