get multinode in

This commit is contained in:
tqchen
2014-11-19 19:19:53 -08:00
parent 7c3a392136
commit c42ba8d281
14 changed files with 157 additions and 23 deletions

View File

@@ -160,13 +160,13 @@ class SerializeReducer {
inline void AllReduce(DType *sendrecvobj, size_t max_n4byte, size_t count) {
buffer.resize(max_n4byte * count);
for (size_t i = 0; i < count; ++i) {
utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte * 4, max_n4byte * 4);
sendrecvobj[i]->Save(fs);
utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte, max_n4byte * 4);
sendrecvobj[i].Save(fs);
}
handle.AllReduce(BeginPtr(buffer), max_n4byte, count);
for (size_t i = 0; i < count; ++i) {
utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte * 4, max_n4byte * 4);
sendrecvobj[i]->Load(fs);
utils::MemoryFixSizeBuffer fs(BeginPtr(buffer) + i * max_n4byte, max_n4byte * 4);
sendrecvobj[i].Load(fs);
}
}
@@ -178,12 +178,12 @@ class SerializeReducer {
// temp space
DType tsrc, tdst;
for (int i = 0; i < len_; ++i) {
utils::MemoryFixSizeBuffer fsrc((void*)(src_) + i * nbytes, nbytes);
utils::MemoryFixSizeBuffer fdst(dst_ + i * nbytes, nbytes);
utils::MemoryFixSizeBuffer fsrc((char*)(src_) + i * nbytes, nbytes);
utils::MemoryFixSizeBuffer fdst((char*)(dst_) + i * nbytes, nbytes);
tsrc.Load(fsrc);
tdst.Load(fdst);
// govern const check
tdst.Reduce(static_cast<const DType &>(tsrc));
tdst.Reduce(static_cast<const DType &>(tsrc), nbytes);
fdst.Seek(0);
tdst.Save(fdst);
}

View File

@@ -38,6 +38,9 @@ void Bcast(std::string *sendrecv_data, int root) {
ReduceHandle::ReduceHandle(void) : handle(NULL) {}
ReduceHandle::~ReduceHandle(void) {}
int ReduceHandle::TypeSize(const MPI::Datatype &dtype) {
return 0;
}
void ReduceHandle::Init(ReduceFunction redfunc, size_t type_n4bytes, bool commute) {}
void ReduceHandle::AllReduce(void *sendrecvbuf, size_t type_n4bytes, size_t n4byte) {}
} // namespace sync

View File

@@ -97,9 +97,12 @@ void ReduceHandle::AllReduce(void *sendrecvbuf, size_t type_n4bytes, size_t coun
utils::Assert(handle != NULL, "must intialize handle to call AllReduce");
MPI::Op *op = reinterpret_cast<MPI::Op*>(handle);
MPI::Datatype *dtype = reinterpret_cast<MPI::Datatype*>(htype);
if (created_type_n4bytes != type_n4bytes || htype == NULL) {
dtype->Free();
if (created_type_n4bytes != type_n4bytes || dtype == NULL) {
if (dtype == NULL) {
dtype = new MPI::Datatype();
} else {
dtype->Free();
}
*dtype = MPI::INT.Create_contiguous(type_n4bytes);
dtype->Commit();
created_type_n4bytes = type_n4bytes;

View File

@@ -18,7 +18,7 @@ IUpdater* CreateUpdater(const char *name) {
if (!strcmp(name, "sync")) return new TreeSyncher();
if (!strcmp(name, "refresh")) return new TreeRefresher<GradStats>();
if (!strcmp(name, "grow_colmaker")) return new ColMaker<GradStats>();
//if (!strcmp(name, "grow_histmaker")) return new CQHistMaker<GradStats>();
if (!strcmp(name, "grow_histmaker")) return new CQHistMaker<GradStats>();
//if (!strcmp(name, "grow_skmaker")) return new SketchMaker();
if (!strcmp(name, "distcol")) return new DistColMaker<GradStats>();

View File

@@ -507,7 +507,7 @@ class CQHistMaker: public HistMaker<TStats> {
// node statistics
std::vector<TStats> node_stats;
// summary array
std::vector< WXQSketch::SummaryContainer> summary_array;
std::vector<WXQSketch::SummaryContainer> summary_array;
// reducer for summary
sync::SerializeReducer<WXQSketch::SummaryContainer> sreducer;
// per node, per feature sketch
@@ -517,6 +517,7 @@ class CQHistMaker: public HistMaker<TStats> {
template<typename TStats>
class QuantileHistMaker: public HistMaker<TStats> {
protected:
typedef utils::WXQuantileSketch<bst_float, bst_float> WXQSketch;
virtual void ResetPosAndPropose(const std::vector<bst_gpair> &gpair,
IFMatrix *p_fmat,
const BoosterInfo &info,
@@ -624,9 +625,8 @@ class QuantileHistMaker: public HistMaker<TStats> {
}
private:
typedef utils::WXQuantileSketch<bst_float, bst_float> WXQSketch;
// summary array
std::vector< WXQSketch::SummaryContainer> summary_array;
std::vector<WXQSketch::SummaryContainer> summary_array;
// reducer for summary
sync::SerializeReducer<WXQSketch::SummaryContainer> sreducer;
// local temp column data structure

View File

@@ -106,7 +106,7 @@ struct MemoryFixSizeBuffer : public ISeekStream {
}
virtual ~MemoryFixSizeBuffer(void) {}
virtual size_t Read(void *ptr, size_t size) {
utils::Assert(curr_ptr_ <= buffer_size_,
utils::Assert(curr_ptr_ + size <= buffer_size_,
"read can not have position excceed buffer length");
size_t nread = std::min(buffer_size_ - curr_ptr_, size);
if (nread != 0) memcpy(ptr, p_buffer_ + curr_ptr_, nread);

View File

@@ -519,12 +519,12 @@ class QuantileSketchTemplate {
/*! \brief same as summary, but use STL to backup the space */
struct SummaryContainer : public Summary {
std::vector<Entry> space;
explicit SummaryContainer(void) : Summary(NULL, 0) {
}
explicit SummaryContainer(const SummaryContainer &src) : Summary(NULL, src.size) {
SummaryContainer(const SummaryContainer &src) : Summary(NULL, src.size) {
this->space = src.space;
this->data = BeginPtr(this->space);
}
SummaryContainer(void) : Summary(NULL, 0) {
}
/*! \brief reserve space for summary */
inline void Reserve(size_t size) {
if (size > space.size()) {
@@ -576,13 +576,17 @@ class QuantileSketchTemplate {
/*! \brief save the data structure into stream */
inline void Save(IStream &fo) const {
fo.Write(&(this->size), sizeof(this->size));
fo.Write(data, this->size * sizeof(Entry));
if (this->size != 0) {
fo.Write(this->data, this->size * sizeof(Entry));
}
}
/*! \brief load data structure from input stream */
inline void Load(IStream &fi) {
utils::Check(fi.Read(&this->size, sizeof(this->size)) != 0, "invalid SummaryArray 1");
this->Reserve(this->size);
utils::Check(fi.Read(data, this->size * sizeof(Entry)) != 0, "invalid SummaryArray 2");
this->Reserve(this->size);
if (this->size != 0) {
utils::Check(fi.Read(this->data, this->size * sizeof(Entry)) != 0, "invalid SummaryArray 2");
}
}
};
/*!