From d816208797501817df2f9cf12a1da59bba9f2493 Mon Sep 17 00:00:00 2001 From: tqchen Date: Sat, 21 May 2016 18:46:36 -0700 Subject: [PATCH] [DATA] fix async data writing --- src/data/sparse_page_dmatrix.cc | 65 +++++++------- src/data/sparse_page_source.cc | 148 ++++++++++++++++---------------- src/data/sparse_page_writer.cc | 5 +- 3 files changed, 114 insertions(+), 104 deletions(-) diff --git a/src/data/sparse_page_dmatrix.cc b/src/data/sparse_page_dmatrix.cc index d25e06492..74a85e9ca 100644 --- a/src/data/sparse_page_dmatrix.cc +++ b/src/data/sparse_page_dmatrix.cc @@ -256,41 +256,44 @@ void SparsePageDMatrix::InitColAccess(const std::vector& enabled, name_shards.push_back(prefix + ".col.page"); format_shards.push_back(SparsePage::Format::DecideFormat(prefix).second); } - SparsePage::Writer writer(name_shards, format_shards, 6); - std::unique_ptr page; - writer.Alloc(&page); page->Clear(); - double tstart = dmlc::GetTime(); - size_t bytes_write = 0; - // print every 4 sec. - const double kStep = 4.0; - size_t tick_expected = kStep; + { + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); - while (make_next_col(page.get())) { - for (size_t i = 0; i < page->Size(); ++i) { - col_size_[i] += page->offset[i + 1] - page->offset[i]; - } - - bytes_write += page->MemCostBytes(); - writer.PushWrite(std::move(page)); - writer.Alloc(&page); - page->Clear(); - - double tdiff = dmlc::GetTime() - tstart; - if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing col.page file to " << cache_info_ - << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " - << (bytes_write >> 20UL) << " MB writen"; - tick_expected += kStep; + double tstart = dmlc::GetTime(); + size_t bytes_write = 0; + // print every 4 sec. + const double kStep = 4.0; + size_t tick_expected = kStep; + + while (make_next_col(page.get())) { + for (size_t i = 0; i < page->Size(); ++i) { + col_size_[i] += page->offset[i + 1] - page->offset[i]; + } + + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); + + double tdiff = dmlc::GetTime() - tstart; + if (tdiff >= tick_expected) { + LOG(CONSOLE) << "Writing col.page file to " << cache_info_ + << " in " << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " MB writen"; + tick_expected += kStep; + } } + // save meta data + std::string col_meta_name = cache_shards[0] + ".col.meta"; + std::unique_ptr fo( + dmlc::Stream::Create(col_meta_name.c_str(), "w")); + fo->Write(buffered_rowset_); + fo->Write(col_size_); + fo.reset(nullptr); } - // save meta data - std::string col_meta_name = cache_shards[0] + ".col.meta"; - std::unique_ptr fo( - dmlc::Stream::Create(col_meta_name.c_str(), "w")); - fo->Write(buffered_rowset_); - fo->Write(col_size_); - fo.reset(nullptr); // initialize column data CHECK(TryInitColData()); } diff --git a/src/data/sparse_page_source.cc b/src/data/sparse_page_source.cc index 3f499739f..a37e1c144 100644 --- a/src/data/sparse_page_source.cc +++ b/src/data/sparse_page_source.cc @@ -110,58 +110,60 @@ void SparsePageSource::Create(dmlc::Parser* src, name_shards.push_back(prefix + ".row.page"); format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first); } - SparsePage::Writer writer(name_shards, format_shards, 6); - std::unique_ptr page; - writer.Alloc(&page); page->Clear(); + { + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); - MetaInfo info; - size_t bytes_write = 0; - double tstart = dmlc::GetTime(); - // print every 4 sec. - const double kStep = 4.0; - size_t tick_expected = kStep; + MetaInfo info; + size_t bytes_write = 0; + double tstart = dmlc::GetTime(); + // print every 4 sec. + const double kStep = 4.0; + size_t tick_expected = kStep; - while (src->Next()) { - const dmlc::RowBlock& batch = src->Value(); - if (batch.label != nullptr) { - info.labels.insert(info.labels.end(), batch.label, batch.label + batch.size); - } - if (batch.weight != nullptr) { - info.weights.insert(info.weights.end(), batch.weight, batch.weight + batch.size); - } - info.num_row += batch.size; - info.num_nonzero += batch.offset[batch.size] - batch.offset[0]; - for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) { - uint32_t index = batch.index[i]; - info.num_col = std::max(info.num_col, - static_cast(index + 1)); - } - page->Push(batch); - if (page->MemCostBytes() >= kPageSize) { - bytes_write += page->MemCostBytes(); - writer.PushWrite(std::move(page)); - writer.Alloc(&page); - page->Clear(); + while (src->Next()) { + const dmlc::RowBlock& batch = src->Value(); + if (batch.label != nullptr) { + info.labels.insert(info.labels.end(), batch.label, batch.label + batch.size); + } + if (batch.weight != nullptr) { + info.weights.insert(info.weights.end(), batch.weight, batch.weight + batch.size); + } + info.num_row += batch.size; + info.num_nonzero += batch.offset[batch.size] - batch.offset[0]; + for (size_t i = batch.offset[0]; i < batch.offset[batch.size]; ++i) { + uint32_t index = batch.index[i]; + info.num_col = std::max(info.num_col, + static_cast(index + 1)); + } + page->Push(batch); + if (page->MemCostBytes() >= kPageSize) { + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); - double tdiff = dmlc::GetTime() - tstart; - if (tdiff >= tick_expected) { - LOG(CONSOLE) << "Writing row.page to " << cache_info << " in " - << ((bytes_write >> 20UL) / tdiff) << " MB/s, " - << (bytes_write >> 20UL) << " written"; - tick_expected += kStep; + double tdiff = dmlc::GetTime() - tstart; + if (tdiff >= tick_expected) { + LOG(CONSOLE) << "Writing row.page to " << cache_info << " in " + << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " written"; + tick_expected += kStep; + } } } - } - if (page->data.size() != 0) { - writer.PushWrite(std::move(page)); - } + if (page->data.size() != 0) { + writer.PushWrite(std::move(page)); + } - std::unique_ptr fo( - dmlc::Stream::Create(name_info.c_str(), "w")); - int tmagic = kMagic; - fo->Write(&tmagic, sizeof(tmagic)); - info.SaveBinary(fo.get()); + std::unique_ptr fo( + dmlc::Stream::Create(name_info.c_str(), "w")); + int tmagic = kMagic; + fo->Write(&tmagic, sizeof(tmagic)); + info.SaveBinary(fo.get()); + } LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } @@ -176,38 +178,40 @@ void SparsePageSource::Create(DMatrix* src, name_shards.push_back(prefix + ".row.page"); format_shards.push_back(SparsePage::Format::DecideFormat(prefix).first); } - SparsePage::Writer writer(name_shards, format_shards, 6); - std::unique_ptr page; - writer.Alloc(&page); page->Clear(); + { + SparsePage::Writer writer(name_shards, format_shards, 6); + std::unique_ptr page; + writer.Alloc(&page); page->Clear(); - MetaInfo info; - size_t bytes_write = 0; - double tstart = dmlc::GetTime(); - dmlc::DataIter* iter = src->RowIterator(); + MetaInfo info; + size_t bytes_write = 0; + double tstart = dmlc::GetTime(); + dmlc::DataIter* iter = src->RowIterator(); - while (iter->Next()) { - page->Push(iter->Value()); - if (page->MemCostBytes() >= kPageSize) { - bytes_write += page->MemCostBytes(); - writer.PushWrite(std::move(page)); - writer.Alloc(&page); - page->Clear(); - double tdiff = dmlc::GetTime() - tstart; - LOG(CONSOLE) << "Writing to " << cache_info << " in " - << ((bytes_write >> 20UL) / tdiff) << " MB/s, " - << (bytes_write >> 20UL) << " written"; + while (iter->Next()) { + page->Push(iter->Value()); + if (page->MemCostBytes() >= kPageSize) { + bytes_write += page->MemCostBytes(); + writer.PushWrite(std::move(page)); + writer.Alloc(&page); + page->Clear(); + double tdiff = dmlc::GetTime() - tstart; + LOG(CONSOLE) << "Writing to " << cache_info << " in " + << ((bytes_write >> 20UL) / tdiff) << " MB/s, " + << (bytes_write >> 20UL) << " written"; + } } - } - if (page->data.size() != 0) { - writer.PushWrite(std::move(page)); - } + if (page->data.size() != 0) { + writer.PushWrite(std::move(page)); + } - std::unique_ptr fo( - dmlc::Stream::Create(name_info.c_str(), "w")); - int tmagic = kMagic; - fo->Write(&tmagic, sizeof(tmagic)); - info.SaveBinary(fo.get()); + std::unique_ptr fo( + dmlc::Stream::Create(name_info.c_str(), "w")); + int tmagic = kMagic; + fo->Write(&tmagic, sizeof(tmagic)); + info.SaveBinary(fo.get()); + } LOG(CONSOLE) << "SparsePageSource: Finished writing to " << name_info; } diff --git a/src/data/sparse_page_writer.cc b/src/data/sparse_page_writer.cc index 33f9172d6..e16d1aee6 100644 --- a/src/data/sparse_page_writer.cc +++ b/src/data/sparse_page_writer.cc @@ -34,6 +34,7 @@ SparsePage::Writer::Writer( fo->Write(format_shard); std::unique_ptr page; while (wqueue->Pop(&page)) { + if (page.get() == nullptr) break; fmt->Write(*page, fo.get()); qrecycle_.Push(std::move(page)); } @@ -45,7 +46,9 @@ SparsePage::Writer::Writer( SparsePage::Writer::~Writer() { for (auto& queue : qworkers_) { - queue.SignalForKill(); + // use nullptr to signal termination. + std::unique_ptr sig(nullptr); + queue.Push(std::move(sig)); } for (auto& thread : workers_) { thread->join();