add recover data, do a round of review

This commit is contained in:
tqchen 2014-11-30 20:59:55 -08:00
parent b9b58a1275
commit ecb09a23bc
7 changed files with 156 additions and 40 deletions

View File

@ -5,7 +5,7 @@
* \brief This file defines a template wrapper of engine to give more flexible
* AllReduce operations
*
* \author Tianqi Chen, Nacho, Tianyi
* \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
*/
#include "./engine.h"

View File

@ -3,7 +3,7 @@
* \brief this file governs which implementation of engine we are actually using
* provides a singleton of engine interface
*
* \author Tianqi, Nacho, Tianyi
* \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
*/
#define _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_DEPRECATE

View File

@ -1,7 +1,8 @@
/*!
* \file engine_base.cc
* \brief Basic implementation of AllReduce
* \author Tianqi, Nacho, Tianyi
*
* \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
*/
#define _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_DEPRECATE

View File

@ -5,8 +5,8 @@
*
* This implementation provides basic utility of AllReduce and Broadcast
* without considering node failure
*
* \author Tianqi, Nacho, Tianyi
*
* \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
*/
#ifndef ALLREDUCE_ENGINE_BASE_H
#define ALLREDUCE_ENGINE_BASE_H
@ -136,7 +136,7 @@ class AllReduceBase : public IEngine {
inline void ResetSize(void) {
  // zero both progress counters so a fresh transfer starts from offset 0
  size_read = 0;
  size_write = 0;
}
/*!
/*!
* \brief read data into ring-buffer, taking care not to override existing useful data
* at positions after protect_start
* \param protect_start all data starting from protect_start is still needed in the buffer
@ -157,7 +157,7 @@ class AllReduceBase : public IEngine {
if (len == -1) return errno == EAGAIN || errno == EWOULDBLOCK;
size_read += static_cast<size_t>(len);
return true;
}
}
/*!
* \brief read data into array,
* this function can not be used together with ReadToRingBuffer

View File

@ -2,7 +2,7 @@
* \file engine_robust-inl.h
* \brief implementation of inline template function in AllReduceRobust
*
* \author Tianqi, Nacho, Tianyi
* \author Tianqi Chen
*/
#ifndef ALLREDUCE_ENGINE_ROBUST_INL_H
#define ALLREDUCE_ENGINE_ROBUST_INL_H

View File

@ -1,7 +1,8 @@
/*!
* \file engine_robust.cc
* \brief Robust implementation of AllReduce
* \author Tianqi, Nacho, Tianyi
*
* \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
*/
#define _CRT_SECURE_NO_WARNINGS
#define _CRT_SECURE_NO_DEPRECATE
@ -272,24 +273,22 @@ inline char DataRequest(const std::pair<bool, int> &node_value,
/*!
* \brief try to decide the recovery message passing request
* \param role the current role of the node
* \param p_req_outlink used to store the output link the
* current node should recv data from,
* this can be -1 or -2,
* -1 means current node have the data
* -2 means current node do not have data, but also do not need to send/recv data
* \param p_req_in used to store the resulting vector, indicating which link we should send the data to
* \param p_size used to store the size of the message, for node in state kHaveData,
* this size must be set correctly before calling the function
* for others, this serves as an output parameter
*
* \param p_recvlink used to store the link current node should recv data from, if necessary
* this can be -1, which means the current node has the data
* \param p_req_in used to store the resulting vector, indicating which link we should send the data to
*
* \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
* \sa ReturnType
*/
AllReduceRobust::ReturnType
AllReduceRobust::TryDecideRequest(AllReduceRobust::RecoverType role,
int *p_req_outlink,
std::vector<bool> *p_req_in,
size_t *p_size) {
AllReduceRobust::TryDecideRouting(AllReduceRobust::RecoverType role,
size_t *p_size,
int *p_recvlink,
std::vector<bool> *p_req_in) {
int best_link = -2;
{// get the shortest distance to the request point
std::vector< std::pair<int,size_t> > dist_in, dist_out;
@ -317,7 +316,6 @@ AllReduceRobust::TryDecideRequest(AllReduceRobust::RecoverType role,
ReturnType succ = MsgPassing(std::make_pair(role == kRequestData, best_link),
&req_in, &req_out, DataRequest);
if (succ != kSuccess) return succ;
bool need_recv = false;
// set p_req_in
p_req_in->resize(req_in.size());
for (size_t i = 0; i < req_in.size(); ++i) {
@ -326,16 +324,115 @@ AllReduceRobust::TryDecideRequest(AllReduceRobust::RecoverType role,
if (req_out[i] != 0) {
utils::Assert(req_in[i] == 0, "cannot get and receive request");
utils::Assert(static_cast<int>(i) == best_link, "request result inconsistent");
need_recv = true;
}
}
if (role == kPassData && !need_recv) {
for (size_t i = 0; i < req_in.size(); ++i) {
utils::Assert(req_in[i] == 0, "Bug in TryDecideRequest");
*p_recvlink = best_link;
return kSuccess;
}
/*!
 * \brief try to finish the data recovery request,
 *        this function is used together with TryDecideRouting
 *
 *  Data is read from recv_link (when this node does not already hold it) and
 *  written to every link i for which req_in[i] is set.
 *
 * \param role the current role of the node
 * \param sendrecvbuf_ the buffer to store the data to be sent/received
 *    - if the role is kHaveData, this stores the data to be sent
 *    - if the role is kRequestData, this is the buffer to store the result
 *    - if the role is kPassData, this will not be used, and can be NULL
 * \param size the size of the data, obtained from TryDecideRouting
 * \param recv_link the link index to receive data, if necessary, obtained from TryDecideRouting
 * \param req_in the request of each link to send data, obtained from TryDecideRouting
 *
 * \return kSuccess when there is nothing to transfer or all transfers completed,
 *         kSockError when a read or write on a link fails
 * \sa ReturnType, TryDecideRouting
 */
AllReduceRobust::ReturnType
AllReduceRobust::TryRecoverData(RecoverType role,
                                void *sendrecvbuf_,
                                size_t size,
                                int recv_link,
                                const std::vector<bool> &req_in) {
  // no need to run recovery for zero size message
  if (size == 0) return kSuccess;
  utils::Assert(req_in.size() == links.size(), "TryRecoverData");
  const int nlink = static_cast<int>(links.size());
  {
    // this node takes part only if it requests the data itself
    // or at least one neighbour asked to be served through it
    bool req_data = role == kRequestData;
    for (int i = 0; i < nlink; ++i) {
      if (req_in[i]) {
        // a link can not be both the source and a destination
        utils::Assert(i != recv_link, "TryDecideRouting");
        req_data = true;
      }
    }
    // do not need to provide data or receive data, directly exit
    if (!req_data) return kSuccess;
  }
  // restart the per-link read/write progress counters
  for (int i = 0; i < nlink; ++i) {
    links[i].ResetSize();
  }
  utils::Assert(recv_link >= 0 || role == kHaveData, "recv_link must be active");
  if (role == kPassData) {
    // a pure relay has no user buffer, stage incoming bytes in the link's ring buffer
    links[recv_link].InitBuffer(1, size, reduce_buffer_size);
  }
  while (true) {
    bool finished = true;
    utils::SelectHelper selecter;
    for (int i = 0; i < nlink; ++i) {
      if (i == recv_link && links[i].size_read != size) {
        selecter.WatchRead(links[i].sock);
        finished = false;
      }
      if (req_in[i] && links[i].size_write != size) {
        selecter.WatchWrite(links[i].sock);
        finished = false;
      }
      // NOTE(review): exceptions are watched but never checked below,
      // so this function does not report them; confirm whether that is intended
      selecter.WatchException(links[i].sock);
    }
    if (finished) break;
    selecter.Select();
    if (role == kRequestData) {
      const int pid = recv_link;
      if (selecter.CheckRead(links[pid].sock)) {
        if (!links[pid].ReadToArray(sendrecvbuf_, size)) return kSockError;
      }
      // forward whatever has been received so far to the requesting links
      for (int i = 0; i < nlink; ++i) {
        if (req_in[i] && links[i].size_write != links[pid].size_read &&
            selecter.CheckWrite(links[i].sock)) {
          if (!links[i].WriteFromArray(sendrecvbuf_, links[pid].size_read)) return kSockError;
        }
      }
    }
    if (role == kHaveData) {
      for (int i = 0; i < nlink; ++i) {
        if (req_in[i] && selecter.CheckWrite(links[i].sock)) {
          if (!links[i].WriteFromArray(sendrecvbuf_, size)) return kSockError;
        }
      }
    }
    if (role == kPassData) {
      const int pid = recv_link;
      const size_t buffer_size = links[pid].buffer_size;
      if (selecter.CheckRead(links[pid].sock)) {
        // bytes past the slowest receiver's position are still needed,
        // pass that position to ReadToRingBuffer as the protected start
        size_t min_write = size;
        for (int i = 0; i < nlink; ++i) {
          if (req_in[i]) min_write = std::min(links[i].size_write, min_write);
        }
        utils::Assert(min_write <= links[pid].size_read, "boundary check");
        if (!links[pid].ReadToRingBuffer(min_write)) return kSockError;
      }
      for (int i = 0; i < nlink; ++i) {
        if (req_in[i] && selecter.CheckWrite(links[i].sock)) {
          size_t start = links[i].size_write % buffer_size;
          // send out data from ring buffer, at most up to the wrap-around point
          size_t nwrite = std::min(buffer_size - start,
                                   links[pid].size_read - links[i].size_write);
          // nothing new buffered for this link yet
          if (nwrite == 0) continue;
          // the payload lives in the receiving link's buffer,
          // but it must go out on the destination link's socket
          ssize_t len = links[i].sock.Send(links[pid].buffer_head + start, nwrite);
          if (len != -1) {
            links[i].size_write += len;
          } else {
            if (errno != EAGAIN && errno != EWOULDBLOCK) return kSockError;
          }
        }
      }
    }
  }
  return kSuccess;
}

View File

@ -5,7 +5,7 @@
*
* This implementation considers the failure of nodes
*
* \author Tianqi, Nacho, Tianyi
* \author Tianqi Chen, Ignacio Cano, Tianyi Zhou
*/
#ifndef ALLREDUCE_ENGINE_ROBUST_H
#define ALLREDUCE_ENGINE_ROBUST_H
@ -257,25 +257,43 @@ class AllReduceRobust : public AllReduceBase {
*/
ReturnType TryGetResult(void *buf, size_t size, int seqno, bool requester);
/*!
* \brief try to decide the recovery message passing request
* \brief try to decide the routing strategy for recovery
* \param role the current role of the node
* \param p_req_outlink used to store the output link the
* current node should recv data from,
* this can be nonnegative value, -1 or -2,
* -1 means current node have the data
* -2 means current node do not have data, but also do not need to send/recv data
* \param p_req_in used to store the resulting vector, indicating which link we should send the data to
* \param p_size used to store the size of the message, for node in state kHaveData,
* this size must be set correctly before calling the function
* for others, this serves as an output parameter
* \param p_recvlink used to store the link current node should recv data from, if necessary
* this can be -1, which means the current node has the data
* \param p_req_in used to store the resulting vector, indicating which link we should send the data to
*
* \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
* \sa ReturnType
* \sa ReturnType, TryRecoverData
*/
ReturnType TryDecideRouting(RecoverType role,
size_t *p_size,
int *p_recvlink,
std::vector<bool> *p_req_in);
/*!
* \brief try to finish the data recovery request,
* this function is used together with TryDecideRouting
* \param role the current role of the node
* \param sendrecvbuf_ the buffer to store the data to be sent/received
* - if the role is kHaveData, this stores the data to be sent
* - if the role is kRequestData, this is the buffer to store the result
* - if the role is kPassData, this will not be used, and can be NULL
* \param size the size of the data, obtained from TryDecideRouting
* \param recv_link the link index to receive data, if necessary, obtained from TryDecideRouting
* \param req_in the request of each link to send data, obtained from TryDecideRouting
*
* \return this function can return kSuccess/kSockError/kGetExcept, see ReturnType for details
* \sa ReturnType, TryDecideRouting
*/
ReturnType TryDecideRequest(RecoverType role,
int *p_req_outlink,
std::vector<bool> *p_req_in,
size_t *p_size);
ReturnType TryRecoverData(RecoverType role,
void *sendrecvbuf_,
size_t size,
int recv_link,
const std::vector<bool> &req_in);
/*!
* \brief run message passing algorithm on the allreduce tree
* the result is edge message stored in p_edge_in and p_edge_out