/*! * Copyright (c) 2015 by Contributors * \file sparse_batch_writer.cc * \param Writer class sparse page. */ #include #include #include "./sparse_batch_page.h" #if DMLC_ENABLE_STD_THREAD namespace xgboost { namespace data { SparsePage::Writer::Writer( const std::vector& name_shards, const std::vector& format_shards, size_t extra_buffer_capacity) : num_free_buffer_(extra_buffer_capacity + name_shards.size()), clock_ptr_(0), workers_(name_shards.size()), qworkers_(name_shards.size()) { CHECK_EQ(name_shards.size(), format_shards.size()); // start writer threads for (size_t i = 0; i < name_shards.size(); ++i) { std::string name_shard = name_shards[i]; std::string format_shard = format_shards[i]; auto* wqueue = &qworkers_[i]; workers_[i].reset(new std::thread( [this, name_shard, format_shard, wqueue] () { std::unique_ptr fo( dmlc::Stream::Create(name_shard.c_str(), "w")); std::unique_ptr fmt( SparsePage::Format::Create(format_shard)); fo->Write(format_shard); std::unique_ptr page; while (wqueue->Pop(&page)) { if (page.get() == nullptr) break; fmt->Write(*page, fo.get()); qrecycle_.Push(std::move(page)); } fo.reset(nullptr); LOG(CONSOLE) << "SparsePage::Writer Finished writing to " << name_shard; })); } } SparsePage::Writer::~Writer() { for (auto& queue : qworkers_) { // use nullptr to signal termination. std::unique_ptr sig(nullptr); queue.Push(std::move(sig)); } for (auto& thread : workers_) { thread->join(); } } void SparsePage::Writer::PushWrite(std::unique_ptr&& page) { qworkers_[clock_ptr_].Push(std::move(page)); clock_ptr_ = (clock_ptr_ + 1) % workers_.size(); } void SparsePage::Writer::Alloc(std::unique_ptr* out_page) { CHECK(out_page->get() == nullptr); if (num_free_buffer_ != 0) { out_page->reset(new SparsePage()); --num_free_buffer_; } else { CHECK(qrecycle_.Pop(out_page)); } } } // namespace data } // namespace xgboost #endif // DMLC_ENABLE_STD_THREAD