Rework MAP and Pairwise for LTR. (#9075)

This commit is contained in:
Jiaming Yuan
2023-04-28 02:39:12 +08:00
committed by GitHub
parent 0e470ef606
commit e206b899ef
19 changed files with 612 additions and 1135 deletions

View File

@@ -1343,61 +1343,94 @@ class XgboostLocalTest(SparkTestCase):
SparkXGBClassifier(evals_result={})
class XgboostRankerLocalTest(SparkTestCase):
def setUp(self):
self.session.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "8")
self.ranker_df_train = self.session.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0, 0),
(Vectors.dense(4.0, 5.0, 6.0), 1, 0),
(Vectors.dense(9.0, 4.0, 8.0), 2, 0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 1),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 1),
(Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 1),
],
["features", "label", "qid"],
)
self.ranker_df_test = self.session.createDataFrame(
[
(Vectors.dense(1.5, 2.0, 3.0), 0, -1.87988),
(Vectors.dense(4.5, 5.0, 6.0), 0, 0.29556),
(Vectors.dense(9.0, 4.5, 8.0), 0, 2.36570),
(Vectors.sparse(3, {1: 1.0, 2: 6.0}), 1, -1.87988),
(Vectors.sparse(3, {1: 6.0, 2: 7.0}), 1, -0.30612),
(Vectors.sparse(3, {1: 8.0, 2: 10.5}), 1, 2.44826),
],
["features", "qid", "expected_prediction"],
)
self.ranker_df_train_1 = self.session.createDataFrame(
[
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 9),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 9),
(Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 9),
(Vectors.dense(1.0, 2.0, 3.0), 0, 8),
(Vectors.dense(4.0, 5.0, 6.0), 1, 8),
(Vectors.dense(9.0, 4.0, 8.0), 2, 8),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 7),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 7),
(Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 7),
(Vectors.dense(1.0, 2.0, 3.0), 0, 6),
(Vectors.dense(4.0, 5.0, 6.0), 1, 6),
(Vectors.dense(9.0, 4.0, 8.0), 2, 6),
]
* 4,
["features", "label", "qid"],
)
LTRData = namedtuple("LTRData", ("df_train", "df_test", "df_train_1"))
def test_ranker(self):
ranker = SparkXGBRanker(qid_col="qid")
@pytest.fixture
def ltr_data(spark: SparkSession) -> Generator[LTRData, None, None]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "8")
ranker_df_train = spark.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0, 0),
(Vectors.dense(4.0, 5.0, 6.0), 1, 0),
(Vectors.dense(9.0, 4.0, 8.0), 2, 0),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 1),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 1),
(Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 1),
],
["features", "label", "qid"],
)
X_train = np.array(
[
[1.0, 2.0, 3.0],
[4.0, 5.0, 6.0],
[9.0, 4.0, 8.0],
[np.NaN, 1.0, 5.5],
[np.NaN, 6.0, 7.5],
[np.NaN, 8.0, 9.5],
]
)
qid_train = np.array([0, 0, 0, 1, 1, 1])
y_train = np.array([0, 1, 2, 0, 1, 2])
X_test = np.array(
[
[1.5, 2.0, 3.0],
[4.5, 5.0, 6.0],
[9.0, 4.5, 8.0],
[np.NaN, 1.0, 6.0],
[np.NaN, 6.0, 7.0],
[np.NaN, 8.0, 10.5],
]
)
ltr = xgb.XGBRanker(tree_method="approx", objective="rank:pairwise")
ltr.fit(X_train, y_train, qid=qid_train)
predt = ltr.predict(X_test)
ranker_df_test = spark.createDataFrame(
[
(Vectors.dense(1.5, 2.0, 3.0), 0, float(predt[0])),
(Vectors.dense(4.5, 5.0, 6.0), 0, float(predt[1])),
(Vectors.dense(9.0, 4.5, 8.0), 0, float(predt[2])),
(Vectors.sparse(3, {1: 1.0, 2: 6.0}), 1, float(predt[3])),
(Vectors.sparse(3, {1: 6.0, 2: 7.0}), 1, float(predt[4])),
(Vectors.sparse(3, {1: 8.0, 2: 10.5}), 1, float(predt[5])),
],
["features", "qid", "expected_prediction"],
)
ranker_df_train_1 = spark.createDataFrame(
[
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 9),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 9),
(Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 9),
(Vectors.dense(1.0, 2.0, 3.0), 0, 8),
(Vectors.dense(4.0, 5.0, 6.0), 1, 8),
(Vectors.dense(9.0, 4.0, 8.0), 2, 8),
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 7),
(Vectors.sparse(3, {1: 6.0, 2: 7.5}), 1, 7),
(Vectors.sparse(3, {1: 8.0, 2: 9.5}), 2, 7),
(Vectors.dense(1.0, 2.0, 3.0), 0, 6),
(Vectors.dense(4.0, 5.0, 6.0), 1, 6),
(Vectors.dense(9.0, 4.0, 8.0), 2, 6),
]
* 4,
["features", "label", "qid"],
)
yield LTRData(ranker_df_train, ranker_df_test, ranker_df_train_1)
class TestPySparkLocalLETOR:
def test_ranker(self, ltr_data: LTRData) -> None:
ranker = SparkXGBRanker(qid_col="qid", objective="rank:pairwise")
assert ranker.getOrDefault(ranker.objective) == "rank:pairwise"
model = ranker.fit(self.ranker_df_train)
pred_result = model.transform(self.ranker_df_test).collect()
model = ranker.fit(ltr_data.df_train)
pred_result = model.transform(ltr_data.df_test).collect()
for row in pred_result:
assert np.isclose(row.prediction, row.expected_prediction, rtol=1e-3)
def test_ranker_qid_sorted(self):
ranker = SparkXGBRanker(qid_col="qid", num_workers=4)
assert ranker.getOrDefault(ranker.objective) == "rank:pairwise"
model = ranker.fit(self.ranker_df_train_1)
model.transform(self.ranker_df_test).collect()
def test_ranker_qid_sorted(self, ltr_data: LTRData) -> None:
ranker = SparkXGBRanker(qid_col="qid", num_workers=4, objective="rank:ndcg")
assert ranker.getOrDefault(ranker.objective) == "rank:ndcg"
model = ranker.fit(ltr_data.df_train_1)
model.transform(ltr_data.df_test).collect()