diff --git a/include/xgboost/data.h b/include/xgboost/data.h index 32ca6f314..e2d800ca4 100644 --- a/include/xgboost/data.h +++ b/include/xgboost/data.h @@ -9,6 +9,7 @@ #include #include +#include #include #include #include @@ -169,8 +170,16 @@ class SparsePage { inline Inst operator[](size_t i) const { const auto& data_vec = data.HostVector(); const auto& offset_vec = offset.HostVector(); + size_t size; + // in distributed mode, some partitions may not get any instance for a feature. Therefore + // we should set the size as zero + if (rabit::IsDistributed() && i + 1 >= offset_vec.size()) { + size = 0; + } else { + size = offset_vec[i + 1] - offset_vec[i]; + } return {data_vec.data() + offset_vec[i], - static_cast(offset_vec[i + 1] - offset_vec[i])}; + static_cast(size)}; } /*! \brief constructor */ @@ -285,7 +294,6 @@ class SparsePage { auto& data_vec = data.HostVector(); auto& offset_vec = offset.HostVector(); offset_vec.push_back(offset_vec.back() + inst.size()); - size_t begin = data_vec.size(); data_vec.resize(begin + inst.size()); if (inst.size() != 0) { diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala index 2a93170a6..1a64e2d03 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/TrainTestData.scala @@ -98,3 +98,12 @@ object Ranking extends TrainTestData { getResourceLines(resource).map(_.toInt).toList } } + +object Synthetic extends { + val train: Seq[XGBLabeledPoint] = Seq( + XGBLabeledPoint(1.0f, Array(0, 1), Array(1.0f, 2.0f)), + XGBLabeledPoint(0.0f, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)), + XGBLabeledPoint(0.0f, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)), + XGBLabeledPoint(1.0f, Array(0, 1), Array(1.0f, 2.0f)) + ) +} diff --git a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala index b1abc2db2..4e0a2a016 100644 --- a/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala +++ b/jvm-packages/xgboost4j-spark/src/test/scala/ml/dmlc/xgboost4j/scala/spark/XGBoostClassifierSuite.scala @@ -17,11 +17,14 @@ package ml.dmlc.xgboost4j.scala.spark import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost} + import org.apache.spark.ml.linalg._ import org.apache.spark.ml.param.ParamMap import org.apache.spark.sql._ import org.scalatest.FunSuite +import org.apache.spark.Partitioner + class XGBoostClassifierSuite extends FunSuite with PerTest { test("XGBoost-Spark XGBoostClassifier ouput should match XGBoost4j") { @@ -263,4 +266,46 @@ class XGBoostClassifierSuite extends FunSuite with PerTest { assert(resultDF.columns.contains("predictLeaf")) assert(resultDF.columns.contains("predictContrib")) } + + test("infrequent features") { + val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1", + "objective" -> "binary:logistic", + "num_round" -> 5, "num_workers" -> 2) + import DataUtils._ + val sparkSession = SparkSession.builder().getOrCreate() + import sparkSession.implicits._ + val repartitioned = sc.parallelize(Synthetic.train, 3).map(lp => (lp.label, lp)).partitionBy( + new Partitioner { + override def numPartitions: Int = 2 + + override def getPartition(key: Any): Int = key.asInstanceOf[Float].toInt + } + ).map(_._2).zipWithIndex().map { + case (lp, id) => + (id, lp.label, lp.features) + }.toDF("id", "label", "features") + val xgb = new XGBoostClassifier(paramMap) + xgb.fit(repartitioned) + } + + test("infrequent features (use_external_memory)") { + val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1", + "objective" -> "binary:logistic", + "num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true) + import DataUtils._ + val sparkSession = SparkSession.builder().getOrCreate() + import sparkSession.implicits._ + val repartitioned = sc.parallelize(Synthetic.train, 3).map(lp => (lp.label, lp)).partitionBy( + new Partitioner { + override def numPartitions: Int = 2 + + override def getPartition(key: Any): Int = key.asInstanceOf[Float].toInt + } + ).map(_._2).zipWithIndex().map { + case (lp, id) => + (id, lp.label, lp.features) + }.toDF("id", "label", "features") + val xgb = new XGBoostClassifier(paramMap) + xgb.fit(repartitioned) + } }