diff --git a/src/io/io.cpp b/src/io/io.cpp index 1500ce658..1535a9e42 100644 --- a/src/io/io.cpp +++ b/src/io/io.cpp @@ -6,10 +6,6 @@ #include "../utils/io.h" #include "../utils/utils.h" #include "simple_dmatrix-inl.hpp" -#ifndef XGBOOST_STRICT_CXX98_ -#include "page_dmatrix-inl.hpp" -#include "page_fmatrix-inl.hpp" -#endif // implements data loads using dmatrix simple for now namespace xgboost { @@ -28,43 +24,12 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, utils::FileStream fs(utils::FopenCheck(fname, "rb")); utils::Check(fs.Read(&magic, sizeof(magic)) != 0, "invalid input file format"); fs.Seek(0); - if (magic == DMatrixSimple::kMagic) { DMatrixSimple *dmat = new DMatrixSimple(); dmat->LoadBinary(fs, silent, fname); fs.Close(); return dmat; } -#ifndef XGBOOST_STRICT_CXX98_ - std::string tmp_fname; - const char *fname_ext = NULL; - if (strchr(fname, ';') != NULL) { - tmp_fname = fname; - char *ptr = strchr(&tmp_fname[0], ';'); - ptr[0] = '\0'; fname = &tmp_fname[0]; - fname_ext = ptr + 1; - } - if (magic == DMatrixPage::kMagic) { - if (fname_ext == NULL) { - DMatrixPage *dmat = new DMatrixPage(); - dmat->Load(fs, silent, fname); - return dmat; - } else { - DMatrixColPage *dmat = new DMatrixColPage(fname_ext); - dmat->Load(fs, silent, fname, true); - return dmat; - } - } - if (magic == DMatrixColPage::kMagic) { - std::string sfname = fname; - if (fname_ext == NULL) { - sfname += ".col"; fname_ext = sfname.c_str(); - } - DMatrixColPage *dmat = new DMatrixColPage(fname_ext); - dmat->Load(fs, silent, fname); - return dmat; - } - #endif fs.Close(); DMatrixSimple *dmat = new DMatrixSimple(); dmat->CacheLoad(fname, silent, savebuffer); @@ -72,16 +37,6 @@ DataMatrix* LoadDataMatrix(const char *fname, bool silent, } void SaveDataMatrix(const DataMatrix &dmat, const char *fname, bool silent) { -#ifndef XGBOOST_STRICT_CXX98_ - if (!strcmp(fname + strlen(fname) - 5, ".page")) { - DMatrixPage::Save(fname, dmat, silent); - return; - } - if (!strcmp(fname + strlen(fname) - 6, ".cpage")) { - DMatrixColPage::Save(fname, dmat, silent); - return; - } -#endif if (dmat.magic == DMatrixSimple::kMagic) { const DMatrixSimple *p_dmat = static_cast(&dmat); p_dmat->SaveBinary(fname, silent); diff --git a/src/io/simple_fmatrix-inl.hpp b/src/io/simple_fmatrix-inl.hpp index 21279900f..9f204536f 100644 --- a/src/io/simple_fmatrix-inl.hpp +++ b/src/io/simple_fmatrix-inl.hpp @@ -9,7 +9,8 @@ #include "../utils/utils.h" #include "../utils/random.h" #include "../utils/omp.h" -#include "../utils/matrix_csr.h" +#include "../utils/group_data.h" + namespace xgboost { namespace io { /*! @@ -147,21 +148,40 @@ class FMatrixS : public IFMatrix{ * \param pkeep probability to keep a row */ inline void InitColData(float pkeep, const std::vector &enabled) { + // clear rowset buffered_rowset_.clear(); - // note: this part of code is serial, todo, parallelize this transformer - utils::SparseCSRMBuilder builder(col_ptr_, col_data_); - builder.InitBudget(0); + // bit map + int nthread; + std::vector bmap; + #pragma omp parallel + { + nthread = omp_get_num_threads(); + } + // build the column matrix in parallel + utils::ParallelGroupBuilder builder(&col_ptr_, &col_data_); + builder.InitBudget(0, nthread); // start working iter_->BeforeFirst(); while (iter_->Next()) { const RowBatch &batch = iter_->Value(); - for (size_t i = 0; i < batch.size; ++i) { + bmap.resize(bmap.size() + batch.size, true); + for (size_t i = 0; i < batch.size; ++i) { + bst_uint ridx = static_cast(batch.base_rowid + i); if (pkeep == 1.0f || random::SampleBinary(pkeep)) { - buffered_rowset_.push_back(static_cast(batch.base_rowid+i)); + buffered_rowset_.push_back(ridx); + } else { + bmap[i] = false; + } + } + #pragma omp parallel for schedule(static) + for (size_t i = 0; i < batch.size; ++i) { + int tid = omp_get_thread_num(); + bst_uint ridx = static_cast(batch.base_rowid + i); + if (bmap[ridx]) { RowBatch::Inst inst = batch[i]; for (bst_uint j = 0; j < inst.length; ++j) { if (enabled[inst[j].index]){ - builder.AddBudget(inst[j].index); + builder.AddBudget(inst[j].index, tid); } } } @@ -170,19 +190,19 @@ class FMatrixS : public IFMatrix{ builder.InitStorage(); iter_->BeforeFirst(); - size_t ktop = 0; while (iter_->Next()) { const RowBatch &batch = iter_->Value(); + #pragma omp parallel for schedule(static) for (size_t i = 0; i < batch.size; ++i) { - if (ktop < buffered_rowset_.size() && - buffered_rowset_[ktop] == batch.base_rowid+i) { - ++ktop; + int tid = omp_get_thread_num(); + bst_uint ridx = static_cast(batch.base_rowid + i); + if (bmap[ridx]) { RowBatch::Inst inst = batch[i]; for (bst_uint j = 0; j < inst.length; ++j) { if (enabled[inst[j].index]) { - builder.PushElem(inst[j].index, - Entry((bst_uint)(batch.base_rowid+i), - inst[j].fvalue)); + builder.Push(inst[j].index, + Entry((bst_uint)(batch.base_rowid+i), + inst[j].fvalue), tid); } } } diff --git a/src/utils/group_data.h b/src/utils/group_data.h index a25eb1edd..6e12a39ff 100644 --- a/src/utils/group_data.h +++ b/src/utils/group_data.h @@ -40,7 +40,7 @@ struct ParallelGroupBuilder { * \param nkeys number of keys in the matrix, can be smaller than expected * \param nthread number of thread that will be used in construction */ - inline void InitBudget(size_t nkeys = 0, int nthread = 1) { + inline void InitBudget(size_t nkeys, int nthread) { thread_rptr.resize(nthread); for (size_t i = 0; i < thread_rptr.size(); ++i) { thread_rptr[i].resize(nkeys); @@ -53,7 +53,7 @@ struct ParallelGroupBuilder { * \param threadid the id of thread that calls this function * \param nelem number of element budget add to this row */ - inline void AddBudget(size_t key, int threadid = 0, SizeType nelem = 1) { + inline void AddBudget(size_t key, int threadid, SizeType nelem = 1) { std::vector &trptr = thread_rptr[threadid]; if (trptr.size() < key + 1) { trptr.resize(key + 1, 0); @@ -65,7 +65,7 @@ struct ParallelGroupBuilder { // set rptr to correct size for (size_t tid = 0; tid < thread_rptr.size(); ++tid) { if (rptr.size() <= thread_rptr[tid].size()) { - rptr.resize(thread_rptr[tid].size()+1); + rptr.resize(thread_rptr[tid].size() + 1); } } // initialize rptr to be beginning of each segment @@ -90,7 +90,7 @@ struct ParallelGroupBuilder { * \param key the key of * \param threadid the id of thread that calls this function */ - inline void Push(size_t key, ValueType value, int threadid = 0) { + inline void Push(size_t key, ValueType value, int threadid) { SizeType &rp = thread_rptr[threadid][key]; data[rp++] = value; } diff --git a/src/utils/matrix_csr.h b/src/utils/matrix_csr.h deleted file mode 100644 index 14e0667ee..000000000 --- a/src/utils/matrix_csr.h +++ /dev/null @@ -1,260 +0,0 @@ -#ifndef XGBOOST_UTILS_MATRIX_CSR_H_ -#define XGBOOST_UTILS_MATRIX_CSR_H_ -/*! - * \file matrix_csr.h - * \brief this file defines some easy to use STL based class for in memory sparse CSR matrix - * \author Tianqi Chen - */ -#include -#include -#include -#include "./io.h" -#include "./utils.h" -#include "./omp.h" - -namespace xgboost { -namespace utils { -/*! - * \brief a class used to help construct CSR format matrix, - * can be used to convert row major CSR to column major CSR - * \tparam IndexType type of index used to store the index position, usually unsigned or size_t - * \tparam whether enabling the usage of aclist, this option must be enabled manually - */ -template -struct SparseCSRMBuilder { - private: - /*! \brief dummy variable used in the indicator matrix construction */ - std::vector dummy_aclist; - /*! \brief pointer to each of the row */ - std::vector &rptr; - /*! \brief index of nonzero entries in each row */ - std::vector &findex; - /*! \brief a list of active rows, used when many rows are empty */ - std::vector &aclist; - - public: - SparseCSRMBuilder(std::vector &p_rptr, - std::vector &p_findex) - :rptr(p_rptr), findex(p_findex), aclist(dummy_aclist) { - Assert(!UseAcList, "enabling bug"); - } - /*! \brief use with caution! rptr must be cleaned before use */ - SparseCSRMBuilder(std::vector &p_rptr, - std::vector &p_findex, - std::vector &p_aclist) - :rptr(p_rptr), findex(p_findex), aclist(p_aclist) { - Assert(UseAcList, "must manually enable the option use aclist"); - } - - public: - /*! - * \brief step 1: initialize the number of rows in the data, not necessary exact - * \nrows number of rows in the matrix, can be smaller than expected - */ - inline void InitBudget(size_t nrows = 0) { - if (!UseAcList) { - rptr.clear(); - rptr.resize(nrows + 1, 0); - } else { - Assert(nrows + 1 == rptr.size(), "rptr must be initialized already"); - this->Cleanup(); - } - } - /*! - * \brief step 2: add budget to each rows, this function is called when aclist is used - * \param row_id the id of the row - * \param nelem number of element budget add to this row - */ - inline void AddBudget(size_t row_id, SizeType nelem = 1) { - if (rptr.size() < row_id + 2) { - rptr.resize(row_id + 2, 0); - } - if (UseAcList) { - if (rptr[row_id + 1] == 0) aclist.push_back(row_id); - } - rptr[row_id + 1] += nelem; - } - /*! \brief step 3: initialize the necessary storage */ - inline void InitStorage(void) { - // initialize rptr to be beginning of each segment - size_t start = 0; - if (!UseAcList) { - for (size_t i = 1; i < rptr.size(); i++) { - size_t rlen = rptr[i]; - rptr[i] = start; - start += rlen; - } - } else { - // case with active list - std::sort(aclist.begin(), aclist.end()); - for (size_t i = 0; i < aclist.size(); i++) { - size_t ridx = aclist[i]; - size_t rlen = rptr[ridx + 1]; - rptr[ridx + 1] = start; - // set previous rptr to right position if previous feature is not active - if (i == 0 || ridx != aclist[i - 1] + 1) rptr[ridx] = start; - start += rlen; - } - } - findex.resize(start); - } - /*! - * \brief step 4: - * used in indicator matrix construction, add new - * element to each row, the number of calls shall be exactly same as add_budget - */ - inline void PushElem(size_t row_id, IndexType col_id) { - SizeType &rp = rptr[row_id + 1]; - findex[rp++] = col_id; - } - /*! - * \brief step 5: only needed when aclist is used - * clean up the rptr for next usage - */ - inline void Cleanup(void) { - Assert(UseAcList, "this function can only be called use AcList"); - for (size_t i = 0; i < aclist.size(); i++) { - const size_t ridx = aclist[i]; - rptr[ridx] = 0; rptr[ridx + 1] = 0; - } - aclist.clear(); - } -}; - -/*! - * \brief a class used to help construct CSR format matrix file - * \tparam IndexType type of index used to store the index position - * \tparam SizeType type of size used in row pointer - */ -template -struct SparseCSRFileBuilder { - public: - explicit SparseCSRFileBuilder(utils::ISeekStream *fo, size_t buffer_size) - : fo(fo), buffer_size(buffer_size) { - } - /*! - * \brief step 1: initialize the number of rows in the data, not necessary exact - * \nrows number of rows in the matrix, can be smaller than expected - */ - inline void InitBudget(size_t nrows = 0) { - rptr.clear(); - rptr.resize(nrows + 1, 0); - } - /*! - * \brief step 2: add budget to each rows - * \param row_id the id of the row - * \param nelem number of element budget add to this row - */ - inline void AddBudget(size_t row_id, SizeType nelem = 1) { - if (rptr.size() < row_id + 2) { - rptr.resize(row_id + 2, 0); - } - rptr[row_id + 1] += nelem; - } - /*! \brief step 3: initialize the necessary storage */ - inline void InitStorage(void) { - SizeType nelem = 0; - for (size_t i = 1; i < rptr.size(); i++) { - nelem += rptr[i]; - rptr[i] = nelem; - } - begin_data = static_cast(fo->Tell()) + sizeof(SizeType); - SizeType begin_meta = begin_data + nelem * sizeof(IndexType); - fo->Write(&begin_meta, sizeof(begin_meta)); - fo->Seek(begin_meta); - fo->Write(rptr); - // setup buffer space - buffer_rptr.resize(rptr.size()); - buffer_temp.reserve(buffer_size); - buffer_data.resize(buffer_size); - saved_offset = rptr; - saved_offset.resize(rptr.size() - 1); - this->ClearBuffer(); - } - /*! \brief step 4: push element into buffer */ - inline void PushElem(SizeType row_id, IndexType col_id) { - if (buffer_temp.size() == buffer_size) { - this->WriteBuffer(); - this->ClearBuffer(); - } - buffer_rptr[row_id + 1] += 1; - buffer_temp.push_back(std::make_pair(row_id, col_id)); - } - /*! \brief finalize the construction */ - inline void Finalize(void) { - this->WriteBuffer(); - for (size_t i = 0; i < saved_offset.size(); ++i) { - utils::Assert(saved_offset[i] == rptr[i+1], "some block not write out"); - } - } - /*! \brief content must be in wb+ */ - template - inline void SortRows(Comparator comp, size_t step) { - for (size_t i = 0; i < rptr.size() - 1; i += step) { - bst_omp_uint begin = static_cast(i); - bst_omp_uint end = static_cast(std::min(rptr.size() - 1, i + step)); - if (rptr[end] != rptr[begin]) { - fo->Seek(begin_data + rptr[begin] * sizeof(IndexType)); - buffer_data.resize(rptr[end] - rptr[begin]); - fo->Read(BeginPtr(buffer_data), (rptr[end] - rptr[begin]) * sizeof(IndexType)); - // do parallel sorting - #pragma omp parallel for schedule(static) - for (bst_omp_uint j = begin; j < end; ++j) { - std::sort(&buffer_data[0] + rptr[j] - rptr[begin], - &buffer_data[0] + rptr[j+1] - rptr[begin], - comp); - } - fo->Seek(begin_data + rptr[begin] * sizeof(IndexType)); - fo->Write(BeginPtr(buffer_data), (rptr[end] - rptr[begin]) * sizeof(IndexType)); - } - } - } - protected: - inline void WriteBuffer(void) { - SizeType start = 0; - for (size_t i = 1; i < buffer_rptr.size(); ++i) { - size_t rlen = buffer_rptr[i]; - buffer_rptr[i] = start; - start += rlen; - } - for (size_t i = 0; i < buffer_temp.size(); ++i) { - SizeType &rp = buffer_rptr[buffer_temp[i].first + 1]; - buffer_data[rp++] = buffer_temp[i].second; - } - // write out - for (size_t i = 0; i < buffer_rptr.size() - 1; ++i) { - size_t nelem = buffer_rptr[i+1] - buffer_rptr[i]; - if (nelem != 0) { - utils::Assert(saved_offset[i] + nelem <= rptr[i+1], "data exceed bound"); - fo->Seek(saved_offset[i] * sizeof(IndexType) + begin_data); - fo->Write(&buffer_data[0] + buffer_rptr[i], nelem * sizeof(IndexType)); - saved_offset[i] += nelem; - } - } - } - inline void ClearBuffer(void) { - buffer_temp.clear(); - std::fill(buffer_rptr.begin(), buffer_rptr.end(), 0); - } - private: - /*! \brief output file pointer the data */ - utils::ISeekStream *fo; - /*! \brief pointer to each of the row */ - std::vector rptr; - /*! \brief saved top space of each item */ - std::vector saved_offset; - /*! \brief beginning position of data */ - size_t begin_data; - // ----- the following are buffer space - /*! \brief maximum size of content buffer*/ - size_t buffer_size; - /*! \brief store the data content */ - std::vector< std::pair > buffer_temp; - /*! \brief saved top space of each item */ - std::vector buffer_rptr; - /*! \brief saved top space of each item */ - std::vector buffer_data; -}; -} // namespace utils -} // namespace xgboost -#endif diff --git a/wrapper/xgboost_wrapper.cpp b/wrapper/xgboost_wrapper.cpp index b70939951..dec266ff6 100644 --- a/wrapper/xgboost_wrapper.cpp +++ b/wrapper/xgboost_wrapper.cpp @@ -19,7 +19,7 @@ using namespace std; #include "../src/learner/learner-inl.hpp" #include "../src/io/io.h" #include "../src/utils/utils.h" -#include "../src/utils/matrix_csr.h" +#include "../src/utils/group_data.h" #include "../src/io/simple_dmatrix-inl.hpp" using namespace xgboost; @@ -139,20 +139,32 @@ extern "C"{ const float *data, bst_ulong nindptr, bst_ulong nelem) { + int nthread; + #pragma omp parallel + { + nthread = omp_get_num_threads(); + } + DMatrixSimple *p_mat = new DMatrixSimple(); DMatrixSimple &mat = *p_mat; - utils::SparseCSRMBuilder builder(mat.row_ptr_, mat.row_data_); - builder.InitBudget(); + utils::ParallelGroupBuilder builder(&mat.row_ptr_, &mat.row_data_); + builder.InitBudget(0, nthread); bst_ulong ncol = nindptr - 1; + #pragma omp parallel for schedule(static) for (bst_ulong i = 0; i < ncol; ++i) { + int tid = omp_get_thread_num(); for (unsigned j = col_ptr[i]; j < col_ptr[i+1]; ++j) { - builder.AddBudget(indices[j]); + builder.AddBudget(indices[j], tid); } } builder.InitStorage(); + #pragma omp parallel for schedule(static) for (bst_ulong i = 0; i < ncol; ++i) { + int tid = omp_get_thread_num(); for (unsigned j = col_ptr[i]; j < col_ptr[i+1]; ++j) { - builder.PushElem(indices[j], RowBatch::Entry(static_cast(i), data[j])); + builder.Push(indices[j], + RowBatch::Entry(static_cast(i), data[j]), + tid); } } mat.info.info.num_row = mat.row_ptr_.size() - 1;