check unity back

This commit is contained in:
tqchen
2014-08-29 18:31:24 -07:00
parent 04c520ea3d
commit 551b3b70f1
6 changed files with 844 additions and 32 deletions

View File

@@ -88,11 +88,19 @@ class IStream {
}
};
/*! \brief implementation of file i/o stream */
class FileStream : public IStream {
private:
FILE *fp;
/*! \brief interface of i/o stream that support seek */
class ISeekStream: public IStream {
public:
/*! \brief seek to certain position of the file */
virtual void Seek(size_t pos) = 0;
/*! \brief tell the position of the stream */
virtual size_t Tell(void) = 0;
};
/*! \brief implementation of file i/o stream */
class FileStream : public ISeekStream {
public:
explicit FileStream(void) {}
explicit FileStream(FILE *fp) {
this->fp = fp;
}
@@ -102,12 +110,18 @@ class FileStream : public IStream {
virtual void Write(const void *ptr, size_t size) {
fwrite(ptr, size, 1, fp);
}
inline void Seek(size_t pos) {
fseek(fp, 0, SEEK_SET);
virtual void Seek(size_t pos) {
fseek(fp, pos, SEEK_SET);
}
virtual size_t Tell(void) {
return static_cast<size_t>(ftell(fp));
}
inline void Close(void) {
fclose(fp);
}
private:
FILE *fp;
};
} // namespace utils

146
src/utils/thread.h Normal file
View File

@@ -0,0 +1,146 @@
#ifndef XGBOOST_UTILS_THREAD_H
#define XGBOOST_UTILS_THREAD_H
/*!
* \file thread.h
* \brief this header include the minimum necessary resource for multi-threading
* \author Tianqi Chen
* Acknowledgement: this file is adapted from SVDFeature project, by same author.
* The MAC support part of this code is provided by Artemy Kolchinsky
*/
#ifdef _MSC_VER
#include "utils.h"
#include <windows.h>
#include <process.h>
namespace xgboost {
namespace utils {
/*! \brief simple semaphore used for synchronization */
class Semaphore {
public :
inline void Init(int init_val) {
sem = CreateSemaphore(NULL, init_val, 10, NULL);
utils::Assert(sem != NULL, "create Semaphore error");
}
inline void Destroy(void) {
CloseHandle(sem);
}
inline void Wait(void) {
utils::Assert(WaitForSingleObject(sem, INFINITE) == WAIT_OBJECT_0, "WaitForSingleObject error");
}
inline void Post(void) {
utils::Assert(ReleaseSemaphore(sem, 1, NULL) != 0, "ReleaseSemaphore error");
}
private:
HANDLE sem;
};
/*! \brief simple thread that wraps windows thread */
class Thread {
private:
HANDLE thread_handle;
unsigned thread_id;
public:
inline void Start(unsigned int __stdcall entry(void*), void *param) {
thread_handle = (HANDLE)_beginthreadex(NULL, 0, entry, param, 0, &thread_id);
}
inline int Join(void) {
WaitForSingleObject(thread_handle, INFINITE);
return 0;
}
};
/*! \brief exit function called from thread */
inline void ThreadExit(void *status) {
_endthreadex(0);
}
#define XGBOOST_THREAD_PREFIX unsigned int __stdcall
} // namespace utils
} // namespace xgboost
#else
// thread interface using g++
#include <semaphore.h>
#include <pthread.h>
namespace xgboost {
namespace utils {
/*!\brief semaphore class */
class Semaphore {
#ifdef __APPLE__
private:
sem_t* semPtr;
char sema_name[20];
private:
inline void GenRandomString(char *s, const int len) {
static const char alphanum[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" ;
for (int i = 0; i < len; ++i) {
s[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
s[len] = 0;
}
public:
inline void Init(int init_val) {
sema_name[0]='/';
sema_name[1]='s';
sema_name[2]='e';
sema_name[3]='/';
GenRandomString(&sema_name[4], 16);
if((semPtr = sem_open(sema_name, O_CREAT, 0644, init_val)) == SEM_FAILED) {
perror("sem_open");
exit(1);
}
utils::Assert(semPtr != NULL, "create Semaphore error");
}
inline void Destroy(void) {
if (sem_close(semPtr) == -1) {
perror("sem_close");
exit(EXIT_FAILURE);
}
if (sem_unlink(sema_name) == -1) {
perror("sem_unlink");
exit(EXIT_FAILURE);
}
}
inline void Wait(void) {
sem_wait(semPtr);
}
inline void Post(void) {
sem_post(semPtr);
}
#else
private:
sem_t sem;
public:
inline void Init(int init_val) {
sem_init(&sem, 0, init_val);
}
inline void Destroy(void) {
sem_destroy(&sem);
}
inline void Wait(void) {
sem_wait(&sem);
}
inline void Post(void) {
sem_post(&sem);
}
#endif
};
/*!\brief simple thread class */
class Thread {
private:
pthread_t thread;
public :
inline void Start(void * entry(void*), void *param) {
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
pthread_create(&thread, &attr, entry, param);
}
inline int Join(void) {
void *status;
return pthread_join(thread, &status);
}
};
inline void ThreadExit(void *status) {
pthread_exit(status);
}
} // namespace utils
} // namespace xgboost
#define XGBOOST_THREAD_PREFIX void *
#endif
#endif

200
src/utils/thread_buffer.h Normal file
View File

@@ -0,0 +1,200 @@
#ifndef XGBOOST_UTILS_THREAD_BUFFER_H
#define XGBOOST_UTILS_THREAD_BUFFER_H
/*!
* \file thread_buffer.h
* \brief multi-thread buffer, iterator, can be used to create parallel pipeline
* \author Tianqi Chen
*/
#include <vector>
#include <cstring>
#include <cstdlib>
#include "./utils.h"
#include "./thread.h"
namespace xgboost {
namespace utils {
/*!
* \brief buffered loading iterator that uses multithread
* this template method will assume the following paramters
* \tparam Elem elememt type to be buffered
* \tparam ElemFactory factory type to implement in order to use thread buffer
*/
template<typename Elem, typename ElemFactory>
class ThreadBuffer {
public:
/*!\brief constructor */
ThreadBuffer(void) {
this->init_end = false;
this->buf_size = 30;
}
~ThreadBuffer(void) {
if(init_end) this->Destroy();
}
/*!\brief set parameter, will also pass the parameter to factory */
inline void SetParam(const char *name, const char *val) {
if (!strcmp( name, "buffer_size")) buf_size = atoi(val);
factory.SetParam(name, val);
}
/*!
* \brief initalize the buffered iterator
* \param param a initialize parameter that will pass to factory, ignore it if not necessary
* \return false if the initlization can't be done, e.g. buffer file hasn't been created
*/
inline bool Init(void) {
if (!factory.Init()) return false;
for (int i = 0; i < buf_size; ++i) {
bufA.push_back(factory.Create());
bufB.push_back(factory.Create());
}
this->init_end = true;
this->StartLoader();
return true;
}
/*!\brief place the iterator before first value */
inline void BeforeFirst(void) {
// wait till last loader end
loading_end.Wait();
// critcal zone
current_buf = 1;
factory.BeforeFirst();
// reset terminate limit
endA = endB = buf_size;
// wake up loader for first part
loading_need.Post();
// wait til first part is loaded
loading_end.Wait();
// set current buf to right value
current_buf = 0;
// wake loader for next part
data_loaded = false;
loading_need.Post();
// set buffer value
buf_index = 0;
}
/*! \brief destroy the buffer iterator, will deallocate the buffer */
inline void Destroy(void) {
// wait until the signal is consumed
this->destroy_signal = true;
loading_need.Post();
loader_thread.Join();
loading_need.Destroy();
loading_end.Destroy();
for (size_t i = 0; i < bufA.size(); ++i) {
factory.FreeSpace(bufA[i]);
}
for (size_t i = 0; i < bufB.size(); ++i) {
factory.FreeSpace(bufB[i]);
}
bufA.clear(); bufB.clear();
factory.Destroy();
this->init_end = false;
}
/*!
* \brief get the next element needed in buffer
* \param elem element to store into
* \return whether reaches end of data
*/
inline bool Next(Elem &elem) {
// end of buffer try to switch
if (buf_index == buf_size) {
this->SwitchBuffer();
buf_index = 0;
}
if (buf_index >= (current_buf ? endA : endB)) {
return false;
}
std::vector<Elem> &buf = current_buf ? bufA : bufB;
elem = buf[buf_index];
++buf_index;
return true;
}
/*!
* \brief get the factory object
*/
inline ElemFactory &get_factory(void) {
return factory;
}
// size of buffer
int buf_size;
private:
// factory object used to load configures
ElemFactory factory;
// index in current buffer
int buf_index;
// indicate which one is current buffer
int current_buf;
// max limit of visit, also marks termination
int endA, endB;
// double buffer, one is accessed by loader
// the other is accessed by consumer
// buffer of the data
std::vector<Elem> bufA, bufB;
// initialization end
bool init_end;
// singal whether the data is loaded
bool data_loaded;
// signal to kill the thread
bool destroy_signal;
// thread object
Thread loader_thread;
// signal of the buffer
Semaphore loading_end, loading_need;
/*!
* \brief slave thread
* this implementation is like producer-consumer style
*/
inline void RunLoader(void) {
while(!destroy_signal) {
// sleep until loading is needed
loading_need.Wait();
std::vector<Elem> &buf = current_buf ? bufB : bufA;
int i;
for (i = 0; i < buf_size ; ++i) {
if (!factory.LoadNext(buf[i])) {
int &end = current_buf ? endB : endA;
end = i; // marks the termination
break;
}
}
// signal that loading is done
data_loaded = true;
loading_end.Post();
}
}
/*!\brief entry point of loader thread */
inline static XGBOOST_THREAD_PREFIX LoaderEntry(void *pthread) {
static_cast< ThreadBuffer<Elem,ElemFactory>* >(pthread)->RunLoader();
ThreadExit(NULL);
return NULL;
}
/*!\brief start loader thread */
inline void StartLoader(void) {
destroy_signal = false;
// set param
current_buf = 1;
loading_need.Init(1);
loading_end .Init(0);
// reset terminate limit
endA = endB = buf_size;
loader_thread.Start(LoaderEntry, this);
// wait until first part of data is loaded
loading_end.Wait();
// set current buf to right value
current_buf = 0;
// wake loader for next part
data_loaded = false;
loading_need.Post();
buf_index = 0;
}
/*!\brief switch double buffer */
inline void SwitchBuffer(void) {
loading_end.Wait();
// loader shall be sleep now, critcal zone!
current_buf = !current_buf;
// wake up loader
data_loaded = false;
loading_need.Post();
}
};
} // namespace utils
} // namespace xgboost
#endif