Merge branch 'master' into sync-condition-2023Apr11

amdsc21
2023-04-11 19:38:38 +02:00
56 changed files with 1912 additions and 983 deletions

View File

@@ -31,6 +31,5 @@ dependencies:
- pyspark
- cloudpickle
- pip:
- shap
- awscli
- auditwheel

View File

@@ -37,7 +37,6 @@ dependencies:
- pyarrow
- protobuf
- cloudpickle
- shap>=0.41
- modin
# TODO: Replace it with pyspark>=3.4 once 3.4 is released.
# - https://ml-team-public-read.s3.us-west-2.amazonaws.com/pyspark-3.4.0.dev0.tar.gz

View File

@@ -146,6 +146,7 @@ def main(args: argparse.Namespace) -> None:
"tests/python/test_config.py",
"tests/python/test_data_iterator.py",
"tests/python/test_dt.py",
"tests/python/test_predict.py",
"tests/python/test_quantile_dmatrix.py",
"tests/python/test_tree_regularization.py",
"tests/python-gpu/test_gpu_data_iterator.py",

View File

@@ -0,0 +1,106 @@
/**
* Copyright 2023 by XGBoost Contributors
*/
#include "test_lambdarank_obj.h"
#include <gtest/gtest.h> // for Test, Message, TestPartResult, CmpHel...
#include <cstddef> // for size_t
#include <initializer_list> // for initializer_list
#include <map> // for map
#include <memory> // for unique_ptr, shared_ptr, make_shared
#include <numeric> // for iota
#include <string> // for char_traits, basic_string, string
#include <vector> // for vector
#include "../../../src/common/ranking_utils.h" // for LambdaRankParam
#include "../../../src/common/ranking_utils.h" // for NDCGCache, LambdaRankParam
#include "../helpers.h" // for CheckRankingObjFunction, CheckConfigReload
#include "xgboost/base.h" // for GradientPair, bst_group_t, Args
#include "xgboost/context.h" // for Context
#include "xgboost/data.h" // for MetaInfo, DMatrix
#include "xgboost/host_device_vector.h" // for HostDeviceVector
#include "xgboost/linalg.h" // for Tensor, All, TensorView
#include "xgboost/objective.h" // for ObjFunction
#include "xgboost/span.h" // for Span
namespace xgboost::obj {
void InitMakePairTest(Context const* ctx, MetaInfo* out_info, HostDeviceVector<float>* out_predt) {
out_predt->SetDevice(ctx->gpu_id);
MetaInfo& info = *out_info;
info.num_row_ = 128;
info.labels.ModifyInplace([&](HostDeviceVector<float>* data, common::Span<std::size_t> shape) {
shape[0] = info.num_row_;
shape[1] = 1;
auto& h_data = data->HostVector();
h_data.resize(shape[0]);
// Alternating binary relevance labels: 0, 1, 0, 1, ...
for (std::size_t i = 0; i < h_data.size(); ++i) {
h_data[i] = i % 2;
}
});
// Predictions in strictly decreasing order: n-1, n-2, ..., 0.
std::vector<float> predt(info.num_row_);
std::iota(predt.rbegin(), predt.rend(), 0.0f);
out_predt->HostVector() = predt;
}
TEST(LambdaRank, MakePair) {
Context ctx;
MetaInfo info;
HostDeviceVector<float> predt;
InitMakePairTest(&ctx, &info, &predt);
ltr::LambdaRankParam param;
param.UpdateAllowUnknown(Args{{"lambdarank_pair_method", "topk"}});
ASSERT_TRUE(param.HasTruncation());
std::shared_ptr<ltr::RankingCache> p_cache = std::make_shared<ltr::NDCGCache>(&ctx, info, param);
auto const& h_predt = predt.ConstHostVector();
{
auto rank_idx = p_cache->SortedIdx(&ctx, h_predt);
for (std::size_t i = 0; i < h_predt.size(); ++i) {
ASSERT_EQ(rank_idx[i], static_cast<std::size_t>(*(h_predt.crbegin() + i)));
}
std::int32_t n_pairs{0};
MakePairs(&ctx, 0, p_cache, 0, info.labels.HostView().Slice(linalg::All(), 0), rank_idx,
[&](auto i, auto j) {
ASSERT_GT(j, i);
ASSERT_LT(i, p_cache->Param().NumPair());
++n_pairs;
});
ASSERT_EQ(n_pairs, 3568);
}
auto const h_label = info.labels.HostView();
{
param.UpdateAllowUnknown(Args{{"lambdarank_pair_method", "mean"}});
auto p_cache = std::make_shared<ltr::NDCGCache>(&ctx, info, param);
ASSERT_FALSE(param.HasTruncation());
std::int32_t n_pairs = 0;
auto rank_idx = p_cache->SortedIdx(&ctx, h_predt);
MakePairs(&ctx, 0, p_cache, 0, info.labels.HostView().Slice(linalg::All(), 0), rank_idx,
[&](auto i, auto j) {
++n_pairs;
// Not in the same bucket
ASSERT_NE(h_label(rank_idx[i]), h_label(rank_idx[j]));
});
ASSERT_EQ(n_pairs, info.num_row_ * param.NumPair());
}
{
param.UpdateAllowUnknown(Args{{"lambdarank_num_pair_per_sample", "2"}});
auto p_cache = std::make_shared<ltr::NDCGCache>(&ctx, info, param);
auto rank_idx = p_cache->SortedIdx(&ctx, h_predt);
std::int32_t n_pairs = 0;
MakePairs(&ctx, 0, p_cache, 0, info.labels.HostView().Slice(linalg::All(), 0), rank_idx,
[&](auto i, auto j) {
++n_pairs;
// Not in the same bucket
ASSERT_NE(h_label(rank_idx[i]), h_label(rank_idx[j]));
});
ASSERT_EQ(param.NumPair(), 2);
ASSERT_EQ(n_pairs, info.num_row_ * param.NumPair());
}
}
} // namespace xgboost::obj
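
The test above drives MakePairs directly with the topk and mean strategies. For orientation, a minimal Python-level sketch of the same knobs, assuming a build that includes the lambdarank_pair_method and lambdarank_num_pair_per_sample parameters this work introduces (data and group layout are illustrative):

import numpy as np
import xgboost as xgb

rng = np.random.default_rng(1994)
X = rng.normal(size=(128, 4))
y = (np.arange(128) % 2).astype(np.float32)  # binary relevance, mirroring the C++ test
qid = np.zeros(128, dtype=np.int64)          # a single query group

ranker = xgb.XGBRanker(
    objective="rank:ndcg",
    lambdarank_pair_method="topk",           # or "mean"
    lambdarank_num_pair_per_sample=2,
    n_estimators=4,
)
ranker.fit(X, y, qid=qid)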

View File

@@ -0,0 +1,138 @@
/**
* Copyright 2023 by XGBoost Contributors
*/
#include <gtest/gtest.h>
#include <xgboost/context.h> // for Context
#include <cstdint> // for uint32_t
#include <vector> // for vector
#include "../../../src/common/cuda_context.cuh" // for CUDAContext
#include "../../../src/objective/lambdarank_obj.cuh"
#include "test_lambdarank_obj.h"
namespace xgboost::obj {
void TestGPUMakePair() {
Context ctx;
ctx.gpu_id = 0;
MetaInfo info;
HostDeviceVector<float> predt;
InitMakePairTest(&ctx, &info, &predt);
ltr::LambdaRankParam param;
auto make_args = [&](std::shared_ptr<ltr::RankingCache> p_cache, auto rank_idx,
common::Span<std::size_t const> y_sorted_idx) {
linalg::Vector<double> dummy;
auto d = dummy.View(ctx.gpu_id);
linalg::Vector<GradientPair> dgpair;
auto dg = dgpair.View(ctx.gpu_id);
cuda_impl::KernelInputs args{d,
d,
d,
d,
p_cache->DataGroupPtr(&ctx),
p_cache->CUDAThreadsGroupPtr(),
rank_idx,
info.labels.View(ctx.gpu_id),
predt.ConstDeviceSpan(),
{},
dg,
nullptr,
y_sorted_idx,
0};
return args;
};
{
param.UpdateAllowUnknown(Args{{"lambdarank_pair_method", "topk"}});
auto p_cache = std::make_shared<ltr::NDCGCache>(&ctx, info, param);
auto rank_idx = p_cache->SortedIdx(&ctx, predt.ConstDeviceSpan());
ASSERT_EQ(p_cache->CUDAThreads(), 3568);
auto args = make_args(p_cache, rank_idx, {});
auto n_pairs = p_cache->Param().NumPair();
auto make_pair = cuda_impl::MakePairsOp<true>{args};
dh::LaunchN(p_cache->CUDAThreads(), ctx.CUDACtx()->Stream(),
[=] XGBOOST_DEVICE(std::size_t idx) {
auto [i, j] = make_pair(idx, 0);
SPAN_CHECK(j > i);
SPAN_CHECK(i < n_pairs);
});
}
{
param.UpdateAllowUnknown(Args{{"lambdarank_pair_method", "mean"}});
auto p_cache = std::make_shared<ltr::NDCGCache>(&ctx, info, param);
auto rank_idx = p_cache->SortedIdx(&ctx, predt.ConstDeviceSpan());
auto y_sorted_idx = cuda_impl::SortY(&ctx, info, rank_idx, p_cache);
ASSERT_FALSE(param.HasTruncation());
ASSERT_EQ(p_cache->CUDAThreads(), info.num_row_ * param.NumPair());
auto args = make_args(p_cache, rank_idx, y_sorted_idx);
auto make_pair = cuda_impl::MakePairsOp<false>{args};
auto n_pairs = p_cache->Param().NumPair();
ASSERT_EQ(n_pairs, 1);
dh::LaunchN(
p_cache->CUDAThreads(), ctx.CUDACtx()->Stream(), [=] XGBOOST_DEVICE(std::size_t idx) {
idx = 97;
auto [i, j] = make_pair(idx, 0);
// Not in the same bucket
SPAN_CHECK(make_pair.args.labels(rank_idx[i]) != make_pair.args.labels(rank_idx[j]));
});
}
{
param.UpdateAllowUnknown(Args{{"lambdarank_num_pair_per_sample", "2"}});
auto p_cache = std::make_shared<ltr::NDCGCache>(&ctx, info, param);
auto rank_idx = p_cache->SortedIdx(&ctx, predt.ConstDeviceSpan());
auto y_sorted_idx = cuda_impl::SortY(&ctx, info, rank_idx, p_cache);
auto args = make_args(p_cache, rank_idx, y_sorted_idx);
auto make_pair = cuda_impl::MakePairsOp<false>{args};
dh::LaunchN(
p_cache->CUDAThreads(), ctx.CUDACtx()->Stream(), [=] XGBOOST_DEVICE(std::size_t idx) {
auto [i, j] = make_pair(idx, 0);
// Not in the same bucket
SPAN_CHECK(make_pair.args.labels(rank_idx[i]) != make_pair.args.labels(rank_idx[j]));
});
ASSERT_EQ(param.NumPair(), 2);
ASSERT_EQ(p_cache->CUDAThreads(), info.num_row_ * param.NumPair());
}
}
TEST(LambdaRank, GPUMakePair) { TestGPUMakePair(); }
template <typename CountFunctor>
void RankItemCountImpl(std::vector<std::uint32_t> const &sorted_items, CountFunctor f,
std::uint32_t find_val, std::uint32_t exp_val) {
EXPECT_NE(std::find(sorted_items.begin(), sorted_items.end(), find_val), sorted_items.end());
EXPECT_EQ(f(&sorted_items[0], sorted_items.size(), find_val), exp_val);
}
TEST(LambdaRank, RankItemCountOnLeft) {
// Items sorted in descending order
std::vector<std::uint32_t> sorted_items{10, 10, 6, 4, 4, 4, 4, 1, 1, 1, 1, 1, 0};
auto wrapper = [](auto const &...args) { return cuda_impl::CountNumItemsToTheLeftOf(args...); };
RankItemCountImpl(sorted_items, wrapper, 10, static_cast<uint32_t>(0));
RankItemCountImpl(sorted_items, wrapper, 6, static_cast<uint32_t>(2));
RankItemCountImpl(sorted_items, wrapper, 4, static_cast<uint32_t>(3));
RankItemCountImpl(sorted_items, wrapper, 1, static_cast<uint32_t>(7));
RankItemCountImpl(sorted_items, wrapper, 0, static_cast<uint32_t>(12));
}
TEST(LambdaRank, RankItemCountOnRight) {
// Items sorted in descending order
std::vector<std::uint32_t> sorted_items{10, 10, 6, 4, 4, 4, 4, 1, 1, 1, 1, 1, 0};
auto wrapper = [](auto const &...args) { return cuda_impl::CountNumItemsToTheRightOf(args...); };
RankItemCountImpl(sorted_items, wrapper, 10, static_cast<uint32_t>(11));
RankItemCountImpl(sorted_items, wrapper, 6, static_cast<uint32_t>(10));
RankItemCountImpl(sorted_items, wrapper, 4, static_cast<uint32_t>(6));
RankItemCountImpl(sorted_items, wrapper, 1, static_cast<uint32_t>(1));
RankItemCountImpl(sorted_items, wrapper, 0, static_cast<uint32_t>(0));
}
} // namespace xgboost::obj

View File

@@ -0,0 +1,26 @@
/**
* Copyright 2023, XGBoost Contributors
*/
#ifndef XGBOOST_OBJECTIVE_TEST_LAMBDARANK_OBJ_H_
#define XGBOOST_OBJECTIVE_TEST_LAMBDARANK_OBJ_H_
#include <gtest/gtest.h>
#include <xgboost/data.h> // for MetaInfo
#include <xgboost/host_device_vector.h> // for HostDeviceVector
#include <xgboost/linalg.h> // for All
#include <xgboost/objective.h> // for ObjFunction
#include <memory> // for shared_ptr, make_shared
#include <numeric> // for iota
#include <vector> // for vector
#include "../../../src/common/ranking_utils.h" // for LambdaRankParam, MAPCache
#include "../../../src/objective/lambdarank_obj.h" // for MAPStat
#include "../helpers.h" // for EmptyDMatrix
namespace xgboost::obj {
/**
* \brief Initialize test data for make pair tests.
*/
void InitMakePairTest(Context const* ctx, MetaInfo* out_info, HostDeviceVector<float>* out_predt);
} // namespace xgboost::obj
#endif // XGBOOST_OBJECTIVE_TEST_LAMBDARANK_OBJ_H_

View File

@@ -89,43 +89,6 @@ TEST(Objective, RankSegmentSorterAscendingTest) {
5, 4, 6});
}
using CountFunctor = uint32_t (*)(const int *, uint32_t, int);
void RankItemCountImpl(const std::vector<int> &sorted_items, CountFunctor f,
int find_val, uint32_t exp_val) {
EXPECT_NE(std::find(sorted_items.begin(), sorted_items.end(), find_val), sorted_items.end());
EXPECT_EQ(f(&sorted_items[0], sorted_items.size(), find_val), exp_val);
}
TEST(Objective, RankItemCountOnLeft) {
// Items sorted in descending order
std::vector<int> sorted_items{10, 10, 6, 4, 4, 4, 4, 1, 1, 1, 1, 1, 0};
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheLeftOf,
10, static_cast<uint32_t>(0));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheLeftOf,
6, static_cast<uint32_t>(2));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheLeftOf,
4, static_cast<uint32_t>(3));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheLeftOf,
1, static_cast<uint32_t>(7));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheLeftOf,
0, static_cast<uint32_t>(12));
}
TEST(Objective, RankItemCountOnRight) {
// Items sorted in descending order
std::vector<int> sorted_items{10, 10, 6, 4, 4, 4, 4, 1, 1, 1, 1, 1, 0};
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheRightOf,
10, static_cast<uint32_t>(11));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheRightOf,
6, static_cast<uint32_t>(10));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheRightOf,
4, static_cast<uint32_t>(6));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheRightOf,
1, static_cast<uint32_t>(1));
RankItemCountImpl(sorted_items, &xgboost::obj::CountNumItemsToTheRightOf,
0, static_cast<uint32_t>(0));
}
TEST(Objective, NDCGLambdaWeightComputerTest) {
std::vector<float> hlabels = {3.1f, 1.2f, 2.3f, 4.4f, // Labels
7.8f, 5.01f, 6.96f,

View File

@@ -0,0 +1,32 @@
/**
* Copyright (c) 2023, XGBoost contributors
*/
#include <dmlc/registry.h> // for Registry
#include <gtest/gtest.h>
#include <xgboost/objective.h> // for ObjFunctionReg
#include <algorithm> // for transform
#include <iterator> // for back_insert_iterator, back_inserter
#include <string> // for string
#include <vector> // for vector
namespace xgboost {
inline auto MakeObjNamesForTest() {
auto list = ::dmlc::Registry<::xgboost::ObjFunctionReg>::List();
std::vector<std::string> names;
std::transform(list.cbegin(), list.cend(), std::back_inserter(names),
[](auto const* entry) { return entry->name; });
return names;
}
template <typename ParamType>
inline std::string ObjTestNameGenerator(const ::testing::TestParamInfo<ParamType>& info) {
auto name = std::string{info.param};
// Name must be a valid C++ symbol
auto it = std::find(name.cbegin(), name.cend(), ':');
if (it != name.cend()) {
name[std::distance(name.cbegin(), it)] = '_';
}
return name;
}
} // namespace xgboost

View File

@@ -8,6 +8,7 @@
#include <xgboost/json.h>
#include <random>
#include <thread> // for thread, sleep_for
#include "../../../plugin/federated/federated_server.h"
#include "../../../src/collective/communicator-inl.h"
@@ -33,13 +34,17 @@ inline std::string GetServerAddress() {
namespace xgboost {
class BaseFederatedTest : public ::testing::Test {
protected:
void SetUp() override {
class ServerForTest {
std::string server_address_;
std::unique_ptr<std::thread> server_thread_;
std::unique_ptr<grpc::Server> server_;
public:
explicit ServerForTest(std::int32_t world_size) {
server_address_ = GetServerAddress();
server_thread_.reset(new std::thread([this] {
server_thread_.reset(new std::thread([this, world_size] {
grpc::ServerBuilder builder;
xgboost::federated::FederatedService service{kWorldSize};
xgboost::federated::FederatedService service{world_size};
builder.AddListeningPort(server_address_, grpc::InsecureServerCredentials());
builder.RegisterService(&service);
server_ = builder.BuildAndStart();
@@ -47,15 +52,21 @@ class BaseFederatedTest : public ::testing::Test {
}));
}
void TearDown() override {
~ServerForTest() {
server_->Shutdown();
server_thread_->join();
}
auto Address() const { return server_address_; }
};
class BaseFederatedTest : public ::testing::Test {
protected:
void SetUp() override { server_ = std::make_unique<ServerForTest>(kWorldSize); }
void TearDown() override { server_.reset(nullptr); }
static int const kWorldSize{3};
std::string server_address_;
std::unique_ptr<std::thread> server_thread_;
std::unique_ptr<grpc::Server> server_;
std::unique_ptr<ServerForTest> server_;
};
template <typename Function, typename... Args>

View File

@@ -29,7 +29,7 @@ TEST(FederatedAdapterSimpleTest, ThrowOnInvalidCommunicator) {
TEST_F(FederatedAdapterTest, DeviceAllReduceSum) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back([rank, server_address = server_address_] {
threads.emplace_back([rank, server_address = server_->Address()] {
FederatedCommunicator comm{kWorldSize, rank, server_address};
// Assign device 0 to all workers, since we run gtest on a single-GPU machine
DeviceCommunicatorAdapter adapter{0, &comm};
@@ -52,7 +52,7 @@ TEST_F(FederatedAdapterTest, DeviceAllReduceSum) {
TEST_F(FederatedAdapterTest, DeviceAllGatherV) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back([rank, server_address = server_address_] {
threads.emplace_back([rank, server_address = server_->Address()] {
FederatedCommunicator comm{kWorldSize, rank, server_address};
// Assign device 0 to all workers, since we run gtest on a single-GPU machine
DeviceCommunicatorAdapter adapter{0, &comm};

View File

@@ -92,7 +92,7 @@ TEST(FederatedCommunicatorSimpleTest, ThrowOnWorldSizeNotInteger) {
config["federated_server_address"] = server_address;
config["federated_world_size"] = std::string("1");
config["federated_rank"] = Integer(0);
auto *comm = FederatedCommunicator::Create(config);
FederatedCommunicator::Create(config);
};
EXPECT_THROW(construct(), dmlc::Error);
}
@@ -104,7 +104,7 @@ TEST(FederatedCommunicatorSimpleTest, ThrowOnRankNotInteger) {
config["federated_server_address"] = server_address;
config["federated_world_size"] = 1;
config["federated_rank"] = std::string("0");
auto *comm = FederatedCommunicator::Create(config);
FederatedCommunicator::Create(config);
};
EXPECT_THROW(construct(), dmlc::Error);
}
@@ -125,7 +125,7 @@ TEST(FederatedCommunicatorSimpleTest, IsDistributed) {
TEST_F(FederatedCommunicatorTest, Allgather) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back(&FederatedCommunicatorTest::VerifyAllgather, rank, server_address_);
threads.emplace_back(&FederatedCommunicatorTest::VerifyAllgather, rank, server_->Address());
}
for (auto &thread : threads) {
thread.join();
@@ -135,7 +135,7 @@ TEST_F(FederatedCommunicatorTest, Allgather) {
TEST_F(FederatedCommunicatorTest, Allreduce) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back(&FederatedCommunicatorTest::VerifyAllreduce, rank, server_address_);
threads.emplace_back(&FederatedCommunicatorTest::VerifyAllreduce, rank, server_->Address());
}
for (auto &thread : threads) {
thread.join();
@@ -145,7 +145,7 @@ TEST_F(FederatedCommunicatorTest, Allreduce) {
TEST_F(FederatedCommunicatorTest, Broadcast) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back(&FederatedCommunicatorTest::VerifyBroadcast, rank, server_address_);
threads.emplace_back(&FederatedCommunicatorTest::VerifyBroadcast, rank, server_->Address());
}
for (auto &thread : threads) {
thread.join();

View File

@@ -38,8 +38,8 @@ void VerifyLoadUri() {
auto index = 0;
int offsets[] = {0, 8, 17};
int offset = offsets[rank];
for (auto row = 0; row < kRows; row++) {
for (auto col = 0; col < kCols; col++) {
for (std::size_t row = 0; row < kRows; row++) {
for (std::size_t col = 0; col < kCols; col++) {
EXPECT_EQ(entries[index].index, col + offset);
index++;
}
@@ -48,6 +48,6 @@ void VerifyLoadUri() {
}
TEST_F(FederatedDataTest, LoadUri) {
RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyLoadUri);
RunWithFederatedCommunicator(kWorldSize, server_->Address(), &VerifyLoadUri);
}
} // namespace xgboost

View File

@@ -8,71 +8,113 @@
#include "../../../plugin/federated/federated_server.h"
#include "../../../src/collective/communicator-inl.h"
#include "../../../src/common/linalg_op.h"
#include "../helpers.h"
#include "../objective_helpers.h" // for MakeObjNamesForTest, ObjTestNameGenerator
#include "helpers.h"
namespace xgboost {
class FederatedLearnerTest : public BaseFederatedTest {
protected:
static auto constexpr kRows{16};
static auto constexpr kCols{16};
};
void VerifyBaseScore(size_t rows, size_t cols, float expected_base_score) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)};
std::shared_ptr<DMatrix> sliced{Xy_->SliceCol(world_size, rank)};
std::unique_ptr<Learner> learner{Learner::Create({sliced})};
namespace {
auto MakeModel(std::string objective, std::shared_ptr<DMatrix> dmat) {
std::unique_ptr<Learner> learner{Learner::Create({dmat})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, sliced);
learner->SetParam("objective", objective);
if (objective.find("quantile") != std::string::npos) {
learner->SetParam("quantile_alpha", "0.5");
}
if (objective.find("multi") != std::string::npos) {
learner->SetParam("num_class", "3");
}
learner->UpdateOneIter(0, dmat);
Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_EQ(base_score, expected_base_score);
}
void VerifyModel(size_t rows, size_t cols, Json const& expected_model) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)};
std::shared_ptr<DMatrix> sliced{Xy_->SliceCol(world_size, rank)};
std::unique_ptr<Learner> learner{Learner::Create({sliced})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, sliced);
Json model{Object{}};
learner->SaveModel(&model);
return model;
}
void VerifyObjective(size_t rows, size_t cols, float expected_base_score, Json expected_model,
std::string objective) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::shared_ptr<DMatrix> dmat{RandomDataGenerator{rows, cols, 0}.GenerateDMatrix(rank == 0)};
if (rank == 0) {
auto &h_upper = dmat->Info().labels_upper_bound_.HostVector();
auto &h_lower = dmat->Info().labels_lower_bound_.HostVector();
h_lower.resize(rows);
h_upper.resize(rows);
for (size_t i = 0; i < rows; ++i) {
h_lower[i] = 1;
h_upper[i] = 10;
}
if (objective.find("rank:") != std::string::npos) {
auto h_label = dmat->Info().labels.HostView();
std::size_t k = 0;
for (auto &v : h_label) {
v = k % 2 == 0;
++k;
}
}
}
std::shared_ptr<DMatrix> sliced{dmat->SliceCol(world_size, rank)};
auto model = MakeModel(objective, sliced);
auto base_score = GetBaseScore(model);
ASSERT_EQ(base_score, expected_base_score);
ASSERT_EQ(model, expected_model);
}
} // namespace
TEST_F(FederatedLearnerTest, BaseScore) {
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)};
std::unique_ptr<Learner> learner{Learner::Create({Xy_})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, Xy_);
Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_NE(base_score, ObjFunction::DefaultBaseScore());
class FederatedLearnerTest : public ::testing::TestWithParam<std::string> {
std::unique_ptr<ServerForTest> server_;
static int const kWorldSize{3};
RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyBaseScore, kRows, kCols,
base_score);
protected:
void SetUp() override { server_ = std::make_unique<ServerForTest>(kWorldSize); }
void TearDown() override { server_.reset(nullptr); }
void Run(std::string objective) {
static auto constexpr kRows{16};
static auto constexpr kCols{16};
std::shared_ptr<DMatrix> dmat{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)};
auto &h_upper = dmat->Info().labels_upper_bound_.HostVector();
auto &h_lower = dmat->Info().labels_lower_bound_.HostVector();
h_lower.resize(kRows);
h_upper.resize(kRows);
for (size_t i = 0; i < kRows; ++i) {
h_lower[i] = 1;
h_upper[i] = 10;
}
if (objective.find("rank:") != std::string::npos) {
auto h_label = dmat->Info().labels.HostView();
std::size_t k = 0;
for (auto &v : h_label) {
v = k % 2 == 0;
++k;
}
}
auto model = MakeModel(objective, dmat);
auto score = GetBaseScore(model);
RunWithFederatedCommunicator(kWorldSize, server_->Address(), &VerifyObjective, kRows, kCols,
score, model, objective);
}
};
TEST_P(FederatedLearnerTest, Objective) {
std::string objective = GetParam();
this->Run(objective);
}
TEST_F(FederatedLearnerTest, Model) {
std::shared_ptr<DMatrix> Xy_{RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true)};
std::unique_ptr<Learner> learner{Learner::Create({Xy_})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, Xy_);
Json model{Object{}};
learner->SaveModel(&model);
RunWithFederatedCommunicator(kWorldSize, server_address_, &VerifyModel, kRows, kCols,
std::cref(model));
}
INSTANTIATE_TEST_SUITE_P(FederatedLearnerObjective, FederatedLearnerTest,
::testing::ValuesIn(MakeObjNamesForTest()),
[](const ::testing::TestParamInfo<FederatedLearnerTest::ParamType> &info) {
return ObjTestNameGenerator(info);
});
} // namespace xgboost

View File

@@ -73,7 +73,7 @@ class FederatedServerTest : public BaseFederatedTest {
TEST_F(FederatedServerTest, Allgather) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back(&FederatedServerTest::VerifyAllgather, rank, server_address_);
threads.emplace_back(&FederatedServerTest::VerifyAllgather, rank, server_->Address());
}
for (auto& thread : threads) {
thread.join();
@@ -83,7 +83,7 @@ TEST_F(FederatedServerTest, Allgather) {
TEST_F(FederatedServerTest, Allreduce) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back(&FederatedServerTest::VerifyAllreduce, rank, server_address_);
threads.emplace_back(&FederatedServerTest::VerifyAllreduce, rank, server_->Address());
}
for (auto& thread : threads) {
thread.join();
@@ -93,7 +93,7 @@ TEST_F(FederatedServerTest, Allreduce) {
TEST_F(FederatedServerTest, Broadcast) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back(&FederatedServerTest::VerifyBroadcast, rank, server_address_);
threads.emplace_back(&FederatedServerTest::VerifyBroadcast, rank, server_->Address());
}
for (auto& thread : threads) {
thread.join();
@@ -103,7 +103,7 @@ TEST_F(FederatedServerTest, Broadcast) {
TEST_F(FederatedServerTest, Mixture) {
std::vector<std::thread> threads;
for (auto rank = 0; rank < kWorldSize; rank++) {
threads.emplace_back(&FederatedServerTest::VerifyMixture, rank, server_address_);
threads.emplace_back(&FederatedServerTest::VerifyMixture, rank, server_->Address());
}
for (auto& thread : threads) {
thread.join();

View File

@@ -1,22 +1,49 @@
/*!
* Copyright 2017-2023 by XGBoost contributors
/**
* Copyright (c) 2017-2023, XGBoost contributors
*/
#include <gtest/gtest.h>
#include <xgboost/learner.h>
#include <xgboost/objective.h> // ObjFunction
#include <xgboost/version_config.h>
#include <xgboost/learner.h> // for Learner
#include <xgboost/logging.h> // for LogCheck_NE, CHECK_NE, LogCheck_EQ
#include <xgboost/objective.h> // for ObjFunction
#include <xgboost/version_config.h> // for XGBOOST_VER_MAJOR, XGBOOST_VER_MINOR
#include <string> // std::stof, std::string
#include <thread>
#include <vector>
#include <algorithm> // for equal, transform
#include <cinttypes> // for int32_t, int64_t, uint32_t
#include <cstddef> // for size_t
#include <iosfwd> // for ofstream
#include <iterator> // for back_insert_iterator, back_inserter
#include <limits> // for numeric_limits
#include <map> // for map
#include <memory> // for unique_ptr, shared_ptr, __shared_ptr_...
#include <random> // for uniform_real_distribution
#include <string> // for allocator, basic_string, string, oper...
#include <thread> // for thread
#include <type_traits> // for is_integral
#include <utility> // for pair
#include <vector> // for vector
#include "../../src/common/api_entry.h" // XGBAPIThreadLocalEntry
#include "../../src/common/io.h"
#include "../../src/common/linalg_op.h"
#include "../../src/common/random.h"
#include "filesystem.h" // dmlc::TemporaryDirectory
#include "helpers.h"
#include "xgboost/json.h"
#include "../../src/collective/communicator-inl.h" // for GetRank, GetWorldSize
#include "../../src/common/api_entry.h" // for XGBAPIThreadLocalEntry
#include "../../src/common/io.h" // for LoadSequentialFile
#include "../../src/common/linalg_op.h" // for ElementWiseTransformHost, begin, end
#include "../../src/common/random.h" // for GlobalRandom
#include "../../src/common/transform_iterator.h" // for IndexTransformIter
#include "dmlc/io.h" // for Stream
#include "dmlc/omp.h" // for omp_get_max_threads
#include "dmlc/registry.h" // for Registry
#include "filesystem.h" // for TemporaryDirectory
#include "helpers.h" // for GetBaseScore, RandomDataGenerator
#include "objective_helpers.h" // for MakeObjNamesForTest, ObjTestNameGenerator
#include "xgboost/base.h" // for bst_float, Args, bst_feature_t, bst_int
#include "xgboost/context.h" // for Context
#include "xgboost/data.h" // for DMatrix, MetaInfo, DataType
#include "xgboost/host_device_vector.h" // for HostDeviceVector
#include "xgboost/json.h" // for Json, Object, get, String, IsA, opera...
#include "xgboost/linalg.h" // for Tensor, TensorView
#include "xgboost/logging.h" // for ConsoleLogger
#include "xgboost/predictor.h" // for PredictionCacheEntry
#include "xgboost/span.h" // for Span, operator!=, SpanIterator
#include "xgboost/string_view.h" // for StringView
namespace xgboost {
TEST(Learner, Basic) {
@@ -608,31 +635,90 @@ TEST_F(InitBaseScore, InitWithPredict) { this->TestInitWithPredt(); }
TEST_F(InitBaseScore, UpdateProcess) { this->TestUpdateProcess(); }
void TestColumnSplitBaseScore(std::shared_ptr<DMatrix> Xy_, float expected_base_score) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
std::shared_ptr<DMatrix> sliced{Xy_->SliceCol(world_size, rank)};
std::unique_ptr<Learner> learner{Learner::Create({sliced})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, sliced);
Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_EQ(base_score, expected_base_score);
class TestColumnSplit : public ::testing::TestWithParam<std::string> {
static auto MakeFmat(std::string const& obj) {
auto constexpr kRows = 10, kCols = 10;
auto p_fmat = RandomDataGenerator{kRows, kCols, 0}.GenerateDMatrix(true);
auto& h_upper = p_fmat->Info().labels_upper_bound_.HostVector();
auto& h_lower = p_fmat->Info().labels_lower_bound_.HostVector();
h_lower.resize(kRows);
h_upper.resize(kRows);
for (size_t i = 0; i < kRows; ++i) {
h_lower[i] = 1;
h_upper[i] = 10;
}
if (obj.find("rank:") != std::string::npos) {
auto h_label = p_fmat->Info().labels.HostView();
std::size_t k = 0;
for (auto& v : h_label) {
v = k % 2 == 0;
++k;
}
}
return p_fmat;
};
void TestBaseScore(std::string objective, float expected_base_score, Json expected_model) {
auto const world_size = collective::GetWorldSize();
auto const rank = collective::GetRank();
auto p_fmat = MakeFmat(objective);
std::shared_ptr<DMatrix> sliced{p_fmat->SliceCol(world_size, rank)};
std::unique_ptr<Learner> learner{Learner::Create({sliced})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", objective);
if (objective.find("quantile") != std::string::npos) {
learner->SetParam("quantile_alpha", "0.5");
}
if (objective.find("multi") != std::string::npos) {
learner->SetParam("num_class", "3");
}
learner->UpdateOneIter(0, sliced);
Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_EQ(base_score, expected_base_score);
Json model{Object{}};
learner->SaveModel(&model);
ASSERT_EQ(model, expected_model);
}
public:
void Run(std::string objective) {
auto p_fmat = MakeFmat(objective);
std::unique_ptr<Learner> learner{Learner::Create({p_fmat})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", objective);
if (objective.find("quantile") != std::string::npos) {
learner->SetParam("quantile_alpha", "0.5");
}
if (objective.find("multi") != std::string::npos) {
learner->SetParam("num_class", "3");
}
learner->UpdateOneIter(0, p_fmat);
Json config{Object{}};
learner->SaveConfig(&config);
Json model{Object{}};
learner->SaveModel(&model);
auto constexpr kWorldSize{3};
auto call = [this, &objective](auto&... args) { TestBaseScore(objective, args...); };
auto score = GetBaseScore(config);
RunWithInMemoryCommunicator(kWorldSize, call, score, model);
}
};
TEST_P(TestColumnSplit, Objective) {
std::string objective = GetParam();
this->Run(objective);
}
TEST_F(InitBaseScore, ColumnSplit) {
std::unique_ptr<Learner> learner{Learner::Create({Xy_})};
learner->SetParam("tree_method", "approx");
learner->SetParam("objective", "binary:logistic");
learner->UpdateOneIter(0, Xy_);
Json config{Object{}};
learner->SaveConfig(&config);
auto base_score = GetBaseScore(config);
ASSERT_NE(base_score, ObjFunction::DefaultBaseScore());
auto constexpr kWorldSize{3};
RunWithInMemoryCommunicator(kWorldSize, &TestColumnSplitBaseScore, Xy_, base_score);
}
INSTANTIATE_TEST_SUITE_P(ColumnSplitObjective, TestColumnSplit,
::testing::ValuesIn(MakeObjNamesForTest()),
[](const ::testing::TestParamInfo<TestColumnSplit::ParamType>& info) {
return ObjTestNameGenerator(info);
});
} // namespace xgboost
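
Both this suite and the federated variant inject objective-specific required parameters before training. A plain Python illustration of the same requirement, using parameter names taken from the test bodies above (the quantile_alpha line is what a quantile objective would need instead):

import numpy as np
import xgboost as xgb

X = np.random.RandomState(0).rand(16, 16)
y = np.random.RandomState(0).randint(0, 3, size=16)

params = {"tree_method": "approx", "objective": "multi:softmax", "num_class": 3}
# For quantile objectives the tests instead set: params["quantile_alpha"] = 0.5
booster = xgb.train(params, xgb.DMatrix(X, label=y), num_boost_round=1)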

View File

@@ -64,7 +64,7 @@ class TestModels:
num_round = 2
bst = xgb.train(param, dtrain, num_round, watchlist)
# this is prediction
preds = bst.predict(dtest, ntree_limit=num_round)
preds = bst.predict(dtest, iteration_range=(0, num_round))
labels = dtest.get_label()
err = sum(1 for i in range(len(preds))
if int(preds[i] > 0.5) != labels[i]) / float(len(preds))
@@ -83,7 +83,7 @@ class TestModels:
bst2 = xgb.Booster(params=param, model_file=model_path)
dtest2 = xgb.DMatrix(dtest_path)
preds2 = bst2.predict(dtest2, ntree_limit=num_round)
preds2 = bst2.predict(dtest2, iteration_range=(0, num_round))
# assert they are the same
assert np.sum(np.abs(preds2 - preds)) == 0
@@ -96,7 +96,7 @@ class TestModels:
# check whether custom evaluation metrics work
bst = xgb.train(param, dtrain, num_round, watchlist,
feval=my_logloss)
preds3 = bst.predict(dtest, ntree_limit=num_round)
preds3 = bst.predict(dtest, iteration_range=(0, num_round))
assert all(preds3 == preds)
# check whether sample_type and normalize_type work
@@ -110,7 +110,7 @@ class TestModels:
param['sample_type'] = p[0]
param['normalize_type'] = p[1]
bst = xgb.train(param, dtrain, num_round, watchlist)
preds = bst.predict(dtest, ntree_limit=num_round)
preds = bst.predict(dtest, iteration_range=(0, num_round))
err = sum(1 for i in range(len(preds))
if int(preds[i] > 0.5) != labels[i]) / float(len(preds))
assert err < 0.1
@@ -472,8 +472,8 @@ class TestModels:
X, y = load_iris(return_X_y=True)
cls = xgb.XGBClassifier(n_estimators=2)
cls.fit(X, y, early_stopping_rounds=1, eval_set=[(X, y)])
assert cls.get_booster().best_ntree_limit == 2
assert cls.best_ntree_limit == cls.get_booster().best_ntree_limit
assert cls.get_booster().best_iteration == cls.n_estimators - 1
assert cls.best_iteration == cls.get_booster().best_iteration
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, "cls.json")
@@ -481,8 +481,8 @@ class TestModels:
cls = xgb.XGBClassifier(n_estimators=2)
cls.load_model(path)
assert cls.get_booster().best_ntree_limit == 2
assert cls.best_ntree_limit == cls.get_booster().best_ntree_limit
assert cls.get_booster().best_iteration == cls.n_estimators - 1
assert cls.best_iteration == cls.get_booster().best_iteration
def run_slice(
self,
@@ -664,7 +664,7 @@ class TestModels:
y = rng.randn(rows)
feature_names = ["test_feature_" + str(i) for i in range(cols)]
X_pd = pd.DataFrame(X, columns=feature_names)
X_pd.iloc[:, 3] = X_pd.iloc[:, 3].astype(np.int32)
X_pd[f"test_feature_{3}"] = X_pd.iloc[:, 3].astype(np.int32)
Xy = xgb.DMatrix(X_pd, y)
assert Xy.feature_types[3] == "int"
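
The predict changes in this file all follow one migration pattern: the removed ntree_limit argument counted trees, while iteration_range selects a half-open range of boosting iterations (one iteration can hold several trees under num_parallel_tree or multi-class training), and best_ntree_limit gives way to best_iteration. A minimal sketch of the new call on synthetic data:

import numpy as np
import xgboost as xgb

rng = np.random.RandomState(1994)
dtrain = xgb.DMatrix(rng.rand(100, 4), label=rng.rand(100))
num_round = 2
bst = xgb.train({"tree_method": "hist"}, dtrain, num_boost_round=num_round)

# Previously: bst.predict(dtrain, ntree_limit=num_round)
preds = bst.predict(dtrain, iteration_range=(0, num_round))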

View File

@@ -102,7 +102,6 @@ eval[test] = {data_path}
booster.feature_names = None
booster.feature_types = None
booster.set_attr(best_iteration=None)
booster.set_attr(best_ntree_limit=None)
booster.save_model(model_out_py)
py_predt = booster.predict(data)

View File

@@ -1,4 +1,4 @@
'''Tests for running inplace prediction.'''
"""Tests for running inplace prediction."""
from concurrent.futures import ThreadPoolExecutor
import numpy as np
@@ -17,10 +17,10 @@ def run_threaded_predict(X, rows, predict_func):
per_thread = 20
with ThreadPoolExecutor(max_workers=10) as e:
for i in range(0, rows, int(rows / per_thread)):
if hasattr(X, 'iloc'):
predictor = X.iloc[i:i+per_thread, :]
if hasattr(X, "iloc"):
predictor = X.iloc[i : i + per_thread, :]
else:
predictor = X[i:i+per_thread, ...]
predictor = X[i : i + per_thread, ...]
f = e.submit(predict_func, predictor)
results.append(f)
@@ -61,27 +61,31 @@ def run_predict_leaf(predictor):
validate_leaf_output(leaf, num_parallel_tree)
ntree_limit = 2
n_iters = 2
sliced = booster.predict(
m, pred_leaf=True, ntree_limit=num_parallel_tree * ntree_limit, strict_shape=True
m,
pred_leaf=True,
iteration_range=(0, n_iters),
strict_shape=True,
)
first = sliced[0, ...]
assert np.prod(first.shape) == classes * num_parallel_tree * ntree_limit
assert np.prod(first.shape) == classes * num_parallel_tree * n_iters
# When there's only 1 tree, the output is a 1 dim vector
booster = xgb.train({"tree_method": "hist"}, num_boost_round=1, dtrain=m)
assert booster.predict(m, pred_leaf=True).shape == (rows, )
assert booster.predict(m, pred_leaf=True).shape == (rows,)
return leaf
def test_predict_leaf():
run_predict_leaf('cpu_predictor')
run_predict_leaf("cpu_predictor")
def test_predict_shape():
from sklearn.datasets import fetch_california_housing
X, y = fetch_california_housing(return_X_y=True)
reg = xgb.XGBRegressor(n_estimators=1)
reg.fit(X, y)
@@ -119,13 +123,14 @@ def test_predict_shape():
class TestInplacePredict:
'''Tests for running inplace prediction'''
"""Tests for running inplace prediction"""
@classmethod
def setup_class(cls):
cls.rows = 1000
cls.cols = 10
cls.missing = 11 # set to integer for testing
cls.missing = 11 # set to integer for testing
cls.rng = np.random.RandomState(1994)
@@ -139,7 +144,7 @@ class TestInplacePredict:
cls.test = xgb.DMatrix(cls.X[:10, ...], missing=cls.missing)
cls.num_boost_round = 10
cls.booster = xgb.train({'tree_method': 'hist'}, dtrain, num_boost_round=10)
cls.booster = xgb.train({"tree_method": "hist"}, dtrain, num_boost_round=10)
def test_predict(self):
booster = self.booster
@@ -162,28 +167,22 @@ class TestInplacePredict:
predt_from_array = booster.inplace_predict(
X[:10, ...], iteration_range=(0, 4), missing=self.missing
)
predt_from_dmatrix = booster.predict(test, ntree_limit=4)
predt_from_dmatrix = booster.predict(test, iteration_range=(0, 4))
np.testing.assert_allclose(predt_from_dmatrix, predt_from_array)
with pytest.raises(ValueError):
booster.predict(test, ntree_limit=booster.best_ntree_limit + 1)
with pytest.raises(ValueError):
booster.predict(test, iteration_range=(0, booster.best_iteration + 2))
default = booster.predict(test)
range_full = booster.predict(test, iteration_range=(0, self.num_boost_round))
ntree_full = booster.predict(test, ntree_limit=self.num_boost_round)
np.testing.assert_allclose(range_full, default)
np.testing.assert_allclose(ntree_full, default)
range_full = booster.predict(
test, iteration_range=(0, booster.best_iteration + 1)
)
ntree_full = booster.predict(test, ntree_limit=booster.best_ntree_limit)
np.testing.assert_allclose(range_full, default)
np.testing.assert_allclose(ntree_full, default)
def predict_dense(x):
inplace_predt = booster.inplace_predict(x)
@@ -251,6 +250,7 @@ class TestInplacePredict:
@pytest.mark.skipif(**tm.no_pandas())
def test_pd_dtypes(self) -> None:
from pandas.api.types import is_bool_dtype
for orig, x in pd_dtypes():
dtypes = orig.dtypes if isinstance(orig, pd.DataFrame) else [orig.dtypes]
if isinstance(orig, pd.DataFrame) and is_bool_dtype(dtypes[0]):
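
The assertions above check that inplace prediction and DMatrix-based prediction agree under the same iteration_range. A standalone sketch of that equivalence on synthetic data:

import numpy as np
import xgboost as xgb

X = np.random.RandomState(1994).rand(64, 4)
booster = xgb.train(
    {"tree_method": "hist"}, xgb.DMatrix(X, label=X.sum(axis=1)), num_boost_round=10
)
from_array = booster.inplace_predict(X, iteration_range=(0, 4))
from_dmatrix = booster.predict(xgb.DMatrix(X), iteration_range=(0, 4))
np.testing.assert_allclose(from_array, from_dmatrix)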

View File

@@ -60,7 +60,7 @@ def test_ranking_with_weighted_data():
assert all(p <= q for p, q in zip(auc_rec, auc_rec[1:]))
for i in range(1, 11):
pred = bst.predict(dtrain, ntree_limit=i)
pred = bst.predict(dtrain, iteration_range=(0, i))
# is_sorted[i]: is i-th group correctly sorted by the ranking predictor?
is_sorted = []
for k in range(0, 20, 5):

View File

@@ -95,44 +95,39 @@ class TestTrainingContinuation:
res2 = mean_squared_error(y_2class, gbdt_03b.predict(dtrain_2class))
assert res1 == res2
gbdt_04 = xgb.train(xgb_params_02, dtrain_2class,
num_boost_round=3)
assert gbdt_04.best_ntree_limit == (gbdt_04.best_iteration +
1) * self.num_parallel_tree
gbdt_04 = xgb.train(xgb_params_02, dtrain_2class, num_boost_round=3)
res1 = mean_squared_error(y_2class, gbdt_04.predict(dtrain_2class))
res2 = mean_squared_error(y_2class,
gbdt_04.predict(
dtrain_2class,
ntree_limit=gbdt_04.best_ntree_limit))
res2 = mean_squared_error(
y_2class,
gbdt_04.predict(
dtrain_2class, iteration_range=(0, gbdt_04.best_iteration + 1)
)
)
assert res1 == res2
gbdt_04 = xgb.train(xgb_params_02, dtrain_2class,
num_boost_round=7, xgb_model=gbdt_04)
assert gbdt_04.best_ntree_limit == (
gbdt_04.best_iteration + 1) * self.num_parallel_tree
gbdt_04 = xgb.train(
xgb_params_02, dtrain_2class, num_boost_round=7, xgb_model=gbdt_04
)
res1 = mean_squared_error(y_2class, gbdt_04.predict(dtrain_2class))
res2 = mean_squared_error(y_2class,
gbdt_04.predict(
dtrain_2class,
ntree_limit=gbdt_04.best_ntree_limit))
res2 = mean_squared_error(
y_2class,
gbdt_04.predict(
dtrain_2class, iteration_range=(0, gbdt_04.best_iteration + 1)
)
)
assert res1 == res2
gbdt_05 = xgb.train(xgb_params_03, dtrain_5class,
num_boost_round=7)
assert gbdt_05.best_ntree_limit == (
gbdt_05.best_iteration + 1) * self.num_parallel_tree
gbdt_05 = xgb.train(xgb_params_03,
dtrain_5class,
num_boost_round=3,
xgb_model=gbdt_05)
assert gbdt_05.best_ntree_limit == (
gbdt_05.best_iteration + 1) * self.num_parallel_tree
res1 = gbdt_05.predict(dtrain_5class)
res2 = gbdt_05.predict(dtrain_5class,
ntree_limit=gbdt_05.best_ntree_limit)
res2 = gbdt_05.predict(
dtrain_5class, iteration_range=(0, gbdt_05.best_iteration + 1)
)
np.testing.assert_almost_equal(res1, res2)
@pytest.mark.skipif(**tm.no_sklearn())

View File

@@ -77,7 +77,10 @@ class TestPandas:
np.testing.assert_array_equal(result, exp)
dm = xgb.DMatrix(dummies)
assert dm.feature_names == ['B', 'A_X', 'A_Y', 'A_Z']
assert dm.feature_types == ['int', 'int', 'int', 'int']
if int(pd.__version__[0]) >= 2:
assert dm.feature_types == ['int', 'i', 'i', 'i']
else:
assert dm.feature_types == ['int', 'int', 'int', 'int']
assert dm.num_row() == 3
assert dm.num_col() == 4
@@ -298,14 +301,14 @@ class TestPandas:
@pytest.mark.parametrize("DMatrixT", [xgb.DMatrix, xgb.QuantileDMatrix])
def test_nullable_type(self, DMatrixT) -> None:
from pandas.api.types import is_categorical
from pandas.api.types import is_categorical_dtype
for orig, df in pd_dtypes():
if hasattr(df.dtypes, "__iter__"):
enable_categorical = any(is_categorical for dtype in df.dtypes)
enable_categorical = any(is_categorical_dtype(dtype) for dtype in df.dtypes)
else:
# series
enable_categorical = is_categorical(df.dtype)
enable_categorical = is_categorical_dtype(df.dtype)
f0_orig = orig[orig.columns[0]] if isinstance(orig, pd.DataFrame) else orig
f0 = df[df.columns[0]] if isinstance(df, pd.DataFrame) else df
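
For context on the is_categorical to is_categorical_dtype switch: pandas 2.0 removed is_categorical, and the replacement is applied per column dtype. A minimal sketch of the check as the test intends it:

import pandas as pd
from pandas.api.types import is_categorical_dtype

df = pd.DataFrame({"a": pd.Categorical(["x", "y"]), "b": [1, 2]})
enable_categorical = any(is_categorical_dtype(dtype) for dtype in df.dtypes)
print(enable_categorical)  # True: at least one column is categorical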

View File

@@ -13,9 +13,9 @@ except Exception:
pytestmark = pytest.mark.skipif(shap is None, reason="Requires shap package")
# Check that the integration is not broken from the xgboost side
# Changes in binary format may cause problems
def test_with_shap():
# xgboost removed ntree_limit in 2.0, which breaks the SHAP package.
@pytest.mark.xfail
def test_with_shap() -> None:
from sklearn.datasets import fetch_california_housing
X, y = fetch_california_housing(return_X_y=True)

View File

@@ -63,9 +63,15 @@ def test_multiclass_classification(objective):
assert xgb_model.get_booster().num_boosted_rounds() == 100
preds = xgb_model.predict(X[test_index])
# test other params in XGBClassifier().fit
preds2 = xgb_model.predict(X[test_index], output_margin=True, ntree_limit=3)
preds3 = xgb_model.predict(X[test_index], output_margin=True, ntree_limit=0)
preds4 = xgb_model.predict(X[test_index], output_margin=False, ntree_limit=3)
preds2 = xgb_model.predict(
X[test_index], output_margin=True, iteration_range=(0, 1)
)
preds3 = xgb_model.predict(
X[test_index], output_margin=True, iteration_range=None
)
preds4 = xgb_model.predict(
X[test_index], output_margin=False, iteration_range=(0, 1)
)
labels = y[test_index]
check_pred(preds, labels, output_margin=False)
@@ -86,25 +92,21 @@ def test_multiclass_classification(objective):
assert proba.shape[1] == cls.n_classes_
def test_best_ntree_limit():
def test_best_iteration():
from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)
def train(booster, forest):
def train(booster: str, forest: Optional[int]) -> None:
rounds = 4
cls = xgb.XGBClassifier(
n_estimators=rounds, num_parallel_tree=forest, booster=booster
).fit(
X, y, eval_set=[(X, y)], early_stopping_rounds=3
)
assert cls.best_iteration == rounds - 1
if forest:
assert cls.best_ntree_limit == rounds * forest
else:
assert cls.best_ntree_limit == 0
# best_ntree_limit is used by default, assert that under gblinear it's
# best_iteration is used by default, assert that under gblinear it's
# automatically ignored due to being 0.
cls.predict(X)
@@ -430,12 +432,15 @@ def test_regression():
preds = xgb_model.predict(X[test_index])
# test other params in XGBRegressor().fit
preds2 = xgb_model.predict(X[test_index], output_margin=True,
ntree_limit=3)
preds3 = xgb_model.predict(X[test_index], output_margin=True,
ntree_limit=0)
preds4 = xgb_model.predict(X[test_index], output_margin=False,
ntree_limit=3)
preds2 = xgb_model.predict(
X[test_index], output_margin=True, iteration_range=(0, 3)
)
preds3 = xgb_model.predict(
X[test_index], output_margin=True, iteration_range=None
)
preds4 = xgb_model.predict(
X[test_index], output_margin=False, iteration_range=(0, 3)
)
labels = y[test_index]
assert mean_squared_error(preds, labels) < 25

View File

@@ -169,7 +169,7 @@ def reg_with_weight(
)
RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test"))
RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test", "reg_params"))
@pytest.fixture
@@ -181,6 +181,13 @@ def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
predt0 = reg1.predict(X)
pred_contrib0: np.ndarray = pred_contribs(reg1, X, None, False)
reg_params = {
"max_depth": 5,
"n_estimators": 10,
"iteration_range": [0, 5],
"max_bin": 9,
}
# convert np array to pyspark dataframe
reg_df_train_data = [
(Vectors.dense(X[0, :]), int(y[0])),
@@ -188,26 +195,34 @@ def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
]
reg_df_train = spark.createDataFrame(reg_df_train_data, ["features", "label"])
reg2 = xgb.XGBRegressor(max_depth=5, n_estimators=10)
reg2.fit(X, y)
predt2 = reg2.predict(X, iteration_range=[0, 5])
# array([0.22185266, 0.77814734], dtype=float32)
reg_df_test = spark.createDataFrame(
[
(
Vectors.dense(X[0, :]),
float(predt0[0]),
pred_contrib0[0, :].tolist(),
float(predt2[0]),
),
(
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
float(predt0[1]),
pred_contrib0[1, :].tolist(),
float(predt2[1]),
),
],
[
"features",
"expected_prediction",
"expected_pred_contribs",
"expected_prediction_with_params",
],
)
yield RegData(reg_df_train, reg_df_test)
yield RegData(reg_df_train, reg_df_test, reg_params)
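
The fixture feeds reg_params straight into the estimator, so predict-time settings such as iteration_range ride along with the training parameters. A minimal standalone sketch of that flow (local Spark session and toy data assumed):

from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBRegressor

spark = SparkSession.builder.master("local[2]").getOrCreate()
train = spark.createDataFrame(
    [(Vectors.dense(1.0, 2.0, 3.0), 0), (Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1)],
    ["features", "label"],
)
# iteration_range is stored on the estimator and applied at transform time.
model = SparkXGBRegressor(max_depth=5, n_estimators=10, iteration_range=(0, 5)).fit(train)
model.transform(train).show()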
MultiClfData = namedtuple("MultiClfData", ("multi_clf_df_train", "multi_clf_df_test"))
@@ -740,6 +755,76 @@ class TestPySparkLocal:
model = classifier.fit(clf_data.cls_df_train)
model.transform(clf_data.cls_df_test).collect()
def test_regressor_model_save_load(self, reg_data: RegData) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
path = "file:" + tmpdir
regressor = SparkXGBRegressor(**reg_data.reg_params)
model = regressor.fit(reg_data.reg_df_train)
model.save(path)
loaded_model = SparkXGBRegressorModel.load(path)
assert model.uid == loaded_model.uid
for k, v in reg_data.reg_params.items():
assert loaded_model.getOrDefault(k) == v
pred_result = loaded_model.transform(reg_data.reg_df_test).collect()
for row in pred_result:
assert np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
with pytest.raises(AssertionError, match="Expected class name"):
SparkXGBClassifierModel.load(path)
assert_model_compatible(model, tmpdir)
def test_regressor_with_params(self, reg_data: RegData) -> None:
regressor = SparkXGBRegressor(**reg_data.reg_params)
all_params = dict(
**(regressor._gen_xgb_params_dict()),
**(regressor._gen_fit_params_dict()),
**(regressor._gen_predict_params_dict()),
)
check_sub_dict_match(
reg_data.reg_params, all_params, excluding_keys=_non_booster_params
)
model = regressor.fit(reg_data.reg_df_train)
all_params = dict(
**(model._gen_xgb_params_dict()),
**(model._gen_fit_params_dict()),
**(model._gen_predict_params_dict()),
)
check_sub_dict_match(
reg_data.reg_params, all_params, excluding_keys=_non_booster_params
)
pred_result = model.transform(reg_data.reg_df_test).collect()
for row in pred_result:
assert np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
def test_regressor_model_pipeline_save_load(self, reg_data: RegData) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
path = "file:" + tmpdir
regressor = SparkXGBRegressor()
pipeline = Pipeline(stages=[regressor])
pipeline = pipeline.copy(
extra=get_params_map(reg_data.reg_params, regressor)
)
model = pipeline.fit(reg_data.reg_df_train)
model.save(path)
loaded_model = PipelineModel.load(path)
for k, v in reg_data.reg_params.items():
assert loaded_model.stages[0].getOrDefault(k) == v
pred_result = loaded_model.transform(reg_data.reg_df_test).collect()
for row in pred_result:
assert np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
assert_model_compatible(model.stages[0], tmpdir)
class XgboostLocalTest(SparkTestCase):
def setUp(self):
@@ -918,12 +1003,6 @@ class XgboostLocalTest(SparkTestCase):
def get_local_tmp_dir(self):
return self.tempdir + str(uuid.uuid4())
def assert_model_compatible(self, model: XGBModel, model_path: str):
bst = xgb.Booster()
path = glob.glob(f"{model_path}/**/model/part-00000", recursive=True)[0]
bst.load_model(path)
self.assertEqual(model.get_booster().save_raw("json"), bst.save_raw("json"))
def test_convert_to_sklearn_model_reg(self) -> None:
regressor = SparkXGBRegressor(
n_estimators=200, missing=2.0, max_depth=3, sketch_eps=0.5
@@ -1007,80 +1086,6 @@ class XgboostLocalTest(SparkTestCase):
== "float64"
)
def test_regressor_with_params(self):
regressor = SparkXGBRegressor(**self.reg_params)
all_params = dict(
**(regressor._gen_xgb_params_dict()),
**(regressor._gen_fit_params_dict()),
**(regressor._gen_predict_params_dict()),
)
check_sub_dict_match(
self.reg_params, all_params, excluding_keys=_non_booster_params
)
model = regressor.fit(self.reg_df_train)
all_params = dict(
**(model._gen_xgb_params_dict()),
**(model._gen_fit_params_dict()),
**(model._gen_predict_params_dict()),
)
check_sub_dict_match(
self.reg_params, all_params, excluding_keys=_non_booster_params
)
pred_result = model.transform(self.reg_df_test).collect()
for row in pred_result:
self.assertTrue(
np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
)
def test_regressor_model_save_load(self):
tmp_dir = self.get_local_tmp_dir()
path = "file:" + tmp_dir
regressor = SparkXGBRegressor(**self.reg_params)
model = regressor.fit(self.reg_df_train)
model.save(path)
loaded_model = SparkXGBRegressorModel.load(path)
self.assertEqual(model.uid, loaded_model.uid)
for k, v in self.reg_params.items():
self.assertEqual(loaded_model.getOrDefault(k), v)
pred_result = loaded_model.transform(self.reg_df_test).collect()
for row in pred_result:
self.assertTrue(
np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
)
with self.assertRaisesRegex(AssertionError, "Expected class name"):
SparkXGBClassifierModel.load(path)
self.assert_model_compatible(model, tmp_dir)
def test_regressor_model_pipeline_save_load(self):
tmp_dir = self.get_local_tmp_dir()
path = "file:" + tmp_dir
regressor = SparkXGBRegressor()
pipeline = Pipeline(stages=[regressor])
pipeline = pipeline.copy(extra=get_params_map(self.reg_params, regressor))
model = pipeline.fit(self.reg_df_train)
model.save(path)
loaded_model = PipelineModel.load(path)
for k, v in self.reg_params.items():
self.assertEqual(loaded_model.stages[0].getOrDefault(k), v)
pred_result = loaded_model.transform(self.reg_df_test).collect()
for row in pred_result:
self.assertTrue(
np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
)
self.assert_model_compatible(model.stages[0], tmp_dir)
def test_callbacks(self):
from xgboost.callback import LearningRateScheduler

View File

@@ -1,16 +1,24 @@
import json
import logging
import os
import random
import tempfile
import uuid
from collections import namedtuple
import numpy as np
import pytest
import xgboost as xgb
from xgboost import testing as tm
from xgboost.callback import LearningRateScheduler
pytestmark = pytest.mark.skipif(**tm.no_spark())
from typing import Generator
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from xgboost.spark import SparkXGBClassifier, SparkXGBRegressor
from xgboost.spark.utils import _get_max_num_concurrent_tasks
@@ -18,51 +26,119 @@ from xgboost.spark.utils import _get_max_num_concurrent_tasks
from .utils import SparkLocalClusterTestCase
@pytest.fixture
def spark() -> Generator[SparkSession, None, None]:
config = {
"spark.master": "local-cluster[2, 2, 1024]",
"spark.python.worker.reuse": "false",
"spark.driver.host": "127.0.0.1",
"spark.task.maxFailures": "1",
"spark.sql.execution.pyspark.udf.simplifiedTraceback.enabled": "false",
"spark.sql.pyspark.jvmStacktrace.enabled": "true",
"spark.cores.max": "4",
"spark.task.cpus": "1",
"spark.executor.cores": "2",
}
builder = SparkSession.builder.appName("XGBoost PySpark Python API Tests")
for k, v in config.items():
builder.config(k, v)
logging.getLogger("pyspark").setLevel(logging.INFO)
sess = builder.getOrCreate()
yield sess
sess.stop()
sess.sparkContext.stop()
RegData = namedtuple("RegData", ("reg_df_train", "reg_df_test", "reg_params"))
@pytest.fixture
def reg_data(spark: SparkSession) -> Generator[RegData, None, None]:
reg_params = {"max_depth": 5, "n_estimators": 10, "iteration_range": (0, 5)}
X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
y = np.array([0, 1])
def custom_lr(boosting_round):
return 1.0 / (boosting_round + 1)
reg1 = xgb.XGBRegressor(callbacks=[LearningRateScheduler(custom_lr)])
reg1.fit(X, y)
predt1 = reg1.predict(X)
# array([0.02406833, 0.97593164], dtype=float32)
reg2 = xgb.XGBRegressor(max_depth=5, n_estimators=10)
reg2.fit(X, y)
predt2 = reg2.predict(X, iteration_range=(0, 5))
# array([0.22185263, 0.77814734], dtype=float32)
reg_df_train = spark.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1),
],
["features", "label"],
)
reg_df_test = spark.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0.0, float(predt2[0]), float(predt1[0])),
(
Vectors.sparse(3, {1: 1.0, 2: 5.5}),
1.0,
float(predt2[1]),
float(predt1[1]),
),
],
[
"features",
"expected_prediction",
"expected_prediction_with_params",
"expected_prediction_with_callbacks",
],
)
yield RegData(reg_df_train, reg_df_test, reg_params)
class TestPySparkLocalCluster:
def test_regressor_basic_with_params(self, reg_data: RegData) -> None:
regressor = SparkXGBRegressor(**reg_data.reg_params)
model = regressor.fit(reg_data.reg_df_train)
pred_result = model.transform(reg_data.reg_df_test).collect()
for row in pred_result:
assert np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
def test_callbacks(self, reg_data: RegData) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
path = os.path.join(tmpdir, str(uuid.uuid4()))
def custom_lr(boosting_round):
return 1.0 / (boosting_round + 1)
cb = [LearningRateScheduler(custom_lr)]
regressor = SparkXGBRegressor(callbacks=cb)
# Test the save/load of the estimator instead of the model, since
# the callbacks param only exists in the estimator but not in the model
regressor.save(path)
regressor = SparkXGBRegressor.load(path)
model = regressor.fit(reg_data.reg_df_train)
pred_result = model.transform(reg_data.reg_df_test).collect()
for row in pred_result:
assert np.isclose(
row.prediction, row.expected_prediction_with_callbacks, atol=1e-3
)
class XgboostLocalClusterTestCase(SparkLocalClusterTestCase):
def setUp(self):
random.seed(2020)
self.n_workers = _get_max_num_concurrent_tasks(self.session.sparkContext)
# The following code uses the xgboost python library to train an xgb model and predict.
#
# >>> import numpy as np
# >>> import xgboost
# >>> X = np.array([[1.0, 2.0, 3.0], [0.0, 1.0, 5.5]])
# >>> y = np.array([0, 1])
# >>> reg1 = xgboost.XGBRegressor()
# >>> reg1.fit(X, y)
# >>> reg1.predict(X)
# array([8.8363886e-04, 9.9911636e-01], dtype=float32)
# >>> def custom_lr(boosting_round, num_boost_round):
# ... return 1.0 / (boosting_round + 1)
# ...
# >>> reg1.fit(X, y, callbacks=[xgboost.callback.reset_learning_rate(custom_lr)])
# >>> reg1.predict(X)
# array([0.02406833, 0.97593164], dtype=float32)
# >>> reg2 = xgboost.XGBRegressor(max_depth=5, n_estimators=10)
# >>> reg2.fit(X, y)
# >>> reg2.predict(X, ntree_limit=5)
# array([0.22185263, 0.77814734], dtype=float32)
self.reg_params = {"max_depth": 5, "n_estimators": 10, "ntree_limit": 5}
self.reg_df_train = self.session.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1),
],
["features", "label"],
)
self.reg_df_test = self.session.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0.0, 0.2219, 0.02406),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 1.0, 0.7781, 0.9759),
],
[
"features",
"expected_prediction",
"expected_prediction_with_params",
"expected_prediction_with_callbacks",
],
)
# Distributed section
# Binary classification
@@ -218,42 +294,6 @@ class XgboostLocalClusterTestCase(SparkLocalClusterTestCase):
self.reg_best_score_eval = 5.239e-05
self.reg_best_score_weight_and_eval = 4.850e-05
def test_regressor_basic_with_params(self):
regressor = SparkXGBRegressor(**self.reg_params)
model = regressor.fit(self.reg_df_train)
pred_result = model.transform(self.reg_df_test).collect()
for row in pred_result:
self.assertTrue(
np.isclose(
row.prediction, row.expected_prediction_with_params, atol=1e-3
)
)
def test_callbacks(self):
from xgboost.callback import LearningRateScheduler
path = os.path.join(self.tempdir, str(uuid.uuid4()))
def custom_learning_rate(boosting_round):
return 1.0 / (boosting_round + 1)
cb = [LearningRateScheduler(custom_learning_rate)]
regressor = SparkXGBRegressor(callbacks=cb)
# Test the save/load of the estimator instead of the model, since
# the callbacks param only exists in the estimator but not in the model
regressor.save(path)
regressor = SparkXGBRegressor.load(path)
model = regressor.fit(self.reg_df_train)
pred_result = model.transform(self.reg_df_test).collect()
for row in pred_result:
self.assertTrue(
np.isclose(
row.prediction, row.expected_prediction_with_callbacks, atol=1e-3
)
)
def test_classifier_distributed_basic(self):
classifier = SparkXGBClassifier(num_workers=self.n_workers, n_estimators=100)
model = classifier.fit(self.cls_df_train_distributed)
@@ -409,7 +449,6 @@ class XgboostLocalClusterTestCase(SparkLocalClusterTestCase):
pred_result = model.transform(
self.cls_df_test_distributed_lower_estimators
).collect()
print(pred_result)
for row in pred_result:
self.assertTrue(np.isclose(row.expected_label, row.prediction, atol=1e-3))
self.assertTrue(