fix bug in queue2summary

This commit is contained in:
tqchen 2014-11-09 21:09:07 -08:00
parent 7c1ec78a01
commit 5561dd9cb0

View File

@ -59,8 +59,7 @@ class WQuantileSketch {
/*! \brief number of elements in the summary */ /*! \brief number of elements in the summary */
size_t size; size_t size;
// constructor // constructor
Summary(void) : size(0) { Summary(void) : size(0) {}
}
/*! /*!
* \brief the maximum error of the Summary * \brief the maximum error of the Summary
*/ */
@ -118,12 +117,12 @@ class WQuantileSketch {
// lastidx is used to avoid duplicated records // lastidx is used to avoid duplicated records
size_t i = 0, lastidx = 0; size_t i = 0, lastidx = 0;
for (RType k = 1; k < n; ++k) { for (RType k = 1; k < n; ++k) {
RType d2 = (k * max_rank) / n * 2; RType dx2 = (k * max_rank) / n * 2;
// find first i such that d < (rmax[i+1] + rmin[i+1]) / 2 // find first i such that d < (rmax[i+1] + rmin[i+1]) / 2
while (i < src.size - 1 && while (i < src.size - 1 &&
d2 < src.data[i + 1].rmax + src.data[i + 1].rmin) ++i; dx2 < src.data[i + 1].rmax + src.data[i + 1].rmin) ++i;
if (i == src.size - 1) break; if (i == src.size - 1) break;
if (d2 < src.data[i].rmin_next() + src.data[i + 1].rmax_prev()) { if (dx2 < src.data[i].rmin_next() + src.data[i + 1].rmax_prev()) {
if (i != lastidx) { if (i != lastidx) {
data[size++] = src.data[i]; lastidx = i; data[size++] = src.data[i]; lastidx = i;
} }
@ -155,13 +154,13 @@ class WQuantileSketch {
bprev_rmin = b->rmin_next(); bprev_rmin = b->rmin_next();
++dst; ++a; ++b; ++dst; ++a; ++b;
} else if (a->value < b->value) { } else if (a->value < b->value) {
*dst = Entry(bprev_rmin + a->rmin, *dst = Entry(a->rmin + bprev_rmin,
a->rmax + b->rmax_prev(), a->rmax + b->rmax_prev(),
a->wmin, a->value); a->wmin, a->value);
aprev_rmin = a->rmin_next(); aprev_rmin = a->rmin_next();
++dst; ++a; ++dst; ++a;
} else { } else {
*dst = Entry(aprev_rmin + b->rmin, *dst = Entry(b->rmin + aprev_rmin,
b->rmax + a->rmax_prev(), b->rmax + a->rmax_prev(),
b->wmin, b->value); b->wmin, b->value);
bprev_rmin = b->rmin_next(); bprev_rmin = b->rmin_next();
@ -171,14 +170,14 @@ class WQuantileSketch {
if (a != a_end) { if (a != a_end) {
RType brmax = (b_end - 1)->rmax; RType brmax = (b_end - 1)->rmax;
do { do {
*dst = Entry(bprev_rmin + a->rmin, brmax + a->rmax, a->wmin, a->value); *dst = Entry(a->rmin + bprev_rmin, a->rmax + brmax, a->wmin, a->value);
++dst; ++a; ++dst; ++a;
} while (a != a_end); } while (a != a_end);
} }
if (b != b_end) { if (b != b_end) {
RType armax = (a_end - 1)->rmax; RType armax = (a_end - 1)->rmax;
do { do {
*dst = Entry(aprev_rmin + b->rmin, armax + b->rmax, b->wmin, b->value); *dst = Entry(b->rmin + aprev_rmin, b->rmax + armax, b->wmin, b->value);
++dst; ++b; ++dst; ++b;
} while (b != b_end); } while (b != b_end);
} }
@ -236,12 +235,12 @@ class WQuantileSketch {
++b; ++b;
} }
nlevel += 1; nlevel += 1;
level_batch = (b + 1) / 2 + 1; limit_size = (b + 1) / 2 + 1;
// lazy reserve the space, if there is only one value, no need to allocate space // lazy reserve the space, if there is only one value, no need to allocate space
inqueue.resize(1); inqueue.resize(1);
data.resize(0);
level.resize(0);
qtail = 0; qtail = 0;
data.clear();
level.clear();
} }
/*! /*!
* \brief add an element to a sketch * \brief add an element to a sketch
@ -249,22 +248,24 @@ class WQuantileSketch {
*/ */
inline void Add(DType x, RType w = 1) { inline void Add(DType x, RType w = 1) {
if (qtail == inqueue.size()) { if (qtail == inqueue.size()) {
// jump from lazy one value to level_batch * 2 // jump from lazy one value to limit_size * 2
if (inqueue.size() == 1) { if (inqueue.size() == 1) {
inqueue.resize(level_batch * 2); inqueue.resize(limit_size * 2);
} else { } else {
temp.Reserve(2 * level_batch); temp.Reserve(limit_size * 2);
this->Queue2Summary(&temp); this->Queue2Summary(&temp);
// cleanup queue
qtail = 0;
for (size_t l = 1; true; ++l) { for (size_t l = 1; true; ++l) {
this->InitLevel(std::max(l + 1, nlevel)); this->InitLevel(std::max(l + 1, nlevel));
// check if level l is empty // check if level l is empty
if (level[l].size == 0) { if (level[l].size == 0) {
level[l].SetPrune(temp, level_batch); break; level[l].SetPrune(temp, limit_size); break;
} else { } else {
// level 0 is actually temp space // level 0 is actually temp space
level[0].SetPrune(temp, level_batch); level[0].SetPrune(temp, limit_size);
temp.SetCombine(level[0], level[l]); temp.SetCombine(level[0], level[l]);
if (temp.size > level_batch) { if (temp.size > limit_size) {
// try next level // try next level
level[l].size = 0; level[l].size = 0;
} else { } else {
@ -284,18 +285,18 @@ class WQuantileSketch {
/*! \brief get the summary after finalize */ /*! \brief get the summary after finalize */
inline void GetSummary(SummaryContainer *out) { inline void GetSummary(SummaryContainer *out) {
if (level.size() != 0) { if (level.size() != 0) {
out->Reserve(level_batch * 2); out->Reserve(limit_size * 2);
} }
this->Queue2Summary(out); this->Queue2Summary(out);
if (level.size() != 0) { if (level.size() != 0) {
level[0].SetPrune(*out, level_batch); level[0].SetPrune(*out, limit_size);
for (size_t l = 1; l < level.size(); ++l) { for (size_t l = 1; l < level.size(); ++l) {
if (level[l].size == 0) continue; if (level[l].size == 0) continue;
if (level[0].size == 0) { if (level[0].size == 0) {
level[0].CopyFrom(level[l]); level[0].CopyFrom(level[l]);
} else { } else {
out->SetCombine(level[0], level[l]); out->SetCombine(level[0], level[l]);
level[0].SetPrune(*out, level_batch); level[0].SetPrune(*out, limit_size);
} }
} }
out->CopyFrom(level[0]); out->CopyFrom(level[0]);
@ -306,10 +307,10 @@ class WQuantileSketch {
// initialize level space to at least nlevel // initialize level space to at least nlevel
inline void InitLevel(size_t nlevel) { inline void InitLevel(size_t nlevel) {
if (level.size() >= nlevel) return; if (level.size() >= nlevel) return;
data.resize(level_batch * nlevel); data.resize(limit_size * nlevel);
level.resize(nlevel, Summary()); level.resize(nlevel, Summary());
for (size_t l = 0; l < level.size(); ++l) { for (size_t l = 0; l < level.size(); ++l) {
level[l].data = BeginPtr(data) + l * level_batch; level[l].data = BeginPtr(data) + l * limit_size;
} }
} }
inline void Queue2Summary(SummaryContainer *temp) { inline void Queue2Summary(SummaryContainer *temp) {
@ -321,7 +322,7 @@ class WQuantileSketch {
RType wsum = 0; RType wsum = 0;
// construct data with unique weights // construct data with unique weights
for (size_t i = 0; i < qtail;) { for (size_t i = 0; i < qtail;) {
size_t j = 1; size_t j = i + 1;
RType w = inqueue[i].weight; RType w = inqueue[i].weight;
while (j < qtail && inqueue[j].value == inqueue[i].value) { while (j < qtail && inqueue[j].value == inqueue[i].value) {
w += inqueue[j].weight; ++j; w += inqueue[j].weight; ++j;
@ -329,8 +330,6 @@ class WQuantileSketch {
temp->data[temp->size++] = Entry(wsum, wsum + w, w, inqueue[i].value); temp->data[temp->size++] = Entry(wsum, wsum + w, w, inqueue[i].value);
wsum += w; i = j; wsum += w; i = j;
} }
// clean up queue
qtail = 0;
} }
// entry in the queue // entry in the queue
struct QEntry { struct QEntry {
@ -355,7 +354,7 @@ class WQuantileSketch {
// number of levels // number of levels
size_t nlevel; size_t nlevel;
// size of summary in each level // size of summary in each level
size_t level_batch; size_t limit_size;
// the level of each summaries // the level of each summaries
std::vector<Summary> level; std::vector<Summary> level;
// content of the summary // content of the summary
@ -531,12 +530,12 @@ class GKQuantileSketch {
} }
L += 1; L += 1;
inqueue.resize(b); inqueue.resize(b);
level_batch = (b + 1) / 2 + 1; limit_size = (b + 1) / 2 + 1;
temp.Reserve(level_batch * 2); temp.Reserve(limit_size * 2);
data.resize(level_batch * L); data.resize(limit_size * L);
for (size_t l = 0; l < L; ++l) { for (size_t l = 0; l < L; ++l) {
Summary s; s.size = 0; Summary s; s.size = 0;
s.data = BeginPtr(data) + l * level_batch; s.data = BeginPtr(data) + l * limit_size;
level.push_back(s); level.push_back(s);
} }
qtail = 0; qtail = 0;
@ -559,11 +558,11 @@ class GKQuantileSketch {
for (size_t l = 1; l < level.size(); ++l) { for (size_t l = 1; l < level.size(); ++l) {
// check if level l is empty // check if level l is empty
if (level[l].size == 0) { if (level[l].size == 0) {
level[l].SetPrune(temp, level_batch); level[l].SetPrune(temp, limit_size);
return; return;
} else { } else {
// level 0 is actually temp space // level 0 is actually temp space
level[0].SetPrune(temp, level_batch); level[0].SetPrune(temp, limit_size);
temp.SetCombine(level[0], level[l]); temp.SetCombine(level[0], level[l]);
level[l].size = 0; level[l].size = 0;
} }
@ -583,10 +582,10 @@ class GKQuantileSketch {
temp.data[i] = Entry(i + 1, i + 1, inqueue[i]); temp.data[i] = Entry(i + 1, i + 1, inqueue[i]);
} }
temp.size = static_cast<RType>(qtail); temp.size = static_cast<RType>(qtail);
if (temp.size < level_batch) { if (temp.size < limit_size) {
level[0].CopyFrom(temp); level[0].CopyFrom(temp);
} else { } else {
level[0].SetPrune(temp, level_batch); level[0].SetPrune(temp, limit_size);
} }
// start adding other things in // start adding other things in
for (size_t l = 1; l < level.size(); ++l) { for (size_t l = 1; l < level.size(); ++l) {
@ -595,7 +594,7 @@ class GKQuantileSketch {
level[0].CopyFrom(level[l]); level[0].CopyFrom(level[l]);
} else { } else {
temp.SetCombine(level[0], level[l]); temp.SetCombine(level[0], level[l]);
level[0].SetPrune(temp, level_batch); level[0].SetPrune(temp, limit_size);
} }
level[l].size = 0; level[l].size = 0;
} }
@ -611,7 +610,7 @@ class GKQuantileSketch {
// end of the queue // end of the queue
size_t qtail; size_t qtail;
// size of summary in each level // size of summary in each level
size_t level_batch; size_t limit_size;
// content of the summary // content of the summary
std::vector<Entry> data; std::vector<Entry> data;
// different level of summary // different level of summary