diff --git a/src/utils/quantile.h b/src/utils/quantile.h index 5b536bd00..0fadd9c35 100644 --- a/src/utils/quantile.h +++ b/src/utils/quantile.h @@ -13,14 +13,14 @@ namespace xgboost { namespace utils { -/*! - * \brief a helper class to compute streaming quantile + +/*! + * \brief experimental wsummary * \tparam DType type of data content * \tparam RType type of rank */ -template -class WQuantileSketch { - public: +template +struct WQSummary { /*! \brief an entry in the sketch summary */ struct Entry { /*! \brief minimum rank */ @@ -32,27 +32,17 @@ class WQuantileSketch { /*! \brief the value of data */ DType value; // constructor - Entry(void) { - rmin = rmax = wmin = 0.0; - } + Entry(void) {} // constructor Entry(RType rmin, RType rmax, RType wmin, DType value) - : rmin(rmin), rmax(rmax), wmin(wmin), value(value) { - if (!(this->rmax - this->rmin - this->wmin > -0.1)) { - rmax = rmin + wmin; - printf("correct\n"); - printf("rmax=%f, rmin=%f, wmin=%f, plus=%f, v=%f\n", - rmax, rmin, wmin, rmin+wmin, value); - } - } - /*! \brief debug function, check Valid */ + : rmin(rmin), rmax(rmax), wmin(wmin), value(value) {} + /*! + * \brief debug function, check Valid + * \param eps the tolerate level for violating the relation + */ inline void CheckValid(RType eps = 0) const { utils::Assert(rmin >= 0 && rmax >= 0 && wmin >= 0, "nonneg constraint"); - if (!(rmax - rmin - wmin > -eps)) { - printf("rmax=%f, rmin=%f, wmin=%f, plus=%f, v=%f\n", - rmax, rmin, wmin, rmin+wmin, value); - } - utils::Assert(rmax- rmin - wmin > -eps , "relation constraint: min/max"); + utils::Assert(rmax- rmin - wmin > -eps, "relation constraint: min/max"); } /*! \return rmin estimation for v strictly bigger than value */ inline RType rmin_next(void) const { @@ -63,161 +53,222 @@ class WQuantileSketch { return rmax - wmin; } }; - /*! - * \brief this is data structure presenting one summary - */ - struct Summary { - /*! \brief data field */ - Entry *data; - /*! \brief number of elements in the summary */ - size_t size; - // constructor - Summary(void) : size(0) {} - /*! - * \brief the maximum error of the Summary - */ - inline RType MaxError(void) const { - RType res = data[0].rmax - data[0].rmin - data[0].wmin; - for (size_t i = 1; i < size; ++i) { - res = std::max(data[i].rmax_prev() - data[i - 1].rmin_next(), res); - res = std::max(data[i].rmax - data[i].rmin - data[i].wmin, res); + /*! \brief input data queue before entering the summary */ + struct Queue { + // entry in the queue + struct QEntry { + // value of the instance + DType value; + // weight of instance + RType weight; + // default constructor + QEntry(void) {} + // constructor + QEntry(DType value, RType weight) + : value(value), weight(weight) {} + // comparator on value + inline bool operator<(const QEntry &b) const { + return value < b.value; } - return res; - } - /*! - * \brief debug function, validate whether the summary - * run consistency check to check if it is a valid summary - * \param eps the tolerate error level, used when RType is floating point and - * some inconsistency could occur due to rounding error - */ - inline void CheckValid(RType eps) const { - for (size_t i = 0; i < size; ++i) { - data[i].CheckValid(eps); - if (i != 0) { - utils::Assert(data[i].rmin >= data[i - 1].rmin + data[i - 1].wmin, "rmin range constraint"); - utils::Assert(data[i].rmax >= data[i - 1].rmax + data[i].wmin, "rmax range constraint"); + }; + // the input queue + std::vector queue; + // end of the queue + size_t qtail; + // push data to the queue + inline void Push(DType x, RType w) { + if (qtail == 0 || queue[qtail - 1].value != x) { + queue[qtail++] = QEntry(x, w); + } else { + queue[qtail - 1].weight += w; + } + } + inline void MakeSummary(WQSummary *out) { + std::sort(queue.begin(), queue.begin() + qtail); + out->size = 0; + // start update sketch + RType wsum = 0; + // construct data with unique weights + for (size_t i = 0; i < qtail;) { + size_t j = i + 1; + RType w = queue[i].weight; + while (j < qtail && queue[j].value == queue[i].value) { + w += queue[j].weight; ++j; } + out->data[out->size++] = Entry(wsum, wsum + w, w, queue[i].value); + wsum += w; i = j; } } - /*! \return maximum rank in the summary */ - inline RType MaxRank(void) const { - return data[size - 1].rmax; - } - /*! \brief set size to 0 */ - inline void Clear(void) { - size = 0; - } - /*! \brief used for debug purpose, print the summary */ - inline void Print(void) const { - for (size_t i = 0; i < size; ++i) { - printf("x=%f\t[%f, %f] wmin=%f\n", - data[i].value, - data[i].rmin, - data[i].rmax, - data[i].wmin); - } - } - /*! - * \brief copy content from src - * \param src source sketch - */ - inline void CopyFrom(const Summary &src) { - size = src.size; - std::memcpy(data, src.data, sizeof(Entry) * size); - } - /*! - * \brief set current summary to be pruned summary of src - * assume data field is already allocated to be at least maxsize - * \param src source summary - * \param maxsize size we can afford in the pruned sketch - */ - inline void SetPrune(const Summary &src, RType maxsize) { - if (src.size <= maxsize) { - this->CopyFrom(src); return; - } - const RType max_rank = src.MaxRank(); - const size_t n = maxsize - 1; - data[0] = src.data[0]; - this->size = 1; - // lastidx is used to avoid duplicated records - size_t i = 0, lastidx = 0; - for (RType k = 1; k < n; ++k) { - RType dx2 = (2 * k * max_rank) / n; - // find first i such that d < (rmax[i+1] + rmin[i+1]) / 2 - while (i < src.size - 1 && - dx2 >= src.data[i + 1].rmax + src.data[i + 1].rmin) ++i; - if (i == src.size - 1) break; - if (dx2 < src.data[i].rmin_next() + src.data[i + 1].rmax_prev()) { - if (i != lastidx) { - data[size++] = src.data[i]; lastidx = i; - } - } else { - if (i + 1 != lastidx) { - data[size++] = src.data[i + 1]; lastidx = i + 1; - } - } - } - if (lastidx != src.size - 1) { - data[size++] = src.data[src.size - 1]; - } - } - /*! - * \brief set current summary to be merged summary of sa and sb - * \param sa first input summary to be merged - * \param sb second input summar to be merged - */ - inline void SetCombine(const Summary &sa, - const Summary &sb) { - utils::Assert(sa.size > 0 && sb.size > 0, "invalid input for merge"); - const Entry *a = sa.data, *a_end = sa.data + sa.size; - const Entry *b = sb.data, *b_end = sb.data + sb.size; - // extended rmin value - RType aprev_rmin = 0, bprev_rmin = 0; - Entry *dst = this->data; - while (a != a_end && b != b_end) { - // duplicated value entry - if (a->value == b->value) { - *dst = Entry(a->rmin + b->rmin, - a->rmax + b->rmax, - a->wmin + b->wmin, a->value); - aprev_rmin = a->rmin_next(); - bprev_rmin = b->rmin_next(); - ++dst; ++a; ++b; - } else if (a->value < b->value) { - *dst = Entry(a->rmin + bprev_rmin, - a->rmax + b->rmax_prev(), - a->wmin, a->value); - aprev_rmin = a->rmin_next(); - ++dst; ++a; - } else { - *dst = Entry(b->rmin + aprev_rmin, - b->rmax + a->rmax_prev(), - b->wmin, b->value); - bprev_rmin = b->rmin_next(); - ++dst; ++b; - } - } - if (a != a_end) { - RType brmax = (b_end - 1)->rmax; - do { - *dst = Entry(a->rmin + bprev_rmin, a->rmax + brmax, a->wmin, a->value); - ++dst; ++a; - } while (a != a_end); - } - if (b != b_end) { - RType armax = (a_end - 1)->rmax; - do { - *dst = Entry(b->rmin + aprev_rmin, b->rmax + armax, b->wmin, b->value); - ++dst; ++b; - } while (b != b_end); - } - this->size = dst - data; - utils::Assert(size <= sa.size + sb.size, "bug in combine"); - } }; - // same as summary, but use STL to backup the space + + /*! \brief data field */ + Entry *data; + /*! \brief number of elements in the summary */ + size_t size; + // constructor + WQSummary(Entry *data, size_t size) + : data(data), size(size) {} + /*! + * \return the maximum error of the Summary + */ + inline RType MaxError(void) const { + RType res = data[0].rmax - data[0].rmin - data[0].wmin; + for (size_t i = 1; i < size; ++i) { + res = std::max(data[i].rmax_prev() - data[i - 1].rmin_next(), res); + res = std::max(data[i].rmax - data[i].rmin - data[i].wmin, res); + } + return res; + } + /*! \return maximum rank in the summary */ + inline RType MaxRank(void) const { + return data[size - 1].rmax; + } + /*! \brief set size to 0 */ + inline void Clear(void) { + size = 0; + } + /*! + * \brief copy content from src + * \param src source sketch + */ + inline void CopyFrom(const WQSummary &src) { + size = src.size; + std::memcpy(data, src.data, sizeof(Entry) * size); + +} /*! + * \brief debug function, validate whether the summary + * run consistency check to check if it is a valid summary + * \param eps the tolerate error level, used when RType is floating point and + * some inconsistency could occur due to rounding error + */ + inline void CheckValid(RType eps) const { + for (size_t i = 0; i < size; ++i) { + data[i].CheckValid(eps); + if (i != 0) { + utils::Assert(data[i].rmin >= data[i - 1].rmin + data[i - 1].wmin, "rmin range constraint"); + utils::Assert(data[i].rmax >= data[i - 1].rmax + data[i].wmin, "rmax range constraint"); + } + } + } + /*! \brief used for debug purpose, print the summary */ + inline void Print(void) const { + for (size_t i = 0; i < size; ++i) { + printf("x=%f\t[%f, %f] wmin=%f\n", + data[i].value, data[i].rmin, + data[i].rmax, data[i].wmin); + } + } + /*! + * \brief set current summary to be pruned summary of src + * assume data field is already allocated to be at least maxsize + * \param src source summary + * \param maxsize size we can afford in the pruned sketch + */ + + inline void SetPrune(const WQSummary &src, RType maxsize) { + if (src.size <= maxsize) { + this->CopyFrom(src); return; + } + const RType max_rank = src.MaxRank(); + const size_t n = maxsize - 1; + data[0] = src.data[0]; + this->size = 1; + // lastidx is used to avoid duplicated records + size_t i = 0, lastidx = 0; + for (RType k = 1; k < n; ++k) { + RType dx2 = (2 * k * max_rank) / n; + // find first i such that d < (rmax[i+1] + rmin[i+1]) / 2 + while (i < src.size - 1 && + dx2 >= src.data[i + 1].rmax + src.data[i + 1].rmin) ++i; + if (i == src.size - 1) break; + if (dx2 < src.data[i].rmin_next() + src.data[i + 1].rmax_prev()) { + if (i != lastidx) { + data[size++] = src.data[i]; lastidx = i; + } + } else { + if (i + 1 != lastidx) { + data[size++] = src.data[i + 1]; lastidx = i + 1; + } + } + } + if (lastidx != src.size - 1) { + data[size++] = src.data[src.size - 1]; + } + } + /*! + * \brief set current summary to be merged summary of sa and sb + * \param sa first input summary to be merged + * \param sb second input summar to be merged + */ + inline void SetCombine(const WQSummary &sa, + const WQSummary &sb) { + utils::Assert(sa.size > 0 && sb.size > 0, "invalid input for merge"); + const Entry *a = sa.data, *a_end = sa.data + sa.size; + const Entry *b = sb.data, *b_end = sb.data + sb.size; + // extended rmin value + RType aprev_rmin = 0, bprev_rmin = 0; + Entry *dst = this->data; + while (a != a_end && b != b_end) { + // duplicated value entry + if (a->value == b->value) { + *dst = Entry(a->rmin + b->rmin, + a->rmax + b->rmax, + a->wmin + b->wmin, a->value); + aprev_rmin = a->rmin_next(); + bprev_rmin = b->rmin_next(); + ++dst; ++a; ++b; + } else if (a->value < b->value) { + *dst = Entry(a->rmin + bprev_rmin, + a->rmax + b->rmax_prev(), + a->wmin, a->value); + aprev_rmin = a->rmin_next(); + ++dst; ++a; + } else { + *dst = Entry(b->rmin + aprev_rmin, + b->rmax + a->rmax_prev(), + b->wmin, b->value); + bprev_rmin = b->rmin_next(); + ++dst; ++b; + } + } + if (a != a_end) { + RType brmax = (b_end - 1)->rmax; + do { + *dst = Entry(a->rmin + bprev_rmin, a->rmax + brmax, a->wmin, a->value); + ++dst; ++a; + } while (a != a_end); + } + if (b != b_end) { + RType armax = (a_end - 1)->rmax; + do { + *dst = Entry(b->rmin + aprev_rmin, b->rmax + armax, b->wmin, b->value); + ++dst; ++b; + } while (b != b_end); + } + this->size = dst - data; + utils::Assert(size <= sa.size + sb.size, "bug in combine"); + } +}; + +/*! + * \brief template for all quantle sketch algorithm + * that uses merge/prune scheme + * \tparam DType type of data content + * \tparam RType type of rank + * \tparam TSummary actual summary data structure it uses + */ +template +class QuantileSketchTemplate { + public: + /*! \brief type of summary type */ + typedef TSummary Summary; + /*! \brief the entry type */ + typedef typename Summary::Entry Entry; + /*! \brief same as summary, but use STL to backup the space */ struct SummaryContainer : public Summary { std::vector space; + SummaryContainer(void) : Summary(NULL, 0) { + } /*! \brief reserve space for summary */ inline void Reserve(size_t size) { if (size > space.size()) { @@ -267,8 +318,8 @@ class WQuantileSketch { utils::Assert((1 << nlevel) * limit_size >= maxn, "invalid init parameter"); utils::Assert(nlevel <= limit_size * eps, "invalid init parameter"); // lazy reserve the space, if there is only one value, no need to allocate space - inqueue.resize(1); - qtail = 0; + inqueue.queue.resize(1); + inqueue.qtail = 0; data.clear(); level.clear(); } @@ -277,15 +328,15 @@ class WQuantileSketch { * \param x the elemented added to the sketch */ inline void Push(DType x, RType w = 1) { - if (qtail == inqueue.size()) { + if (inqueue.qtail == inqueue.queue.size()) { // jump from lazy one value to limit_size * 2 - if (inqueue.size() == 1) { - inqueue.resize(limit_size * 2); + if (inqueue.queue.size() == 1) { + inqueue.queue.resize(limit_size * 2); } else { temp.Reserve(limit_size * 2); - this->Queue2Summary(&temp); + inqueue.MakeSummary(&temp); // cleanup queue - qtail = 0; + inqueue.qtail = 0; for (size_t l = 1; true; ++l) { this->InitLevel(l + 1); // check if level l is empty @@ -307,18 +358,16 @@ class WQuantileSketch { } } } - if (qtail == 0 || inqueue[qtail - 1].value != x) { - inqueue[qtail++] = QEntry(x, w); - } else { - inqueue[qtail - 1].weight += w; - } + inqueue.Push(x, w); } /*! \brief get the summary after finalize */ inline void GetSummary(SummaryContainer *out) { if (level.size() != 0) { out->Reserve(limit_size * 2); + } else { + out->Reserve(inqueue.queue.size()); } - this->Queue2Summary(out); + inqueue.MakeSummary(out); if (level.size() != 0) { level[0].SetPrune(*out, limit_size); for (size_t l = 1; l < level.size(); ++l) { @@ -343,56 +392,19 @@ class WQuantileSketch { inline void CheckValid(RType eps) const { for (size_t l = 1; l < level.size(); ++l) { level[l].CheckValid(eps); - } + } } - // initialize level space to at least nlevel inline void InitLevel(size_t nlevel) { if (level.size() >= nlevel) return; data.resize(limit_size * nlevel); - level.resize(nlevel, Summary()); + level.resize(nlevel, Summary(NULL, 0)); for (size_t l = 0; l < level.size(); ++l) { level[l].data = BeginPtr(data) + l * limit_size; } } - inline void Queue2Summary(SummaryContainer *temp) { - // reserve temp space - temp->Reserve(inqueue.size()); - temp->size = 0; - // start update sketch - std::sort(inqueue.begin(), inqueue.begin() + qtail); - RType wsum = 0; - // construct data with unique weights - for (size_t i = 0; i < qtail;) { - size_t j = i + 1; - RType w = inqueue[i].weight; - while (j < qtail && inqueue[j].value == inqueue[i].value) { - w += inqueue[j].weight; ++j; - } - temp->data[temp->size++] = Entry(wsum, wsum + w, w, inqueue[i].value); - wsum += w; i = j; - } - } - // entry in the queue - struct QEntry { - // value of the instance - DType value; - // weight of instance - RType weight; - // default constructor - QEntry(void) {} - // constructor - QEntry(DType value, RType weight) - : value(value), weight(weight) {} - // comparator on value - inline bool operator<(const QEntry &b) const { - return value < b.value; - } - }; - // the input queue - std::vector inqueue; - // end of the queue - size_t qtail; + // input data queue + typename Summary::Queue inqueue; // number of levels size_t nlevel; // size of summary in each level @@ -405,6 +417,16 @@ class WQuantileSketch { SummaryContainer temp; }; +/*! + * \brief Quantiel sketch use WQSummary + * \tparam DType type of data content + * \tparam RType type of rank + */ +template +class WQuantileSketch : + public QuantileSketchTemplate >{ +}; + /*! * \brief a helper class to compute streaming quantile * \tparam DType type of data content diff --git a/test/Makefile b/test/Makefile index 5057619ec..6d135e317 100644 --- a/test/Makefile +++ b/test/Makefile @@ -1,5 +1,5 @@ export CC = gcc -export CXX = g++ +export CXX = clang++ export MPICXX = mpicxx export LDFLAGS= -pthread -lm export CFLAGS = -Wall -O3 -msse2 -Wno-unknown-pragmas -fPIC -I../src diff --git a/test/mkquantest.py b/test/mkquantest.py new file mode 100755 index 000000000..709d4bb78 --- /dev/null +++ b/test/mkquantest.py @@ -0,0 +1,35 @@ +#!/usr/bin/python +import math +import sys +import random +import subprocess + +funcs = { + 'seq': 'lambda n: sorted([(x,1) for x in range(1,n+1)], key = lambda x:random.random())', + 'seqlogw': 'lambda n: sorted([(x, math.log(x)) for x in range(1,n+1)], key = lambda x:random.random())' +} + +if len(sys.argv) < 3: + print 'Usage: python mkquantest.py [generate-type] [ndata]|./test_quantile' + print 'Possible generate-types:' + for k, v in funcs.items(): + print '\t%s: %s' % (k, v) + exit(-1) +random.seed(0) +maxn = int(sys.argv[1]) +eps = float(sys.argv[2]) +if len(sys.argv) > 3: + method = sys.argv[3] + assert method in funcs, ('cannot find method %s' % method) +else: + method = 'seq' +if len(sys.argv) > 4: + ndata = int(sys.argv[4]) + assert ndata <= maxn, 'ndata must be smaller than maxn' +else: + ndata = maxn + +fo = sys.stdout +fo.write('%d\t%g\n' % (maxn, eps)) +for x, w in eval(funcs[method])(ndata): + fo.write(str(x)+'\t'+str(w)+'\n') diff --git a/test/test_quantile.cpp b/test/test_quantile.cpp new file mode 100644 index 000000000..89e976897 --- /dev/null +++ b/test/test_quantile.cpp @@ -0,0 +1,23 @@ +#include +#include +using namespace xgboost; + +int main(int argc, char *argv[]) { + utils::WQuantileSketch sketch; + size_t n; + double wsum = 0.0; + float eps, x, w; + utils::Check(scanf("%lu%f", &n, &eps) == 2, "needs to start with n eps"); + sketch.Init(n, eps); + printf("nlevel = %lu, limit_size=%lu\n", sketch.nlevel, sketch.limit_size); + while (scanf("%f%f", &x, &w) == 2) { + sketch.Push(x, w); + wsum += w; + } + sketch.CheckValid(0.1); + utils::WQuantileSketch::SummaryContainer out; + sketch.GetSummary(&out); + printf("MaxError=%f/%f = %g\n", out.MaxError(), wsum, out.MaxError() / wsum); + out.Print(); + return 0; +}