From 1b9ed4a4a1ed80f77cbf30448fc2e0c417130755 Mon Sep 17 00:00:00 2001 From: Philip Hyunsu Cho Date: Wed, 1 Nov 2023 10:03:56 -0700 Subject: [PATCH 1/4] [CI] Improve CI for Mac M1 (#9748) * [CI] Improve CI for Mac M1 * Add -v flag * Disable OpenMP in libxgboost4j.dylib * Target MacOS 10.15+ to use C++17 --- CMakeLists.txt | 3 +++ tests/buildkite/test-macos-m1-clang11.sh | 19 +++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e93427ed9..4ad10723f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,6 +32,9 @@ elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") message(FATAL_ERROR "Need Clang 9.0 or newer to build XGBoost") endif() endif() +if(APPLE) + set(CMAKE_MACOSX_DEPLOYMENT_TARGET 10.15) +endif() include(${xgboost_SOURCE_DIR}/cmake/PrefetchIntrinsics.cmake) find_prefetch_intrinsics() diff --git a/tests/buildkite/test-macos-m1-clang11.sh b/tests/buildkite/test-macos-m1-clang11.sh index fdd1aba84..cc5406810 100755 --- a/tests/buildkite/test-macos-m1-clang11.sh +++ b/tests/buildkite/test-macos-m1-clang11.sh @@ -12,14 +12,17 @@ sysctl -n machdep.cpu.brand_string uname -m set +x -# Create new Conda env -echo "--- Set up Conda env" -. $HOME/mambaforge/etc/profile.d/conda.sh -. $HOME/mambaforge/etc/profile.d/mamba.sh -conda_env=xgboost_dev_$(uuidgen | tr '[:upper:]' '[:lower:]' | tr -d '-') -mamba create -y -n ${conda_env} python=3.8 -conda activate ${conda_env} -mamba env update -n ${conda_env} --file tests/ci_build/conda_env/macos_cpu_test.yml +# Build XGBoost4J binary +echo "--- Build libxgboost4j.dylib" +set -x +mkdir build +pushd build +export JAVA_HOME=$(/usr/libexec/java_home) +cmake .. -GNinja -DJVM_BINDINGS=ON -DUSE_OPENMP=OFF +ninja -v +popd +rm -rf build +set +x # Ensure that XGBoost can be built with Clang 11 echo "--- Build and Test XGBoost with MacOS M1, Clang 11" From 0ff8572737fa41fd63e76f478ceaab8c0f97f567 Mon Sep 17 00:00:00 2001 From: Philip Hyunsu Cho Date: Wed, 1 Nov 2023 11:20:28 -0700 Subject: [PATCH 2/4] [CI] Build libxgboost4j.dylib with CMAKE_OSX_DEPLOYMENT_TARGET (#9749) --- CMakeLists.txt | 3 --- tests/buildkite/test-macos-m1-clang11.sh | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4ad10723f..e93427ed9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,9 +32,6 @@ elseif(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") message(FATAL_ERROR "Need Clang 9.0 or newer to build XGBoost") endif() endif() -if(APPLE) - set(CMAKE_MACOSX_DEPLOYMENT_TARGET 10.15) -endif() include(${xgboost_SOURCE_DIR}/cmake/PrefetchIntrinsics.cmake) find_prefetch_intrinsics() diff --git a/tests/buildkite/test-macos-m1-clang11.sh b/tests/buildkite/test-macos-m1-clang11.sh index cc5406810..97b4fef93 100755 --- a/tests/buildkite/test-macos-m1-clang11.sh +++ b/tests/buildkite/test-macos-m1-clang11.sh @@ -18,7 +18,7 @@ set -x mkdir build pushd build export JAVA_HOME=$(/usr/libexec/java_home) -cmake .. -GNinja -DJVM_BINDINGS=ON -DUSE_OPENMP=OFF +cmake .. -GNinja -DJVM_BINDINGS=ON -DUSE_OPENMP=OFF -DCMAKE_OSX_DEPLOYMENT_TARGET=10.15 ninja -v popd rm -rf build From 4da4e092b51aab7df1e8e2333faef9e62ac5cbb9 Mon Sep 17 00:00:00 2001 From: Jiaming Yuan Date: Thu, 2 Nov 2023 04:06:46 +0800 Subject: [PATCH 3/4] [coll] Improvements and fixes for tracker and allreduce. (#9745) - Allow the tracker to wait. - Fix allreduce type cast - Return args from the federated tracker. --- plugin/federated/federated_tracker.cc | 33 +++--- plugin/federated/federated_tracker.h | 5 +- src/collective/coll.cc | 107 ++++++++++++------ src/collective/tracker.cc | 34 +++++- src/collective/tracker.h | 16 +-- src/common/timer.h | 1 + tests/cpp/collective/test_tracker.cc | 9 ++ .../federated/test_federated_tracker.cc | 36 ++++++ 8 files changed, 184 insertions(+), 57 deletions(-) create mode 100644 tests/cpp/plugin/federated/test_federated_tracker.cc diff --git a/plugin/federated/federated_tracker.cc b/plugin/federated/federated_tracker.cc index aca468d32..37b6c3639 100644 --- a/plugin/federated/federated_tracker.cc +++ b/plugin/federated/federated_tracker.cc @@ -6,7 +6,6 @@ #include // for InsecureServerCredentials, ... #include // for ServerBuilder -#include // for ms #include // for int32_t #include // for exception #include // for numeric_limits @@ -61,7 +60,7 @@ FederatedTracker::FederatedTracker(Json const& config) : Tracker{config} { } std::future FederatedTracker::Run() { - return std::async([this]() { + return std::async(std::launch::async, [this]() { std::string const server_address = "0.0.0.0:" + std::to_string(this->port_); xgboost::collective::federated::FederatedService service{ static_cast(this->n_workers_)}; @@ -98,10 +97,13 @@ std::future FederatedTracker::Run() { try { server_ = builder.BuildAndStart(); + ready_ = true; server_->Wait(); } catch (std::exception const& e) { return collective::Fail(std::string{e.what()}); } + + ready_ = false; return collective::Success(); }); } @@ -109,18 +111,8 @@ std::future FederatedTracker::Run() { FederatedTracker::~FederatedTracker() = default; Result FederatedTracker::Shutdown() { - common::Timer timer; - timer.Start(); - using namespace std::chrono_literals; - while (!server_) { - timer.Stop(); - auto ela = timer.ElapsedSeconds(); - if (ela > this->Timeout().count()) { - return Fail("Failed to shutdown, timeout:" + std::to_string(this->Timeout().count()) + - " seconds."); - } - std::this_thread::sleep_for(10ms); - } + auto rc = this->WaitUntilReady(); + CHECK(rc.OK()) << rc.Report(); try { server_->Shutdown(); @@ -130,4 +122,17 @@ Result FederatedTracker::Shutdown() { return Success(); } + +[[nodiscard]] Json FederatedTracker::WorkerArgs() const { + auto rc = this->WaitUntilReady(); + CHECK(rc.OK()) << rc.Report(); + + std::string host; + rc = GetHostAddress(&host); + CHECK(rc.OK()); + Json args{Object{}}; + args["DMLC_TRACKER_URI"] = String{host}; + args["DMLC_TRACKER_PORT"] = this->Port(); + return args; +} } // namespace xgboost::collective diff --git a/plugin/federated/federated_tracker.h b/plugin/federated/federated_tracker.h index 9ad48bee1..33592fefe 100644 --- a/plugin/federated/federated_tracker.h +++ b/plugin/federated/federated_tracker.h @@ -57,9 +57,8 @@ class FederatedTracker : public collective::Tracker { explicit FederatedTracker(Json const& config); ~FederatedTracker() override; std::future Run() override; - // federated tracker do not provide initialization parameters, users have to provide it - // themseleves. - [[nodiscard]] Json WorkerArgs() const override { return Json{Null{}}; } + + [[nodiscard]] Json WorkerArgs() const override; [[nodiscard]] Result Shutdown(); }; } // namespace xgboost::collective diff --git a/src/collective/coll.cc b/src/collective/coll.cc index 598e6129d..755d44c90 100644 --- a/src/collective/coll.cc +++ b/src/collective/coll.cc @@ -3,19 +3,35 @@ */ #include "coll.h" -#include // for min, max -#include // for size_t -#include // for int8_t, int64_t -#include // for bit_and, bit_or, bit_xor, plus +#include // for min, max, copy_n +#include // for size_t +#include // for int8_t, int64_t +#include // for bit_and, bit_or, bit_xor, plus +#include // for is_floating_point_v, is_same_v +#include // for move -#include "allgather.h" // for RingAllgatherV, RingAllgather -#include "allreduce.h" // for Allreduce -#include "broadcast.h" // for Broadcast -#include "comm.h" // for Comm +#include "../data/array_interface.h" // for ArrayInterfaceHandler +#include "allgather.h" // for RingAllgatherV, RingAllgather +#include "allreduce.h" // for Allreduce +#include "broadcast.h" // for Broadcast +#include "comm.h" // for Comm + +#if defined(XGBOOST_USE_CUDA) +#include "cuda_fp16.h" // for __half +#endif namespace xgboost::collective { +template +bool constexpr IsFloatingPointV() { +#if defined(XGBOOST_USE_CUDA) + return std::is_floating_point_v || std::is_same_v; +#else + return std::is_floating_point_v; +#endif // defined(XGBOOST_USE_CUDA) +} + [[nodiscard]] Result Coll::Allreduce(Comm const& comm, common::Span data, - ArrayInterfaceHandler::Type, Op op) { + ArrayInterfaceHandler::Type type, Op op) { namespace coll = ::xgboost::collective; auto redop_fn = [](auto lhs, auto out, auto elem_op) { @@ -25,32 +41,59 @@ namespace xgboost::collective { p_out[i] = elem_op(p_lhs[i], p_out[i]); } }; - auto fn = [&](auto elem_op) { - return coll::Allreduce( - comm, data, [redop_fn, elem_op](auto lhs, auto rhs) { redop_fn(lhs, rhs, elem_op); }); + + auto fn = [&](auto elem_op, auto t) { + using T = decltype(t); + auto erased_fn = [redop_fn, elem_op](common::Span lhs, + common::Span out) { + CHECK_EQ(lhs.size(), out.size()) << "Invalid input for reduction."; + auto lhs_t = common::RestoreType(lhs); + auto rhs_t = common::RestoreType(out); + + redop_fn(lhs_t, rhs_t, elem_op); + }; + + return cpu_impl::RingAllreduce(comm, data, erased_fn, type); }; - switch (op) { - case Op::kMax: { - return fn([](auto l, auto r) { return std::max(l, r); }); + auto rc = DispatchDType(type, [&](auto t) { + using T = decltype(t); + switch (op) { + case Op::kMax: { + return fn([](auto l, auto r) { return std::max(l, r); }, t); + } + case Op::kMin: { + return fn([](auto l, auto r) { return std::min(l, r); }, t); + } + case Op::kSum: { + return fn(std::plus<>{}, t); + } + case Op::kBitwiseAND: { + if constexpr (IsFloatingPointV()) { + return Fail("Invalid type."); + } else { + return fn(std::bit_and<>{}, t); + } + } + case Op::kBitwiseOR: { + if constexpr (IsFloatingPointV()) { + return Fail("Invalid type."); + } else { + return fn(std::bit_or<>{}, t); + } + } + case Op::kBitwiseXOR: { + if constexpr (IsFloatingPointV()) { + return Fail("Invalid type."); + } else { + return fn(std::bit_xor<>{}, t); + } + } } - case Op::kMin: { - return fn([](auto l, auto r) { return std::min(l, r); }); - } - case Op::kSum: { - return fn(std::plus<>{}); - } - case Op::kBitwiseAND: { - return fn(std::bit_and<>{}); - } - case Op::kBitwiseOR: { - return fn(std::bit_or<>{}); - } - case Op::kBitwiseXOR: { - return fn(std::bit_xor<>{}); - } - } - return comm.Block(); + return Fail("Invalid op."); + }); + + return std::move(rc) << [&] { return comm.Block(); }; } [[nodiscard]] Result Coll::Broadcast(Comm const& comm, common::Span data, diff --git a/src/collective/tracker.cc b/src/collective/tracker.cc index 043e93359..4837e2ace 100644 --- a/src/collective/tracker.cc +++ b/src/collective/tracker.cc @@ -16,7 +16,7 @@ #endif // defined(_WIN32) #include // for sort -#include // for seconds +#include // for seconds, ms #include // for int32_t #include // for string #include // for move, forward @@ -37,6 +37,25 @@ Tracker::Tracker(Json const& config) timeout_{std::chrono::seconds{OptionalArg( config, "timeout", static_cast(collective::DefaultTimeoutSec()))}} {} +Result Tracker::WaitUntilReady() const { + using namespace std::chrono_literals; // NOLINT + + // Busy waiting. The function is mostly for waiting for the OS to launch an async + // thread, which should be reasonably fast. + common::Timer timer; + timer.Start(); + while (!this->Ready()) { + auto ela = timer.Duration().count(); + if (ela > this->Timeout().count()) { + return Fail("Failed to start tracker, timeout:" + std::to_string(this->Timeout().count()) + + " seconds."); + } + std::this_thread::sleep_for(100ms); + } + + return Success(); +} + RabitTracker::WorkerProxy::WorkerProxy(std::int32_t world, TCPSocket sock, SockAddrV4 addr) : sock_{std::move(sock)} { auto host = addr.Addr(); @@ -76,6 +95,7 @@ RabitTracker::RabitTracker(Json const& config) : Tracker{config} { auto rc = collective::GetHostAddress(&self); auto host = OptionalArg(config, "host", self); + host_ = host; listener_ = TCPSocket::Create(SockDomain::kV4); rc = listener_.Bind(host, &this->port_); CHECK(rc.OK()) << rc.Report(); @@ -173,6 +193,7 @@ Result RabitTracker::Bootstrap(std::vector* p_workers) { while (state.ShouldContinue()) { TCPSocket sock; SockAddrV4 addr; + this->ready_ = true; auto rc = listener_.Accept(&sock, &addr); if (!rc.OK()) { return Fail("Failed to accept connection.", std::move(rc)); @@ -237,10 +258,21 @@ Result RabitTracker::Bootstrap(std::vector* p_workers) { } } } + ready_ = false; return Success(); }); } +[[nodiscard]] Json RabitTracker::WorkerArgs() const { + auto rc = this->WaitUntilReady(); + CHECK(rc.OK()) << rc.Report(); + + Json args{Object{}}; + args["DMLC_TRACKER_URI"] = String{host_}; + args["DMLC_TRACKER_PORT"] = this->Port(); + return args; +} + [[nodiscard]] Result GetHostAddress(std::string* out) { auto rc = GetHostName(out); if (!rc.OK()) { diff --git a/src/collective/tracker.h b/src/collective/tracker.h index f90373220..24e47bb4e 100644 --- a/src/collective/tracker.h +++ b/src/collective/tracker.h @@ -40,6 +40,7 @@ class Tracker { std::int32_t n_workers_{0}; std::int32_t port_{-1}; std::chrono::seconds timeout_{0}; + std::atomic ready_{false}; public: explicit Tracker(Json const& config); @@ -47,10 +48,17 @@ class Tracker { : n_workers_{n_worders}, port_{port}, timeout_{timeout} {} virtual ~Tracker() noexcept(false){}; // NOLINT + + [[nodiscard]] Result WaitUntilReady() const; + [[nodiscard]] virtual std::future Run() = 0; [[nodiscard]] virtual Json WorkerArgs() const = 0; [[nodiscard]] std::chrono::seconds Timeout() const { return timeout_; } [[nodiscard]] virtual std::int32_t Port() const { return port_; } + /** + * @brief Flag to indicate whether the server is running. + */ + [[nodiscard]] bool Ready() const { return ready_; } }; class RabitTracker : public Tracker { @@ -124,13 +132,7 @@ class RabitTracker : public Tracker { ~RabitTracker() noexcept(false) override = default; std::future Run() override; - - [[nodiscard]] Json WorkerArgs() const override { - Json args{Object{}}; - args["DMLC_TRACKER_URI"] = String{host_}; - args["DMLC_TRACKER_PORT"] = this->Port(); - return args; - } + [[nodiscard]] Json WorkerArgs() const override; }; // Prob the public IP address of the host, need a better method. diff --git a/src/common/timer.h b/src/common/timer.h index 3daaeda8c..80748e7b8 100644 --- a/src/common/timer.h +++ b/src/common/timer.h @@ -29,6 +29,7 @@ struct Timer { void Start() { start = ClockT::now(); } void Stop() { elapsed += ClockT::now() - start; } double ElapsedSeconds() const { return SecondsT(elapsed).count(); } + SecondsT Duration() const { return ClockT::now() - start; } void PrintElapsed(std::string label) { char buffer[255]; snprintf(buffer, sizeof(buffer), "%s:\t %fs", label.c_str(), diff --git a/tests/cpp/collective/test_tracker.cc b/tests/cpp/collective/test_tracker.cc index 8fc5f0b3f..0dce33c0c 100644 --- a/tests/cpp/collective/test_tracker.cc +++ b/tests/cpp/collective/test_tracker.cc @@ -27,9 +27,15 @@ class PrintWorker : public WorkerForTest { TEST_F(TrackerTest, Bootstrap) { RabitTracker tracker{host, n_workers, 0, timeout}; + ASSERT_FALSE(tracker.Ready()); auto fut = tracker.Run(); std::vector workers; + + auto args = tracker.WorkerArgs(); + ASSERT_TRUE(tracker.Ready()); + ASSERT_EQ(get(args["DMLC_TRACKER_URI"]), host); + std::int32_t port = tracker.Port(); for (std::int32_t i = 0; i < n_workers; ++i) { @@ -47,6 +53,9 @@ TEST_F(TrackerTest, Print) { auto fut = tracker.Run(); std::vector workers; + auto rc = tracker.WaitUntilReady(); + ASSERT_TRUE(rc.OK()); + std::int32_t port = tracker.Port(); for (std::int32_t i = 0; i < n_workers; ++i) { diff --git a/tests/cpp/plugin/federated/test_federated_tracker.cc b/tests/cpp/plugin/federated/test_federated_tracker.cc new file mode 100644 index 000000000..81ff95540 --- /dev/null +++ b/tests/cpp/plugin/federated/test_federated_tracker.cc @@ -0,0 +1,36 @@ +/** + * Copyright 2023, XGBoost Contributors + */ +#include + +#include // for make_unique +#include // for string + +#include "../../../../src/collective/tracker.h" // for GetHostAddress +#include "federated_tracker.h" +#include "test_worker.h" +#include "xgboost/json.h" // for Json + +namespace xgboost::collective { +TEST(FederatedTrackerTest, Basic) { + Json config{Object()}; + config["federated_secure"] = Boolean{false}; + config["n_workers"] = Integer{3}; + + auto tracker = std::make_unique(config); + ASSERT_FALSE(tracker->Ready()); + auto fut = tracker->Run(); + auto args = tracker->WorkerArgs(); + ASSERT_TRUE(tracker->Ready()); + + ASSERT_GE(tracker->Port(), 1); + std::string host; + auto rc = GetHostAddress(&host); + ASSERT_EQ(get(args["DMLC_TRACKER_URI"]), host); + + rc = tracker->Shutdown(); + ASSERT_TRUE(rc.OK()); + ASSERT_TRUE(fut.get().OK()); + ASSERT_FALSE(tracker->Ready()); +} +} // namespace xgboost::collective From be20df8c23c063f9b5ff242e66c29ebd66578ca6 Mon Sep 17 00:00:00 2001 From: david-cortes Date: Thu, 2 Nov 2023 00:20:44 +0100 Subject: [PATCH 4/4] [Python] Accept numpy generators as `random_state` (#9743) * accept numpy generators for random_state * make linter happy * fix tests --- python-package/xgboost/sklearn.py | 10 ++++++++-- tests/python/test_with_sklearn.py | 4 ++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/python-package/xgboost/sklearn.py b/python-package/xgboost/sklearn.py index cb738477b..d5e20439a 100644 --- a/python-package/xgboost/sklearn.py +++ b/python-package/xgboost/sklearn.py @@ -248,7 +248,7 @@ __model_doc = f""" Balancing of positive and negative weights. base_score : Optional[float] The initial prediction score of all instances, global bias. - random_state : Optional[Union[numpy.random.RandomState, int]] + random_state : Optional[Union[numpy.random.RandomState, numpy.random.Generator, int]] Random number seed. .. note:: @@ -651,7 +651,9 @@ class XGBModel(XGBModelBase): reg_lambda: Optional[float] = None, scale_pos_weight: Optional[float] = None, base_score: Optional[float] = None, - random_state: Optional[Union[np.random.RandomState, int]] = None, + random_state: Optional[ + Union[np.random.RandomState, np.random.Generator, int] + ] = None, missing: float = np.nan, num_parallel_tree: Optional[int] = None, monotone_constraints: Optional[Union[Dict[str, int], str]] = None, @@ -789,6 +791,10 @@ class XGBModel(XGBModelBase): params["random_state"] = params["random_state"].randint( np.iinfo(np.int32).max ) + elif isinstance(params["random_state"], np.random.Generator): + params["random_state"] = int( + params["random_state"].integers(np.iinfo(np.int32).max) + ) return params diff --git a/tests/python/test_with_sklearn.py b/tests/python/test_with_sklearn.py index b40ae67c5..c919a01ad 100644 --- a/tests/python/test_with_sklearn.py +++ b/tests/python/test_with_sklearn.py @@ -702,6 +702,10 @@ def test_sklearn_random_state(): clf = xgb.XGBClassifier(random_state=random_state) assert isinstance(clf.get_xgb_params()['random_state'], int) + random_state = np.random.default_rng(seed=404) + clf = xgb.XGBClassifier(random_state=random_state) + assert isinstance(clf.get_xgb_params()['random_state'], int) + def test_sklearn_n_jobs(): clf = xgb.XGBClassifier(n_jobs=1)