Implement column sampler in CUDA. (#9785)

- CUDA implementation.
- Extract the broadcasting logic, we will need the context parameter after revamping the collective implementation.
- Some changes to the event loop for fixing a deadlock in CI.
- Move argsort into algorithms.cuh, add support for cuda stream.
This commit is contained in:
Jiaming Yuan
2023-11-17 04:29:08 +08:00
committed by GitHub
parent 178cfe70a8
commit fedd9674c8
20 changed files with 447 additions and 232 deletions

View File

@@ -117,11 +117,14 @@ void Loop::Process() {
break;
}
auto unlock_notify = [&](bool is_blocking) {
auto unlock_notify = [&](bool is_blocking, bool stop) {
if (!is_blocking) {
return;
std::lock_guard guard{mu_};
stop_ = stop;
} else {
stop_ = stop;
lock.unlock();
}
lock.unlock();
cv_.notify_one();
};
@@ -145,13 +148,14 @@ void Loop::Process() {
auto rc = this->EmptyQueue(&qcopy);
// Handle error
if (!rc.OK()) {
unlock_notify(is_blocking, true);
std::lock_guard<std::mutex> guard{rc_lock_};
this->rc_ = std::move(rc);
unlock_notify(is_blocking);
return;
}
CHECK(qcopy.empty());
unlock_notify(is_blocking);
unlock_notify(is_blocking, false);
}
}
@@ -170,12 +174,21 @@ Result Loop::Stop() {
}
[[nodiscard]] Result Loop::Block() {
{
std::lock_guard<std::mutex> guard{rc_lock_};
if (!rc_.OK()) {
return std::move(rc_);
}
}
this->Submit(Op{Op::kBlock});
{
std::unique_lock lock{mu_};
cv_.wait(lock, [this] { return (this->queue_.empty()) || stop_; });
}
return std::move(rc_);
{
std::lock_guard<std::mutex> lock{rc_lock_};
return std::move(rc_);
}
}
Loop::Loop(std::chrono::seconds timeout) : timeout_{timeout} {

View File

@@ -42,7 +42,10 @@ class Loop {
std::mutex mu_;
std::queue<Op> queue_;
std::chrono::seconds timeout_;
Result rc_;
std::mutex rc_lock_; // lock for transferring error info.
bool stop_{false};
std::exception_ptr curr_exce_{nullptr};
common::Monitor mutable timer_;