/*! * Copyright (c) 2015 by Contributors * \file sparse_batch_writer.cc * \param Writer class sparse page. */ #include #include #include "./sparse_page_writer.h" #if DMLC_ENABLE_STD_THREAD namespace xgboost { namespace data { SparsePageWriter::SparsePageWriter( const std::vector& name_shards, const std::vector& format_shards, size_t extra_buffer_capacity) : num_free_buffer_(extra_buffer_capacity + name_shards.size()), clock_ptr_(0), workers_(name_shards.size()), qworkers_(name_shards.size()) { CHECK_EQ(name_shards.size(), format_shards.size()); // start writer threads for (size_t i = 0; i < name_shards.size(); ++i) { std::string name_shard = name_shards[i]; std::string format_shard = format_shards[i]; auto* wqueue = &qworkers_[i]; workers_[i].reset(new std::thread( [this, name_shard, format_shard, wqueue] () { std::unique_ptr fo( dmlc::Stream::Create(name_shard.c_str(), "w")); std::unique_ptr fmt( SparsePageFormat::Create(format_shard)); fo->Write(format_shard); std::shared_ptr page; while (wqueue->Pop(&page)) { if (page == nullptr) break; fmt->Write(*page, fo.get()); qrecycle_.Push(std::move(page)); } fo.reset(nullptr); LOG(INFO) << "SparsePage::Writer Finished writing to " << name_shard; })); } } SparsePageWriter::~SparsePageWriter() { for (auto& queue : qworkers_) { // use nullptr to signal termination. std::shared_ptr sig(nullptr); queue.Push(std::move(sig)); } for (auto& thread : workers_) { thread->join(); } } void SparsePageWriter::PushWrite(std::shared_ptr&& page) { qworkers_[clock_ptr_].Push(std::move(page)); clock_ptr_ = (clock_ptr_ + 1) % workers_.size(); } void SparsePageWriter::Alloc(std::shared_ptr* out_page) { CHECK(*out_page == nullptr); if (num_free_buffer_ != 0) { out_page->reset(new SparsePage()); --num_free_buffer_; } else { CHECK(qrecycle_.Pop(out_page)); } } } // namespace data } // namespace xgboost #endif // DMLC_ENABLE_STD_THREAD