diff --git a/include/xgboost/collective/socket.h b/include/xgboost/collective/socket.h index 844534110..3bc3b389c 100644 --- a/include/xgboost/collective/socket.h +++ b/include/xgboost/collective/socket.h @@ -436,28 +436,38 @@ class TCPSocket { * \brief Accept new connection, returns a new TCP socket for the new connection. */ TCPSocket Accept() { - HandleT newfd = accept(Handle(), nullptr, nullptr); + SockAddress addr; + TCPSocket newsock; + auto rc = this->Accept(&newsock, &addr); + SafeColl(rc); + return newsock; + } + + [[nodiscard]] Result Accept(TCPSocket *out, SockAddress *addr) { #if defined(_WIN32) auto interrupt = WSAEINTR; #else auto interrupt = EINTR; #endif - if (newfd == InvalidSocket() && system::LastError() != interrupt) { - system::ThrowAtError("accept"); + if (this->Domain() == SockDomain::kV4) { + struct sockaddr_in caddr; + socklen_t caddr_len = sizeof(caddr); + HandleT newfd = accept(Handle(), reinterpret_cast(&caddr), &caddr_len); + if (newfd == InvalidSocket() && system::LastError() != interrupt) { + return system::FailWithCode("Failed to accept."); + } + *addr = SockAddress{SockAddrV4{caddr}}; + *out = TCPSocket{newfd}; + } else { + struct sockaddr_in6 caddr; + socklen_t caddr_len = sizeof(caddr); + HandleT newfd = accept(Handle(), reinterpret_cast(&caddr), &caddr_len); + if (newfd == InvalidSocket() && system::LastError() != interrupt) { + return system::FailWithCode("Failed to accept."); + } + *addr = SockAddress{SockAddrV6{caddr}}; + *out = TCPSocket{newfd}; } - TCPSocket newsock{newfd}; - return newsock; - } - - [[nodiscard]] Result Accept(TCPSocket *out, SockAddrV4 *addr) { - struct sockaddr_in caddr; - socklen_t caddr_len = sizeof(caddr); - HandleT newfd = accept(Handle(), reinterpret_cast(&caddr), &caddr_len); - if (newfd == InvalidSocket()) { - return system::FailWithCode("Failed to accept."); - } - *addr = SockAddrV4{caddr}; - *out = TCPSocket{newfd}; return Success(); } diff --git a/python-package/xgboost/testing/__init__.py b/python-package/xgboost/testing/__init__.py index f7d9510fa..409fd0274 100644 --- a/python-package/xgboost/testing/__init__.py +++ b/python-package/xgboost/testing/__init__.py @@ -429,8 +429,8 @@ def make_categorical( categories = np.arange(0, n_categories) for col in df.columns: if rng.binomial(1, cat_ratio, size=1)[0] == 1: - df[col] = df[col].astype("category") - df[col] = df[col].cat.set_categories(categories) + df.loc[:, col] = df[col].astype("category") + df.loc[:, col] = df[col].cat.set_categories(categories) if sparsity > 0.0: for i in range(n_features): diff --git a/src/collective/coll.cc b/src/collective/coll.cc index 755d44c90..1f47d0c55 100644 --- a/src/collective/coll.cc +++ b/src/collective/coll.cc @@ -1,5 +1,5 @@ /** - * Copyright 2023, XGBoost Contributors + * Copyright 2023-2024, XGBoost Contributors */ #include "coll.h" @@ -7,6 +7,7 @@ #include // for size_t #include // for int8_t, int64_t #include // for bit_and, bit_or, bit_xor, plus +#include // for string #include // for is_floating_point_v, is_same_v #include // for move @@ -56,6 +57,8 @@ bool constexpr IsFloatingPointV() { return cpu_impl::RingAllreduce(comm, data, erased_fn, type); }; + std::string msg{"Floating point is not supported for bit wise collective operations."}; + auto rc = DispatchDType(type, [&](auto t) { using T = decltype(t); switch (op) { @@ -70,21 +73,21 @@ bool constexpr IsFloatingPointV() { } case Op::kBitwiseAND: { if constexpr (IsFloatingPointV()) { - return Fail("Invalid type."); + return Fail(msg); } else { return fn(std::bit_and<>{}, t); } } case Op::kBitwiseOR: { if constexpr (IsFloatingPointV()) { - return Fail("Invalid type."); + return Fail(msg); } else { return fn(std::bit_or<>{}, t); } } case Op::kBitwiseXOR: { if constexpr (IsFloatingPointV()) { - return Fail("Invalid type."); + return Fail(msg); } else { return fn(std::bit_xor<>{}, t); } diff --git a/src/collective/comm.cc b/src/collective/comm.cc index 783278b65..8260b28f6 100644 --- a/src/collective/comm.cc +++ b/src/collective/comm.cc @@ -75,9 +75,11 @@ Result ConnectTrackerImpl(proto::PeerInfo info, std::chrono::seconds timeout, st } << [&] { return next->NonBlocking(true); } << [&] { - SockAddrV4 addr; + SockAddress addr; return listener->Accept(prev.get(), &addr); - } << [&] { return prev->NonBlocking(true); }; + } << [&] { + return prev->NonBlocking(true); + }; if (!rc.OK()) { return rc; } @@ -157,10 +159,13 @@ Result ConnectTrackerImpl(proto::PeerInfo info, std::chrono::seconds timeout, st } for (std::int32_t r = 0; r < comm.Rank(); ++r) { - SockAddrV4 addr; auto peer = std::shared_ptr(TCPSocket::CreatePtr(comm.Domain())); - rc = std::move(rc) << [&] { return listener->Accept(peer.get(), &addr); } - << [&] { return peer->RecvTimeout(timeout); }; + rc = std::move(rc) << [&] { + SockAddress addr; + return listener->Accept(peer.get(), &addr); + } << [&] { + return peer->RecvTimeout(timeout); + }; if (!rc.OK()) { return rc; } @@ -187,7 +192,9 @@ RabitComm::RabitComm(std::string const& host, std::int32_t port, std::chrono::se : HostComm{std::move(host), port, timeout, retry, std::move(task_id)}, nccl_path_{std::move(nccl_path)} { auto rc = this->Bootstrap(timeout_, retry_, task_id_); - CHECK(rc.OK()) << rc.Report(); + if (!rc.OK()) { + SafeColl(Fail("Failed to bootstrap the communication group.", std::move(rc))); + } } #if !defined(XGBOOST_USE_NCCL) @@ -247,10 +254,12 @@ Comm* RabitComm::MakeCUDAVar(Context const*, std::shared_ptr) const { // get ring neighbors std::string snext; tracker.Recv(&snext); + if (!rc.OK()) { + return Fail("Failed to receive the rank for the next worker.", std::move(rc)); + } auto jnext = Json::Load(StringView{snext}); proto::PeerInfo ninfo{jnext}; - // get the rank of this worker this->rank_ = BootstrapPrev(ninfo.rank, world); this->tracker_.rank = rank_; @@ -258,7 +267,7 @@ Comm* RabitComm::MakeCUDAVar(Context const*, std::shared_ptr) const { std::vector> workers; rc = ConnectWorkers(*this, &listener, lport, ninfo, timeout, retry, &workers); if (!rc.OK()) { - return rc; + return Fail("Failed to connect to other workers.", std::move(rc)); } CHECK(this->channels_.empty()); diff --git a/src/collective/tracker.cc b/src/collective/tracker.cc index 88c51d8a9..3fdf75ead 100644 --- a/src/collective/tracker.cc +++ b/src/collective/tracker.cc @@ -1,5 +1,5 @@ /** - * Copyright 2023, XGBoost Contributors + * Copyright 2023-2024, XGBoost Contributors */ #if defined(__unix__) || defined(__APPLE__) #include // gethostbyname @@ -27,12 +27,14 @@ #include "tracker.h" #include "xgboost/collective/result.h" // for Result, Fail, Success #include "xgboost/collective/socket.h" // for GetHostName, FailWithCode, MakeSockAddress, ... -#include "xgboost/json.h" +#include "xgboost/json.h" // for Json namespace xgboost::collective { Tracker::Tracker(Json const& config) - : n_workers_{static_cast( - RequiredArg(config, "n_workers", __func__))}, + : sortby_{static_cast( + OptionalArg(config, "sortby", static_cast(SortBy::kHost)))}, + n_workers_{ + static_cast(RequiredArg(config, "n_workers", __func__))}, port_{static_cast(OptionalArg(config, "port", Integer::Int{0}))}, timeout_{std::chrono::seconds{OptionalArg( config, "timeout", static_cast(collective::DefaultTimeoutSec()))}} {} @@ -56,13 +58,15 @@ Result Tracker::WaitUntilReady() const { return Success(); } -RabitTracker::WorkerProxy::WorkerProxy(std::int32_t world, TCPSocket sock, SockAddrV4 addr) +RabitTracker::WorkerProxy::WorkerProxy(std::int32_t world, TCPSocket sock, SockAddress addr) : sock_{std::move(sock)} { std::int32_t rank{0}; Json jcmd; std::int32_t port{0}; - rc_ = Success() << [&] { return proto::Magic{}.Verify(&sock_); } << [&] { + rc_ = Success() << [&] { + return proto::Magic{}.Verify(&sock_); + } << [&] { return proto::Connect{}.TrackerRecv(&sock_, &world_, &rank, &task_id_); } << [&] { std::string cmd; @@ -83,8 +87,13 @@ RabitTracker::WorkerProxy::WorkerProxy(std::int32_t world, TCPSocket sock, SockA } return Success(); } << [&] { - auto host = addr.Addr(); - info_ = proto::PeerInfo{host, port, rank}; + if (addr.IsV4()) { + auto host = addr.V4().Addr(); + info_ = proto::PeerInfo{host, port, rank}; + } else { + auto host = addr.V6().Addr(); + info_ = proto::PeerInfo{host, port, rank}; + } return Success(); }; } @@ -92,19 +101,19 @@ RabitTracker::WorkerProxy::WorkerProxy(std::int32_t world, TCPSocket sock, SockA RabitTracker::RabitTracker(Json const& config) : Tracker{config} { std::string self; auto rc = collective::GetHostAddress(&self); - auto host = OptionalArg(config, "host", self); + host_ = OptionalArg(config, "host", self); - host_ = host; - listener_ = TCPSocket::Create(SockDomain::kV4); - rc = listener_.Bind(host, &this->port_); - CHECK(rc.OK()) << rc.Report(); + auto addr = MakeSockAddress(xgboost::StringView{host_}, 0); + listener_ = TCPSocket::Create(addr.IsV4() ? SockDomain::kV4 : SockDomain::kV6); + rc = listener_.Bind(host_, &this->port_); + SafeColl(rc); listener_.Listen(); } Result RabitTracker::Bootstrap(std::vector* p_workers) { auto& workers = *p_workers; - std::sort(workers.begin(), workers.end(), WorkerCmp{}); + std::sort(workers.begin(), workers.end(), WorkerCmp{this->sortby_}); std::vector bootstrap_threads; for (std::int32_t r = 0; r < n_workers_; ++r) { @@ -224,7 +233,7 @@ Result RabitTracker::Bootstrap(std::vector* p_workers) { while (state.ShouldContinue()) { TCPSocket sock; - SockAddrV4 addr; + SockAddress addr; this->ready_ = true; auto rc = listener_.Accept(&sock, &addr); if (!rc.OK()) { @@ -291,7 +300,7 @@ Result RabitTracker::Bootstrap(std::vector* p_workers) { [[nodiscard]] Json RabitTracker::WorkerArgs() const { auto rc = this->WaitUntilReady(); - CHECK(rc.OK()) << rc.Report(); + SafeColl(rc); Json args{Object{}}; args["DMLC_TRACKER_URI"] = String{host_}; diff --git a/src/collective/tracker.h b/src/collective/tracker.h index f336a82f9..e15aaee59 100644 --- a/src/collective/tracker.h +++ b/src/collective/tracker.h @@ -1,5 +1,5 @@ /** - * Copyright 2023, XGBoost Contributors + * Copyright 2023-2024, XGBoost Contributors */ #pragma once #include // for seconds @@ -36,6 +36,16 @@ namespace xgboost::collective { * signal an error to the tracker and the tracker will notify other workers. */ class Tracker { + protected: + // How to sort the workers, either by host name or by task ID. When using a multi-GPU + // setting, multiple workers can occupy the same host, in which case one should sort + // workers by task. Due to compatibility reason, the task ID is not always available, so + // we use host as the default. + enum class SortBy : std::int8_t { + kHost = 0, + kTask = 1, + } sortby_; + protected: std::int32_t n_workers_{0}; std::int32_t port_{-1}; @@ -76,7 +86,7 @@ class RabitTracker : public Tracker { Result rc_; public: - explicit WorkerProxy(std::int32_t world, TCPSocket sock, SockAddrV4 addr); + explicit WorkerProxy(std::int32_t world, TCPSocket sock, SockAddress addr); WorkerProxy(WorkerProxy const& that) = delete; WorkerProxy(WorkerProxy&& that) = default; WorkerProxy& operator=(WorkerProxy const&) = delete; @@ -96,11 +106,14 @@ class RabitTracker : public Tracker { void Send(StringView value) { this->sock_.Send(value); } }; - // provide an ordering for workers, this helps us get deterministic topology. + // Provide an ordering for workers, this helps us get deterministic topology. struct WorkerCmp { + SortBy sortby; + explicit WorkerCmp(SortBy sortby) : sortby{sortby} {} + [[nodiscard]] bool operator()(WorkerProxy const& lhs, WorkerProxy const& rhs) { - auto const& lh = lhs.Host(); - auto const& rh = rhs.Host(); + auto const& lh = sortby == Tracker::SortBy::kHost ? lhs.Host() : lhs.TaskID(); + auto const& rh = sortby == Tracker::SortBy::kHost ? rhs.Host() : rhs.TaskID(); if (lh != rh) { return lh < rh; diff --git a/src/learner.cc b/src/learner.cc index eed9dd5cd..ca6704944 100644 --- a/src/learner.cc +++ b/src/learner.cc @@ -18,7 +18,6 @@ #include // for int32_t, uint32_t, int64_t, uint64_t #include // for atoi #include // for memcpy, size_t, memset -#include // for less #include // for operator<<, setiosflags #include // for back_insert_iterator, distance, back_inserter #include // for numeric_limits diff --git a/tests/ci_build/Dockerfile.gpu b/tests/ci_build/Dockerfile.gpu index 0a5adb6ea..698a61e93 100644 --- a/tests/ci_build/Dockerfile.gpu +++ b/tests/ci_build/Dockerfile.gpu @@ -25,7 +25,7 @@ RUN \ mamba create -n gpu_test -c rapidsai-nightly -c rapidsai -c nvidia -c conda-forge -c defaults \ python=3.10 cudf=$RAPIDS_VERSION_ARG* rmm=$RAPIDS_VERSION_ARG* cudatoolkit=$CUDA_VERSION_ARG \ nccl>=$(cut -d "-" -f 1 << $NCCL_VERSION_ARG) \ - dask \ + dask=2024.1.1 \ dask-cuda=$RAPIDS_VERSION_ARG* dask-cudf=$RAPIDS_VERSION_ARG* cupy \ numpy pytest pytest-timeout scipy scikit-learn pandas matplotlib wheel python-kubernetes urllib3 graphviz hypothesis \ pyspark>=3.4.0 cloudpickle cuda-python && \ diff --git a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py index 0d9bc2e6b..a3c2e76a0 100644 --- a/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py +++ b/tests/test_distributed/test_gpu_with_dask/test_gpu_with_dask.py @@ -252,7 +252,7 @@ class TestDistributedGPU: X_onehot, _ = make_categorical(local_cuda_client, 10000, 30, 13, True) X_onehot = dask_cudf.from_dask_dataframe(X_onehot) - run_categorical(local_cuda_client, "gpu_hist", X, X_onehot, y) + run_categorical(local_cuda_client, "hist", "cuda", X, X_onehot, y) @given( params=hist_parameter_strategy, diff --git a/tests/test_distributed/test_with_dask/test_with_dask.py b/tests/test_distributed/test_with_dask/test_with_dask.py index ffea1d058..ca55716bb 100644 --- a/tests/test_distributed/test_with_dask/test_with_dask.py +++ b/tests/test_distributed/test_with_dask/test_with_dask.py @@ -315,8 +315,15 @@ def test_dask_sparse(client: "Client") -> None: ) -def run_categorical(client: "Client", tree_method: str, X, X_onehot, y) -> None: - parameters = {"tree_method": tree_method, "max_cat_to_onehot": 9999} # force onehot +def run_categorical( + client: "Client", tree_method: str, device: str, X, X_onehot, y +) -> None: + # Force onehot + parameters = { + "tree_method": tree_method, + "device": device, + "max_cat_to_onehot": 9999, + } rounds = 10 m = xgb.dask.DaskDMatrix(client, X_onehot, y, enable_categorical=True) by_etl_results = xgb.dask.train( @@ -364,6 +371,7 @@ def run_categorical(client: "Client", tree_method: str, X, X_onehot, y) -> None: enable_categorical=True, n_estimators=10, tree_method=tree_method, + device=device, # force onehot max_cat_to_onehot=9999, ) @@ -378,7 +386,10 @@ def run_categorical(client: "Client", tree_method: str, X, X_onehot, y) -> None: reg.fit(X, y) # check partition based reg = xgb.dask.DaskXGBRegressor( - enable_categorical=True, n_estimators=10, tree_method=tree_method + enable_categorical=True, + n_estimators=10, + tree_method=tree_method, + device=device, ) reg.fit(X, y, eval_set=[(X, y)]) assert tm.non_increasing(reg.evals_result()["validation_0"]["rmse"]) @@ -398,8 +409,8 @@ def run_categorical(client: "Client", tree_method: str, X, X_onehot, y) -> None: def test_categorical(client: "Client") -> None: X, y = make_categorical(client, 10000, 30, 13) X_onehot, _ = make_categorical(client, 10000, 30, 13, True) - run_categorical(client, "approx", X, X_onehot, y) - run_categorical(client, "hist", X, X_onehot, y) + run_categorical(client, "approx", "cpu", X, X_onehot, y) + run_categorical(client, "hist", "cpu", X, X_onehot, y) ft = ["c"] * X.shape[1] reg = xgb.dask.DaskXGBRegressor(