Remove lz4 compression with external memory. (#7076)

Jiaming Yuan, 2021-07-06 14:46:43 +08:00, committed by GitHub
parent 116d711815
commit f937f514aa
9 changed files with 9 additions and 360 deletions


@@ -21,12 +21,12 @@ jobs:
       submodules: 'true'
     - name: Install system packages
       run: |
-        brew install lz4 ninja libomp
+        brew install ninja libomp
     - name: Build gtest binary
       run: |
         mkdir build
         cd build
-        cmake .. -DGOOGLE_TEST=ON -DUSE_OPENMP=ON -DUSE_DMLC_GTEST=ON -DPLUGIN_LZ4=ON -DPLUGIN_DENSE_PARSER=ON -GNinja
+        cmake .. -DGOOGLE_TEST=ON -DUSE_OPENMP=ON -DUSE_DMLC_GTEST=ON -DPLUGIN_DENSE_PARSER=ON -GNinja
         ninja -v
     - name: Run gtest binary
       run: |


@@ -62,7 +62,6 @@ set(ENABLED_SANITIZERS "address" "leak" CACHE STRING
   "Semicolon separated list of sanitizer names. E.g 'address;leak'. Supported sanitizers are
 address, leak, undefined and thread.")
 ## Plugins
-option(PLUGIN_LZ4 "Build lz4 plugin" OFF)
 option(PLUGIN_DENSE_PARSER "Build dense parser plugin" OFF)
 option(PLUGIN_RMM "Build with RAPIDS Memory Manager (RMM)" OFF)
 ## TODO: 1. Add check if DPC++ compiler is used for building
@@ -92,6 +91,9 @@ endif (R_LIB AND GOOGLE_TEST)
 if (USE_AVX)
   message(SEND_ERROR "The option 'USE_AVX' is deprecated as experimental AVX features have been removed from XGBoost.")
 endif (USE_AVX)
+if (PLUGIN_LZ4)
+  message(SEND_ERROR "The option 'PLUGIN_LZ4' is removed from XGBoost.")
+endif (PLUGIN_LZ4)
 if (PLUGIN_RMM AND NOT (USE_CUDA))
   message(SEND_ERROR "`PLUGIN_RMM` must be enabled with `USE_CUDA` flag.")
 endif (PLUGIN_RMM AND NOT (USE_CUDA))

Jenkinsfile (vendored)

@@ -147,7 +147,7 @@ def BuildCPU() {
     # This step is not necessary, but here we include it, to ensure that DMLC_CORE_USE_CMAKE flag is correctly propagated
     # We want to make sure that we use the configured header build/dmlc/build_config.h instead of include/dmlc/build_config_default.h.
     # See discussion at https://github.com/dmlc/xgboost/issues/5510
-    ${dockerRun} ${container_type} ${docker_binary} tests/ci_build/build_via_cmake.sh -DPLUGIN_LZ4=ON -DPLUGIN_DENSE_PARSER=ON
+    ${dockerRun} ${container_type} ${docker_binary} tests/ci_build/build_via_cmake.sh -DPLUGIN_DENSE_PARSER=ON
     ${dockerRun} ${container_type} ${docker_binary} bash -c "cd build && ctest --extra-verbose"
     """
     // Sanitizer test
// Sanitizer test // Sanitizer test


@@ -1,13 +1,3 @@
-if (PLUGIN_LZ4)
-  target_sources(objxgboost PRIVATE ${xgboost_SOURCE_DIR}/plugin/lz4/sparse_page_lz4_format.cc)
-  find_path(LIBLZ4_INCLUDE_DIR lz4.h)
-  find_library(LIBLZ4_LIBRARY NAMES lz4)
-  message(STATUS "LIBLZ4_INCLUDE_DIR = ${LIBLZ4_INCLUDE_DIR}")
-  message(STATUS "LIBLZ4_LIBRARY = ${LIBLZ4_LIBRARY}")
-  target_include_directories(objxgboost PUBLIC ${LIBLZ4_INCLUDE_DIR})
-  target_link_libraries(objxgboost PUBLIC ${LIBLZ4_LIBRARY})
-endif (PLUGIN_LZ4)
 if (PLUGIN_DENSE_PARSER)
   target_sources(objxgboost PRIVATE ${xgboost_SOURCE_DIR}/plugin/dense_parser/dense_libsvm.cc)
 endif (PLUGIN_DENSE_PARSER)


plugin/lz4/sparse_page_lz4_format.cc (deleted)

@@ -1,341 +0,0 @@
/*!
* Copyright (c) 2015 by Contributors
* \file sparse_page_lz4_format.cc
* XGBoost Plugin to enable LZ4 compressed format on the external memory pages.
*/
#include <dmlc/registry.h>
#include <xgboost/data.h>
#include <xgboost/logging.h>
#include <xgboost/parameter.h>
#include <lz4.h>
#include <lz4hc.h>
#include "../../src/data/sparse_page_writer.h"
namespace xgboost {
namespace data {
DMLC_REGISTRY_FILE_TAG(sparse_page_lz4_format);
// Helper array for compression and decompression.
template<typename DType>
class CompressArray {
public:
// the data content.
std::vector<DType> data;
// Decompression helper
// number of chunks
inline int num_chunk() const {
CHECK_GT(raw_chunks_.size(), 1);
return static_cast<int>(raw_chunks_.size() - 1);
}
// raw bytes
inline size_t RawBytes() const {
return raw_chunks_.back() * sizeof(DType);
}
// encoded bytes
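// (compressed payload plus the two chunk-offset tables that Write() stores alongside it)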
inline size_t EncodedBytes() const {
return encoded_chunks_.back() +
(encoded_chunks_.size() + raw_chunks_.size()) * sizeof(bst_uint);
}
// load the array from file.
inline void Read(dmlc::SeekStream* fi);
// run decode on chunk_id
inline void Decompress(int chunk_id);
// Compression helper
// initialize the compression chunks
inline void InitCompressChunks(const std::vector<bst_uint>& chunk_ptr);
// initialize the compression chunks
inline void InitCompressChunks(size_t chunk_size, size_t max_nchunk);
// run compression on chunk_id; use_hc selects the LZ4 HC (high-compression) codec.
inline void Compress(int chunk_id, bool use_hc);
// save the output buffer into file.
inline void Write(dmlc::Stream* fo);
private:
// the chunk split of the data, by number of elements
std::vector<bst_uint> raw_chunks_;
// the encoded chunk, by number of bytes
std::vector<bst_uint> encoded_chunks_;
// output buffer of compression.
std::vector<std::string> out_buffer_;
// input buffer of data.
std::string in_buffer_;
};
template<typename DType>
inline void CompressArray<DType>::Read(dmlc::SeekStream* fi) {
CHECK(fi->Read(&raw_chunks_));
CHECK(fi->Read(&encoded_chunks_));
size_t buffer_size = encoded_chunks_.back();
in_buffer_.resize(buffer_size);
CHECK_EQ(fi->Read(dmlc::BeginPtr(in_buffer_), buffer_size), buffer_size);
data.resize(raw_chunks_.back());
}
template<typename DType>
inline void CompressArray<DType>::Decompress(int chunk_id) {
int chunk_size = static_cast<int>(
raw_chunks_[chunk_id + 1] - raw_chunks_[chunk_id]) * sizeof(DType);
int encoded_size = static_cast<int>(
encoded_chunks_[chunk_id + 1] - encoded_chunks_[chunk_id]);
// decompress data
int src_size = LZ4_decompress_fast(
dmlc::BeginPtr(in_buffer_) + encoded_chunks_[chunk_id],
reinterpret_cast<char*>(dmlc::BeginPtr(data) + raw_chunks_[chunk_id]),
chunk_size);
CHECK_EQ(encoded_size, src_size);
}
template<typename DType>
inline void CompressArray<DType>::InitCompressChunks(
const std::vector<bst_uint>& chunk_ptr) {
raw_chunks_ = chunk_ptr;
CHECK_GE(raw_chunks_.size(), 2);
out_buffer_.resize(raw_chunks_.size() - 1);
for (size_t i = 0; i < out_buffer_.size(); ++i) {
out_buffer_[i].resize(raw_chunks_[i + 1] - raw_chunks_[i]);
}
}
template<typename DType>
inline void CompressArray<DType>::InitCompressChunks(size_t chunk_size, size_t max_nchunk) {
raw_chunks_.clear();
raw_chunks_.push_back(0);
size_t min_chunk_size = data.size() / max_nchunk;
chunk_size = std::max(min_chunk_size, chunk_size);
size_t nstep = data.size() / chunk_size;
for (size_t i = 0; i < nstep; ++i) {
raw_chunks_.push_back(raw_chunks_.back() + chunk_size);
CHECK_LE(raw_chunks_.back(), data.size());
}
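// Guarantee at least two chunk boundaries so the final assignment below yields
// one chunk covering all of data, even when data.size() < chunk_size.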
if (nstep == 0) raw_chunks_.push_back(0);
raw_chunks_.back() = data.size();
CHECK_GE(raw_chunks_.size(), 2);
out_buffer_.resize(raw_chunks_.size() - 1);
for (size_t i = 0; i < out_buffer_.size(); ++i) {
out_buffer_[i].resize(raw_chunks_[i + 1] - raw_chunks_[i]);
}
}
template<typename DType>
inline void CompressArray<DType>::Compress(int chunk_id, bool use_hc) {
CHECK_LT(static_cast<size_t>(chunk_id + 1), raw_chunks_.size());
std::string& buf = out_buffer_[chunk_id];
size_t raw_chunk_size = (raw_chunks_[chunk_id + 1] - raw_chunks_[chunk_id]) * sizeof(DType);
int bound = LZ4_compressBound(raw_chunk_size);
CHECK_NE(bound, 0);
buf.resize(bound);
int encoded_size;
if (use_hc) {
encoded_size = LZ4_compress_HC(
reinterpret_cast<char*>(dmlc::BeginPtr(data) + raw_chunks_[chunk_id]),
dmlc::BeginPtr(buf), raw_chunk_size, buf.length(), 9);
} else {
encoded_size = LZ4_compress_default(
reinterpret_cast<char*>(dmlc::BeginPtr(data) + raw_chunks_[chunk_id]),
dmlc::BeginPtr(buf), raw_chunk_size, buf.length());
}
CHECK_NE(encoded_size, 0);
CHECK_LE(static_cast<size_t>(encoded_size), buf.length());
buf.resize(encoded_size);
}
template<typename DType>
inline void CompressArray<DType>::Write(dmlc::Stream* fo) {
encoded_chunks_.clear();
encoded_chunks_.push_back(0);
for (size_t i = 0; i < out_buffer_.size(); ++i) {
encoded_chunks_.push_back(encoded_chunks_.back() + out_buffer_[i].length());
}
fo->Write(raw_chunks_);
fo->Write(encoded_chunks_);
for (const std::string& buf : out_buffer_) {
fo->Write(dmlc::BeginPtr(buf), buf.length());
}
}
template<typename StorageIndex>
class SparsePageLZ4Format : public SparsePageFormat<SparsePage> {
public:
explicit SparsePageLZ4Format(bool use_lz4_hc)
: use_lz4_hc_(use_lz4_hc) {
raw_bytes_ = raw_bytes_value_ = raw_bytes_index_ = 0;
encoded_bytes_value_ = encoded_bytes_index_ = 0;
nthread_ = dmlc::GetEnv("XGBOOST_LZ4_DECODE_NTHREAD", 4);
nthread_write_ = dmlc::GetEnv("XGBOOST_LZ4_COMPRESS_NTHREAD", 12);
}
virtual ~SparsePageLZ4Format() {
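// At this point raw_bytes_ has only accumulated the offset arrays, which are
// written uncompressed; they therefore count toward both totals.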
size_t encoded_bytes = raw_bytes_ + encoded_bytes_value_ + encoded_bytes_index_;
raw_bytes_ += raw_bytes_value_ + raw_bytes_index_;
if (raw_bytes_ != 0) {
LOG(CONSOLE) << "raw_bytes=" << raw_bytes_
<< ", encoded_bytes=" << encoded_bytes
<< ", ratio=" << double(encoded_bytes) / raw_bytes_
<< ", ratio-index=" << double(encoded_bytes_index_) /raw_bytes_index_
<< ", ratio-value=" << double(encoded_bytes_value_) /raw_bytes_value_;
}
}
bool Read(SparsePage* page, dmlc::SeekStream* fi) override {
auto& offset_vec = page->offset.HostVector();
auto& data_vec = page->data.HostVector();
if (!fi->Read(&(offset_vec))) return false;
CHECK_NE(offset_vec.size(), 0) << "Invalid SparsePage file";
this->LoadIndexValue(fi);
data_vec.resize(offset_vec.back());
CHECK_EQ(index_.data.size(), value_.data.size());
CHECK_EQ(index_.data.size(), data_vec.size());
for (size_t i = 0; i < data_vec.size(); ++i) {
data_vec[i] = Entry(index_.data[i] + min_index_, value_.data[i]);
}
return true;
}
bool Read(SparsePage* page,
dmlc::SeekStream* fi,
const std::vector<bst_uint>& sorted_index_set) override {
if (!fi->Read(&disk_offset_)) return false;
this->LoadIndexValue(fi);
auto& offset_vec = page->offset.HostVector();
auto& data_vec = page->data.HostVector();
offset_vec.clear();
offset_vec.push_back(0);
for (bst_uint cid : sorted_index_set) {
offset_vec.push_back(
offset_vec.back() + disk_offset_[cid + 1] - disk_offset_[cid]);
}
data_vec.resize(offset_vec.back());
CHECK_EQ(index_.data.size(), value_.data.size());
CHECK_EQ(index_.data.size(), disk_offset_.back());
for (size_t i = 0; i < sorted_index_set.size(); ++i) {
bst_uint cid = sorted_index_set[i];
size_t dst_begin = offset_vec[i];
size_t src_begin = disk_offset_[cid];
size_t num = disk_offset_[cid + 1] - disk_offset_[cid];
for (size_t j = 0; j < num; ++j) {
data_vec[dst_begin + j] = Entry(
index_.data[src_begin + j] + min_index_, value_.data[src_begin + j]);
}
}
return true;
}
void Write(const SparsePage& page, dmlc::Stream* fo) override {
const auto& offset_vec = page.offset.HostVector();
const auto& data_vec = page.data.HostVector();
CHECK(offset_vec.size() != 0 && offset_vec[0] == 0);
CHECK_EQ(offset_vec.back(), data_vec.size());
fo->Write(offset_vec);
min_index_ = page.base_rowid;
fo->Write(&min_index_, sizeof(min_index_));
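// Indices are stored as deltas from min_index_ (the page's base row id),
// which keeps values small enough for the 16-bit lz4i16hc variant registered below.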
index_.data.resize(data_vec.size());
value_.data.resize(data_vec.size());
for (size_t i = 0; i < data_vec.size(); ++i) {
bst_uint idx = data_vec[i].index - min_index_;
CHECK_LE(idx, static_cast<bst_uint>(std::numeric_limits<StorageIndex>::max()))
    << "The storage index must be smaller than or equal to "
    << std::numeric_limits<StorageIndex>::max()
    << ", min_index=" << min_index_;
index_.data[i] = static_cast<StorageIndex>(idx);
value_.data[i] = data_vec[i].fvalue;
}
index_.InitCompressChunks(kChunkSize, kMaxChunk);
value_.InitCompressChunks(kChunkSize, kMaxChunk);
int nindex = index_.num_chunk();
int nvalue = value_.num_chunk();
int ntotal = nindex + nvalue;
dmlc::OMPException exc;
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_write_)
for (int i = 0; i < ntotal; ++i) {
exc.Run([&]() {
if (i < nindex) {
index_.Compress(i, use_lz4_hc_);
} else {
value_.Compress(i - nindex, use_lz4_hc_);
}
});
}
exc.Rethrow();
index_.Write(fo);
value_.Write(fo);
// statistics
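// Index bytes are normalized to bst_uint width so the reported ratio also
// credits savings from narrower StorageIndex types.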
raw_bytes_index_ += index_.RawBytes() * sizeof(bst_uint) / sizeof(StorageIndex);
raw_bytes_value_ += value_.RawBytes();
encoded_bytes_index_ += index_.EncodedBytes();
encoded_bytes_value_ += value_.EncodedBytes();
raw_bytes_ += offset_vec.size() * sizeof(size_t);
}
inline void LoadIndexValue(dmlc::SeekStream* fi) {
fi->Read(&min_index_, sizeof(min_index_));
index_.Read(fi);
value_.Read(fi);
int nindex = index_.num_chunk();
int nvalue = value_.num_chunk();
int ntotal = nindex + nvalue;
dmlc::OMPException exc;
#pragma omp parallel for schedule(dynamic, 1) num_threads(nthread_)
for (int i = 0; i < ntotal; ++i) {
exc.Run([&]() {
if (i < nindex) {
index_.Decompress(i);
} else {
value_.Decompress(i - nindex);
}
});
}
exc.Rethrow();
}
private:
// default chunk size, in number of elements.
static const size_t kChunkSize = 64 << 10UL;
// maximum number of chunks per array.
static const size_t kMaxChunk = 128;
// whether to use the LZ4 HC (high compression) codec
bool use_lz4_hc_;
// number of threads
int nthread_;
// number of writing threads
int nthread_write_;
// raw bytes
size_t raw_bytes_, raw_bytes_index_, raw_bytes_value_;
// encoded bytes
size_t encoded_bytes_index_, encoded_bytes_value_;
/*! \brief minimum index value */
uint32_t min_index_;
/*! \brief external memory column offset */
std::vector<size_t> disk_offset_;
// internal index
CompressArray<StorageIndex> index_;
// value set.
CompressArray<bst_float> value_;
};
XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4)
.describe("Apply LZ4 binary data compression for ext memory.")
.set_body([]() {
return new SparsePageLZ4Format<bst_uint>(false);
});
XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4hc)
.describe("Apply LZ4 binary data compression(high compression ratio) for ext memory.")
.set_body([]() {
return new SparsePageLZ4Format<bst_uint>(true);
});
XGBOOST_REGISTER_SPARSE_PAGE_FORMAT(lz4i16hc)
.describe("Apply LZ4 binary data compression(16 bit index mode) for ext memory.")
.set_body([]() {
return new SparsePageLZ4Format<uint16_t>(true);
});
} // namespace data
} // namespace xgboost
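For reference, the per-chunk scheme the removed plugin implemented boils down to a plain LZ4 round trip. Below is a minimal self-contained sketch, not XGBoost code: it assumes liblz4 is installed and the program is linked with -llz4, and it uses LZ4_decompress_safe where the plugin used the now-deprecated LZ4_decompress_fast.

#include <lz4.h>

#include <cassert>
#include <cstring>
#include <string>
#include <vector>

int main() {
  // One "chunk" of raw values, analogous to a slice of CompressArray::data.
  std::vector<float> values(64 << 10, 1.0f);
  const char* raw = reinterpret_cast<const char*>(values.data());
  const int raw_size = static_cast<int>(values.size() * sizeof(float));

  // Compress: size the buffer with LZ4_compressBound, then shrink to fit,
  // mirroring CompressArray<DType>::Compress above.
  std::string encoded(LZ4_compressBound(raw_size), '\0');
  const int encoded_size = LZ4_compress_default(
      raw, &encoded[0], raw_size, static_cast<int>(encoded.size()));
  assert(encoded_size > 0);
  encoded.resize(encoded_size);

  // Decompress into a buffer of the known raw size, as Decompress() does.
  std::vector<float> decoded(values.size());
  const int n = LZ4_decompress_safe(encoded.data(),
                                    reinterpret_cast<char*>(decoded.data()),
                                    encoded_size, raw_size);
  assert(n == raw_size);
  assert(std::memcmp(raw, decoded.data(), raw_size) == 0);
  return 0;
}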


@@ -26,7 +26,6 @@ USER_OPTIONS = {
     'use-hdfs': (None, 'Build with HDFS support', 0),
     'use-azure': (None, 'Build with AZURE support.', 0),
     'use-s3': (None, 'Build with S3 support', 0),
-    'plugin-lz4': (None, 'Build lz4 plugin.', 0),
     'plugin-dense-parser': (None, 'Build dense parser plugin.', 0),
     # Python specific
     'use-system-libxgboost': (None, 'Use libxgboost.so in system path.', 0)
@@ -268,7 +267,6 @@ class Install(install.install):  # pylint: disable=too-many-instance-attributes
         self.use_azure = 0
         self.use_s3 = 0
-        self.plugin_lz4 = 0
         self.plugin_dense_parser = 0
         self.use_system_libxgboost = 0


@@ -7,7 +7,7 @@ SHELL ["/bin/bash", "-c"]  # Use Bash as shell
 # Install all basic requirements
 RUN \
     apt-get update && \
-    apt-get install -y tar unzip wget git build-essential doxygen graphviz llvm libasan2 libidn11 liblz4-dev ninja-build && \
+    apt-get install -y tar unzip wget git build-essential doxygen graphviz llvm libasan2 libidn11 ninja-build && \
     # CMake
     wget -nv -nc https://cmake.org/files/v3.14/cmake-3.14.0-Linux-x86_64.sh --no-check-certificate && \
     bash cmake-3.14.0-Linux-x86_64.sh --skip-license --prefix=/usr && \


@@ -9,7 +9,7 @@ SHELL ["/bin/bash", "-c"]  # Use Bash as shell
 # Install all basic requirements
 RUN \
     apt-get update && \
-    apt-get install -y tar unzip wget bzip2 libgomp1 git build-essential doxygen graphviz llvm libasan2 libidn11 liblz4-dev ninja-build && \
+    apt-get install -y tar unzip wget bzip2 libgomp1 git build-essential doxygen graphviz llvm libasan2 libidn11 ninja-build && \
     # CMake
     wget -nv -nc https://cmake.org/files/v3.14/cmake-3.14.0-Linux-x86_64.sh --no-check-certificate && \
     bash cmake-3.14.0-Linux-x86_64.sh --skip-license --prefix=/usr && \


@@ -1,7 +1,7 @@
 #!/bin/bash
 # https://travis-ci.community/t/macos-build-fails-because-of-homebrew-bundle-unknown-command/7296/27
-brew install cmake libomp lz4
+brew install cmake libomp
 if [ ${TASK} == "python_test" ] || [ ${TASK} == "python_sdist_test" ]; then