[UPDATE] Update rabit and threadlocal (#2114)

* [UPDATE] Update rabit and threadlocal

* minor fix to make build system happy

* upgrade requirement to g++4.8

* upgrade dmlc-core

* update travis
This commit is contained in:
Tianqi Chen
2017-03-16 18:48:37 -07:00
committed by GitHub
parent b0c972aa4d
commit d581a3d0e7
28 changed files with 59 additions and 48 deletions

View File

@@ -207,14 +207,14 @@ class SparsePage::Writer {
* writing is done by another thread inside writer.
* \param page The page to be written
*/
void PushWrite(std::unique_ptr<SparsePage>&& page);
void PushWrite(std::shared_ptr<SparsePage>&& page);
/*!
* \brief Allocate a page to store results.
* This function can block when the writer is too slow and buffer pages
* have not yet been recycled.
* \param out_page Used to store the allocated pages.
*/
void Alloc(std::unique_ptr<SparsePage>* out_page);
void Alloc(std::shared_ptr<SparsePage>* out_page);
private:
/*! \brief number of allocated pages */
@@ -224,9 +224,9 @@ class SparsePage::Writer {
/*! \brief writer threads */
std::vector<std::unique_ptr<std::thread> > workers_;
/*! \brief recycler queue */
dmlc::ConcurrentBlockingQueue<std::unique_ptr<SparsePage> > qrecycle_;
dmlc::ConcurrentBlockingQueue<std::shared_ptr<SparsePage> > qrecycle_;
/*! \brief worker threads */
std::vector<dmlc::ConcurrentBlockingQueue<std::unique_ptr<SparsePage> > > qworkers_;
std::vector<dmlc::ConcurrentBlockingQueue<std::shared_ptr<SparsePage> > > qworkers_;
};
#endif // DMLC_ENABLE_STD_THREAD

View File

@@ -254,7 +254,7 @@ void SparsePageDMatrix::InitColAccess(const std::vector<bool>& enabled,
{
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
std::shared_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();
double tstart = dmlc::GetTime();

View File

@@ -16,7 +16,7 @@ class SparsePageRawFormat : public SparsePage::Format {
public:
bool Read(SparsePage* page, dmlc::SeekStream* fi) override {
if (!fi->Read(&(page->offset))) return false;
CHECK_NE(page->offset.size(), 0) << "Invalid SparsePage file";
CHECK_NE(page->offset.size(), 0U) << "Invalid SparsePage file";
page->data.resize(page->offset.back());
if (page->data.size() != 0) {
CHECK_EQ(fi->Read(dmlc::BeginPtr(page->data),

View File

@@ -18,7 +18,7 @@ SparsePageSource::SparsePageSource(const std::string& cache_info)
: base_rowid_(0), page_(nullptr), clock_ptr_(0) {
// read in the info files
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
CHECK_NE(cache_shards.size(), 0);
CHECK_NE(cache_shards.size(), 0U);
{
std::string name_info = cache_shards[0];
std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r"));
@@ -85,7 +85,7 @@ const RowBatch& SparsePageSource::Value() const {
bool SparsePageSource::CacheExist(const std::string& cache_info) {
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
CHECK_NE(cache_shards.size(), 0);
CHECK_NE(cache_shards.size(), 0U);
{
std::string name_info = cache_shards[0];
std::unique_ptr<dmlc::Stream> finfo(dmlc::Stream::Create(name_info.c_str(), "r", true));
@@ -102,7 +102,7 @@ bool SparsePageSource::CacheExist(const std::string& cache_info) {
void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
const std::string& cache_info) {
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
CHECK_NE(cache_shards.size(), 0);
CHECK_NE(cache_shards.size(), 0U);
// read in the info files.
std::string name_info = cache_shards[0];
std::vector<std::string> name_shards, format_shards;
@@ -112,7 +112,7 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
}
{
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
std::shared_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();
MetaInfo info;
@@ -170,7 +170,7 @@ void SparsePageSource::Create(dmlc::Parser<uint32_t>* src,
void SparsePageSource::Create(DMatrix* src,
const std::string& cache_info) {
std::vector<std::string> cache_shards = common::Split(cache_info, ':');
CHECK_NE(cache_shards.size(), 0);
CHECK_NE(cache_shards.size(), 0U);
// read in the info files.
std::string name_info = cache_shards[0];
std::vector<std::string> name_shards, format_shards;
@@ -180,7 +180,7 @@ void SparsePageSource::Create(DMatrix* src,
}
{
SparsePage::Writer writer(name_shards, format_shards, 6);
std::unique_ptr<SparsePage> page;
std::shared_ptr<SparsePage> page;
writer.Alloc(&page); page->Clear();
MetaInfo info = src->info();

View File

@@ -32,7 +32,7 @@ SparsePage::Writer::Writer(
std::unique_ptr<SparsePage::Format> fmt(
SparsePage::Format::Create(format_shard));
fo->Write(format_shard);
std::unique_ptr<SparsePage> page;
std::shared_ptr<SparsePage> page;
while (wqueue->Pop(&page)) {
if (page.get() == nullptr) break;
fmt->Write(*page, fo.get());
@@ -47,7 +47,7 @@ SparsePage::Writer::Writer(
SparsePage::Writer::~Writer() {
for (auto& queue : qworkers_) {
// use nullptr to signal termination.
std::unique_ptr<SparsePage> sig(nullptr);
std::shared_ptr<SparsePage> sig(nullptr);
queue.Push(std::move(sig));
}
for (auto& thread : workers_) {
@@ -55,12 +55,12 @@ SparsePage::Writer::~Writer() {
}
}
void SparsePage::Writer::PushWrite(std::unique_ptr<SparsePage>&& page) {
void SparsePage::Writer::PushWrite(std::shared_ptr<SparsePage>&& page) {
qworkers_[clock_ptr_].Push(std::move(page));
clock_ptr_ = (clock_ptr_ + 1) % workers_.size();
}
void SparsePage::Writer::Alloc(std::unique_ptr<SparsePage>* out_page) {
void SparsePage::Writer::Alloc(std::shared_ptr<SparsePage>* out_page) {
CHECK(out_page->get() == nullptr);
if (num_free_buffer_ != 0) {
out_page->reset(new SparsePage());