[BLOCKING] fix the issue with infrequent feature (#4045)

* fix the issue with infrequent feature

* handle exception

* use only 2 workers

* address the comments
Author: Nan Zhu (2019-01-06 16:01:03 -08:00), committed by GitHub
parent e290ec9a80
commit 773ddbcfcb
3 changed files with 64 additions and 2 deletions


@@ -9,6 +9,7 @@
#include <dmlc/base.h>
#include <dmlc/data.h>
#include <rabit/rabit.h>
#include <cstring>
#include <memory>
#include <numeric>
@@ -169,8 +170,16 @@ class SparsePage {
inline Inst operator[](size_t i) const {
const auto& data_vec = data.HostVector();
const auto& offset_vec = offset.HostVector();
size_t size;
// In distributed mode, some partitions may not get any instance for a feature, so the
// size of that feature's slice should be treated as zero.
if (rabit::IsDistributed() && i + 1 >= offset_vec.size()) {
size = 0;
} else {
size = offset_vec[i + 1] - offset_vec[i];
}
return {data_vec.data() + offset_vec[i],
static_cast<Inst::index_type>(size)};
}
/*! \brief constructor */
@@ -285,7 +294,6 @@ class SparsePage {
auto& data_vec = data.HostVector();
auto& offset_vec = offset.HostVector();
offset_vec.push_back(offset_vec.back() + inst.size());
size_t begin = data_vec.size();
data_vec.resize(begin + inst.size());
if (inst.size() != 0) {
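A minimal standalone sketch of why the new guard matters (SliceSize and the sample offsets below are made up for illustration; this is not the actual SparsePage API): a column-major page that only ever received entries for columns 0 and 1 has three offsets, so looking up column 2 would read one past the end of the offset array, and the patched logic returns an empty slice instead.

#include <cstddef>
#include <iostream>
#include <vector>

// Hypothetical helper mirroring the patched operator[] logic: the slice for
// column i is empty when the local page has no offset entry for column i + 1.
std::size_t SliceSize(const std::vector<std::size_t>& offsets, std::size_t i) {
  if (i + 1 >= offsets.size()) {
    return 0;  // this partition holds no entries for column i
  }
  return offsets[i + 1] - offsets[i];
}

int main() {
  // Offsets of a local page that only ever received entries for columns 0 and 1.
  std::vector<std::size_t> offsets = {0, 3, 5};
  std::cout << SliceSize(offsets, 1) << "\n";  // 2 entries for column 1
  std::cout << SliceSize(offsets, 2) << "\n";  // 0: column 2 never appeared on this partition
  return 0;
}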


@@ -98,3 +98,12 @@ object Ranking extends TrainTestData {
getResourceLines(resource).map(_.toInt).toList
}
}
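// Synthetic data for the infrequent-feature tests: label-1 rows carry only feature
// indices 0 and 1, while label-0 rows also carry feature index 2, so partitioning
// the rows by label leaves one partition with no occurrence of feature 2.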
object Synthetic extends {
val train: Seq[XGBLabeledPoint] = Seq(
XGBLabeledPoint(1.0f, Array(0, 1), Array(1.0f, 2.0f)),
XGBLabeledPoint(0.0f, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)),
XGBLabeledPoint(0.0f, Array(0, 1, 2), Array(1.0f, 2.0f, 3.0f)),
XGBLabeledPoint(1.0f, Array(0, 1), Array(1.0f, 2.0f))
)
}


@@ -17,11 +17,14 @@
package ml.dmlc.xgboost4j.scala.spark
import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost => ScalaXGBoost}
import org.apache.spark.ml.linalg._
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql._
import org.scalatest.FunSuite
import org.apache.spark.Partitioner
class XGBoostClassifierSuite extends FunSuite with PerTest {
test("XGBoost-Spark XGBoostClassifier ouput should match XGBoost4j") {
@@ -263,4 +266,46 @@ class XGBoostClassifierSuite extends FunSuite with PerTest {
assert(resultDF.columns.contains("predictLeaf"))
assert(resultDF.columns.contains("predictContrib"))
}
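// The custom Partitioner routes rows by label: label-0 rows (which include feature
// index 2) land in partition 0 and label-1 rows (features 0 and 1 only) in partition 1,
// so the worker training on partition 1 never sees feature 2. Previously this made
// SparsePage index past the end of its offset array on that worker.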
test("infrequent features") {
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic",
"num_round" -> 5, "num_workers" -> 2)
import DataUtils._
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val repartitioned = sc.parallelize(Synthetic.train, 3).map(lp => (lp.label, lp)).partitionBy(
new Partitioner {
override def numPartitions: Int = 2
override def getPartition(key: Any): Int = key.asInstanceOf[Float].toInt
}
).map(_._2).zipWithIndex().map {
case (lp, id) =>
(id, lp.label, lp.features)
}.toDF("id", "label", "features")
val xgb = new XGBoostClassifier(paramMap)
xgb.fit(repartitioned)
}
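// Same scenario as above, with the external-memory code path enabled.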
test("infrequent features (use_external_memory)") {
val paramMap = Map("eta" -> "1", "max_depth" -> "6", "silent" -> "1",
"objective" -> "binary:logistic",
"num_round" -> 5, "num_workers" -> 2, "use_external_memory" -> true)
import DataUtils._
val sparkSession = SparkSession.builder().getOrCreate()
import sparkSession.implicits._
val repartitioned = sc.parallelize(Synthetic.train, 3).map(lp => (lp.label, lp)).partitionBy(
new Partitioner {
override def numPartitions: Int = 2
override def getPartition(key: Any): Int = key.asInstanceOf[Float].toInt
}
).map(_._2).zipWithIndex().map {
case (lp, id) =>
(id, lp.label, lp.features)
}.toDF("id", "label", "features")
val xgb = new XGBoostClassifier(paramMap)
xgb.fit(repartitioned)
}
}