spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m...@apache.org
Subject spark git commit: [SPARK-7685] [ML] Apply weights to different samples in Logistic Regression
Date Tue, 15 Sep 2015 22:46:49 GMT
Repository: spark
Updated Branches:
  refs/heads/master 31a229aa7 -> be52faa7c


[SPARK-7685] [ML] Apply weights to different samples in Logistic Regression

In fraud detection dataset, almost all the samples are negative while only couple of them are positive. This type of high imbalanced data will bias the models toward negative resulting poor performance. In python-scikit, they provide a correction allowing users to Over-/undersample the samples of each class according to the given weights. In auto mode, selects weights inversely proportional to class frequencies in the training set. This can be done in a more efficient way by multiplying the weights into loss and gradient instead of doing actual over/undersampling in the training dataset which is very expensive.
http://scikit-learn.org/stable/modules/generated/sklearn.linear_model.LogisticRegression.html
On the other hand, some of the training data maybe more important like the training samples from tenure users while the training samples from new users maybe less important. We should be able to provide another "weight: Double" information in the LabeledPoint to weight them differently in the learning algorithm.

Author: DB Tsai <dbt@netflix.com>
Author: DB Tsai <dbt@dbs-mac-pro.corp.netflix.com>

Closes #7884 from dbtsai/SPARK-7685.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/be52faa7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/be52faa7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/be52faa7

Branch: refs/heads/master
Commit: be52faa7c72fb4b95829f09a7dc5eb5dccd03524
Parents: 31a229a
Author: DB Tsai <dbt@netflix.com>
Authored: Tue Sep 15 15:46:47 2015 -0700
Committer: Xiangrui Meng <meng@databricks.com>
Committed: Tue Sep 15 15:46:47 2015 -0700

----------------------------------------------------------------------
 .../ml/classification/LogisticRegression.scala  | 199 +++++++++++--------
 .../ml/param/shared/SharedParamsCodeGen.scala   |   6 +-
 .../spark/ml/param/shared/sharedParams.scala    |  12 +-
 .../stat/MultivariateOnlineSummarizer.scala     |  75 ++++---
 .../LogisticRegressionSuite.scala               | 102 +++++++++-
 .../MultivariateOnlineSummarizerSuite.scala     |  27 +++
 project/MimaExcludes.scala                      |  10 +-
 7 files changed, 303 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/be52faa7/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index a460262..bd96e8d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -29,12 +29,12 @@ import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util.Identifiable
 import org.apache.spark.mllib.linalg._
 import org.apache.spark.mllib.linalg.BLAS._
-import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
 import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.{col, lit}
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -42,7 +42,7 @@ import org.apache.spark.storage.StorageLevel
  */
 private[classification] trait LogisticRegressionParams extends ProbabilisticClassifierParams
   with HasRegParam with HasElasticNetParam with HasMaxIter with HasFitIntercept with HasTol
-  with HasStandardization with HasThreshold {
+  with HasStandardization with HasWeightCol with HasThreshold {
 
   /**
    * Set threshold in binary classification, in range [0, 1].
@@ -147,6 +147,17 @@ private[classification] trait LogisticRegressionParams extends ProbabilisticClas
 }
 
 /**
+ * Class that represents an instance of weighted data point with label and features.
+ *
+ * TODO: Refactor this class to proper place.
+ *
+ * @param label Label for this data point.
+ * @param weight The weight of this instance.
+ * @param features The vector of features for this data point.
+ */
+private[classification] case class Instance(label: Double, weight: Double, features: Vector)
+
+/**
  * :: Experimental ::
  * Logistic regression.
  * Currently, this class only supports binary classification.  It will support multiclass
@@ -218,31 +229,42 @@ class LogisticRegression(override val uid: String)
 
   override def getThreshold: Double = super.getThreshold
 
+  /**
+   * Whether to over-/under-sample training instances according to the given weights in weightCol.
+   * If empty, all instances are treated equally (weight 1.0).
+   * Default is empty, so all instances have weight one.
+   * @group setParam
+   */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
+  setDefault(weightCol -> "")
+
   override def setThresholds(value: Array[Double]): this.type = super.setThresholds(value)
 
   override def getThresholds: Array[Double] = super.getThresholds
 
   override protected def train(dataset: DataFrame): LogisticRegressionModel = {
     // Extract columns from data.  If dataset is persisted, do not persist oldDataset.
-    val instances = extractLabeledPoints(dataset).map {
-      case LabeledPoint(label: Double, features: Vector) => (label, features)
+    val w = if ($(weightCol).isEmpty) lit(1.0) else col($(weightCol))
+    val instances: RDD[Instance] = dataset.select(col($(labelCol)), w, col($(featuresCol))).map {
+      case Row(label: Double, weight: Double, features: Vector) =>
+        Instance(label, weight, features)
     }
+
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
-    val (summarizer, labelSummarizer) = instances.treeAggregate(
-      (new MultivariateOnlineSummarizer, new MultiClassSummarizer))(
-        seqOp = (c, v) => (c, v) match {
-          case ((summarizer: MultivariateOnlineSummarizer, labelSummarizer: MultiClassSummarizer),
-          (label: Double, features: Vector)) =>
-            (summarizer.add(features), labelSummarizer.add(label))
-        },
-        combOp = (c1, c2) => (c1, c2) match {
-          case ((summarizer1: MultivariateOnlineSummarizer,
-          classSummarizer1: MultiClassSummarizer), (summarizer2: MultivariateOnlineSummarizer,
-          classSummarizer2: MultiClassSummarizer)) =>
-            (summarizer1.merge(summarizer2), classSummarizer1.merge(classSummarizer2))
-      })
+    val (summarizer, labelSummarizer) = {
+      val seqOp = (c: (MultivariateOnlineSummarizer, MultiClassSummarizer),
+        instance: Instance) =>
+          (c._1.add(instance.features, instance.weight), c._2.add(instance.label, instance.weight))
+
+      val combOp = (c1: (MultivariateOnlineSummarizer, MultiClassSummarizer),
+        c2: (MultivariateOnlineSummarizer, MultiClassSummarizer)) =>
+          (c1._1.merge(c2._1), c1._2.merge(c2._2))
+
+      instances.treeAggregate(
+        new MultivariateOnlineSummarizer, new MultiClassSummarizer)(seqOp, combOp)
+    }
 
     val histogram = labelSummarizer.histogram
     val numInvalid = labelSummarizer.countInvalid
@@ -295,7 +317,7 @@ class LogisticRegression(override val uid: String)
       new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
     }
 
-    val initialWeightsWithIntercept =
+    val initialCoefficientsWithIntercept =
       Vectors.zeros(if ($(fitIntercept)) numFeatures + 1 else numFeatures)
 
     if ($(fitIntercept)) {
@@ -312,14 +334,14 @@ class LogisticRegression(override val uid: String)
          b = \log{P(1) / P(0)} = \log{count_1 / count_0}
          }}}
        */
-      initialWeightsWithIntercept.toArray(numFeatures)
-        = math.log(histogram(1).toDouble / histogram(0).toDouble)
+      initialCoefficientsWithIntercept.toArray(numFeatures)
+        = math.log(histogram(1) / histogram(0))
     }
 
     val states = optimizer.iterations(new CachedDiffFunction(costFun),
-      initialWeightsWithIntercept.toBreeze.toDenseVector)
+      initialCoefficientsWithIntercept.toBreeze.toDenseVector)
 
-    val (weights, intercept, objectiveHistory) = {
+    val (coefficients, intercept, objectiveHistory) = {
       /*
          Note that in Logistic Regression, the objective history (loss + regularization)
          is log-likelihood which is invariance under feature standardization. As a result,
@@ -339,28 +361,29 @@ class LogisticRegression(override val uid: String)
       }
 
       /*
-         The weights are trained in the scaled space; we're converting them back to
+         The coefficients are trained in the scaled space; we're converting them back to
          the original space.
          Note that the intercept in scaled space and original space is the same;
          as a result, no scaling is needed.
        */
-      val rawWeights = state.x.toArray.clone()
+      val rawCoefficients = state.x.toArray.clone()
       var i = 0
       while (i < numFeatures) {
-        rawWeights(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
+        rawCoefficients(i) *= { if (featuresStd(i) != 0.0) 1.0 / featuresStd(i) else 0.0 }
         i += 1
       }
 
       if ($(fitIntercept)) {
-        (Vectors.dense(rawWeights.dropRight(1)).compressed, rawWeights.last, arrayBuilder.result())
+        (Vectors.dense(rawCoefficients.dropRight(1)).compressed, rawCoefficients.last,
+          arrayBuilder.result())
       } else {
-        (Vectors.dense(rawWeights).compressed, 0.0, arrayBuilder.result())
+        (Vectors.dense(rawCoefficients).compressed, 0.0, arrayBuilder.result())
       }
     }
 
     if (handlePersistence) instances.unpersist()
 
-    val model = copyValues(new LogisticRegressionModel(uid, weights, intercept))
+    val model = copyValues(new LogisticRegressionModel(uid, coefficients, intercept))
     val logRegSummary = new BinaryLogisticRegressionTrainingSummary(
       model.transform(dataset),
       $(probabilityCol),
@@ -501,22 +524,29 @@ class LogisticRegressionModel private[ml] (
  * corresponding joint dataset.
  */
 private[classification] class MultiClassSummarizer extends Serializable {
-  private val distinctMap = new mutable.HashMap[Int, Long]
+  // The first element of value in distinctMap is the actually number of instances,
+  // and the second element of value is sum of the weights.
+  private val distinctMap = new mutable.HashMap[Int, (Long, Double)]
   private var totalInvalidCnt: Long = 0L
 
   /**
    * Add a new label into this MultilabelSummarizer, and update the distinct map.
    * @param label The label for this data point.
+   * @param weight The weight of this instances.
    * @return This MultilabelSummarizer
    */
-  def add(label: Double): this.type = {
+  def add(label: Double, weight: Double = 1.0): this.type = {
+    require(weight >= 0.0, s"instance weight, ${weight} has to be >= 0.0")
+
+    if (weight == 0.0) return this
+
     if (label - label.toInt != 0.0 || label < 0) {
       totalInvalidCnt += 1
       this
     }
     else {
-      val counts: Long = distinctMap.getOrElse(label.toInt, 0L)
-      distinctMap.put(label.toInt, counts + 1)
+      val (counts: Long, weightSum: Double) = distinctMap.getOrElse(label.toInt, (0L, 0.0))
+      distinctMap.put(label.toInt, (counts + 1L, weightSum + weight))
       this
     }
   }
@@ -537,8 +567,8 @@ private[classification] class MultiClassSummarizer extends Serializable {
     }
     smallMap.distinctMap.foreach {
       case (key, value) =>
-        val counts = largeMap.distinctMap.getOrElse(key, 0L)
-        largeMap.distinctMap.put(key, counts + value)
+        val (counts: Long, weightSum: Double) = largeMap.distinctMap.getOrElse(key, (0L, 0.0))
+        largeMap.distinctMap.put(key, (counts + value._1, weightSum + value._2))
     }
     largeMap.totalInvalidCnt += smallMap.totalInvalidCnt
     largeMap
@@ -550,13 +580,13 @@ private[classification] class MultiClassSummarizer extends Serializable {
   /** @return The number of distinct labels in the input dataset. */
   def numClasses: Int = distinctMap.keySet.max + 1
 
-  /** @return The counts of each label in the input dataset. */
-  def histogram: Array[Long] = {
-    val result = Array.ofDim[Long](numClasses)
+  /** @return The weightSum of each label in the input dataset. */
+  def histogram: Array[Double] = {
+    val result = Array.ofDim[Double](numClasses)
     var i = 0
     val len = result.length
     while (i < len) {
-      result(i) = distinctMap.getOrElse(i, 0L)
+      result(i) = distinctMap.getOrElse(i, (0L, 0.0))._2
       i += 1
     }
     result
@@ -565,6 +595,8 @@ private[classification] class MultiClassSummarizer extends Serializable {
 
 /**
  * Abstraction for multinomial Logistic Regression Training results.
+ * Currently, the training summary ignores the training weights except
+ * for the objective trace.
  */
 sealed trait LogisticRegressionTrainingSummary extends LogisticRegressionSummary {
 
@@ -584,10 +616,10 @@ sealed trait LogisticRegressionSummary extends Serializable {
   /** Dataframe outputted by the model's `transform` method. */
   def predictions: DataFrame
 
-  /** Field in "predictions" which gives the calibrated probability of each sample as a vector. */
+  /** Field in "predictions" which gives the calibrated probability of each instance as a vector. */
   def probabilityCol: String
 
-  /** Field in "predictions" which gives the the true label of each sample. */
+  /** Field in "predictions" which gives the the true label of each instance. */
   def labelCol: String
 
 }
@@ -597,8 +629,8 @@ sealed trait LogisticRegressionSummary extends Serializable {
  * Logistic regression training results.
  * @param predictions dataframe outputted by the model's `transform` method.
  * @param probabilityCol field in "predictions" which gives the calibrated probability of
- *                       each sample as a vector.
- * @param labelCol field in "predictions" which gives the true label of each sample.
+ *                       each instance as a vector.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
  * @param objectiveHistory objective function (scaled loss + regularization) at each iteration.
  */
 @Experimental
@@ -617,8 +649,8 @@ class BinaryLogisticRegressionTrainingSummary private[classification] (
  * Binary Logistic regression results for a given model.
  * @param predictions dataframe outputted by the model's `transform` method.
  * @param probabilityCol field in "predictions" which gives the calibrated probability of
- *                       each sample.
- * @param labelCol field in "predictions" which gives the true label of each sample.
+ *                       each instance.
+ * @param labelCol field in "predictions" which gives the true label of each instance.
  */
 @Experimental
 class BinaryLogisticRegressionSummary private[classification] (
@@ -687,14 +719,14 @@ class BinaryLogisticRegressionSummary private[classification] (
 
 /**
  * LogisticAggregator computes the gradient and loss for binary logistic loss function, as used
- * in binary classification for samples in sparse or dense vector in a online fashion.
+ * in binary classification for instances in sparse or dense vector in a online fashion.
  *
  * Note that multinomial logistic loss is not supported yet!
  *
  * Two LogisticAggregator can be merged together to have a summary of loss and gradient of
  * the corresponding joint dataset.
  *
- * @param weights The weights/coefficients corresponding to the features.
+ * @param coefficients The coefficients corresponding to the features.
  * @param numClasses the number of possible outcomes for k classes classification problem in
  *                   Multinomial Logistic Regression.
  * @param fitIntercept Whether to fit an intercept term.
@@ -702,25 +734,25 @@ class BinaryLogisticRegressionSummary private[classification] (
  * @param featuresMean The mean values of the features.
  */
 private class LogisticAggregator(
-    weights: Vector,
+    coefficients: Vector,
     numClasses: Int,
     fitIntercept: Boolean,
     featuresStd: Array[Double],
     featuresMean: Array[Double]) extends Serializable {
 
-  private var totalCnt: Long = 0L
+  private var weightSum = 0.0
   private var lossSum = 0.0
 
-  private val weightsArray = weights match {
+  private val coefficientsArray = coefficients match {
     case dv: DenseVector => dv.values
     case _ =>
       throw new IllegalArgumentException(
-        s"weights only supports dense vector but got type ${weights.getClass}.")
+        s"coefficients only supports dense vector but got type ${coefficients.getClass}.")
   }
 
-  private val dim = if (fitIntercept) weightsArray.length - 1 else weightsArray.length
+  private val dim = if (fitIntercept) coefficientsArray.length - 1 else coefficientsArray.length
 
-  private val gradientSumArray = Array.ofDim[Double](weightsArray.length)
+  private val gradientSumArray = Array.ofDim[Double](coefficientsArray.length)
 
   /**
    * Add a new training data to this LogisticAggregator, and update the loss and gradient
@@ -729,13 +761,17 @@ private class LogisticAggregator(
    * @param label The label for this data point.
    * @param data The features for one data point in dense/sparse vector format to be added
    *             into this aggregator.
+   * @param weight The weight for over-/undersamples each of training instance. Default is one.
    * @return This LogisticAggregator object.
    */
-  def add(label: Double, data: Vector): this.type = {
-    require(dim == data.size, s"Dimensions mismatch when adding new sample." +
+  def add(label: Double, data: Vector, weight: Double = 1.0): this.type = {
+    require(dim == data.size, s"Dimensions mismatch when adding new instance." +
       s" Expecting $dim but got ${data.size}.")
+    require(weight >= 0.0, s"instance weight, ${weight} has to be >= 0.0")
 
-    val localWeightsArray = weightsArray
+    if (weight == 0.0) return this
+
+    val localCoefficientsArray = coefficientsArray
     val localGradientSumArray = gradientSumArray
 
     numClasses match {
@@ -745,13 +781,13 @@ private class LogisticAggregator(
           var sum = 0.0
           data.foreachActive { (index, value) =>
             if (featuresStd(index) != 0.0 && value != 0.0) {
-              sum += localWeightsArray(index) * (value / featuresStd(index))
+              sum += localCoefficientsArray(index) * (value / featuresStd(index))
             }
           }
-          sum + { if (fitIntercept) localWeightsArray(dim) else 0.0 }
+          sum + { if (fitIntercept) localCoefficientsArray(dim) else 0.0 }
         }
 
-        val multiplier = (1.0 / (1.0 + math.exp(margin))) - label
+        val multiplier = weight * (1.0 / (1.0 + math.exp(margin)) - label)
 
         data.foreachActive { (index, value) =>
           if (featuresStd(index) != 0.0 && value != 0.0) {
@@ -765,15 +801,15 @@ private class LogisticAggregator(
 
         if (label > 0) {
           // The following is equivalent to log(1 + exp(margin)) but more numerically stable.
-          lossSum += MLUtils.log1pExp(margin)
+          lossSum += weight * MLUtils.log1pExp(margin)
         } else {
-          lossSum += MLUtils.log1pExp(margin) - margin
+          lossSum += weight * (MLUtils.log1pExp(margin) - margin)
         }
       case _ =>
         new NotImplementedError("LogisticRegression with ElasticNet in ML package only supports " +
           "binary classification for now.")
     }
-    totalCnt += 1
+    weightSum += weight
     this
   }
 
@@ -789,8 +825,8 @@ private class LogisticAggregator(
     require(dim == other.dim, s"Dimensions mismatch when merging with another " +
       s"LeastSquaresAggregator. Expecting $dim but got ${other.dim}.")
 
-    if (other.totalCnt != 0) {
-      totalCnt += other.totalCnt
+    if (other.weightSum != 0.0) {
+      weightSum += other.weightSum
       lossSum += other.lossSum
 
       var i = 0
@@ -805,13 +841,17 @@ private class LogisticAggregator(
     this
   }
 
-  def count: Long = totalCnt
-
-  def loss: Double = lossSum / totalCnt
+  def loss: Double = {
+    require(weightSum > 0.0, s"The effective number of instances should be " +
+      s"greater than 0.0, but $weightSum.")
+    lossSum / weightSum
+  }
 
   def gradient: Vector = {
+    require(weightSum > 0.0, s"The effective number of instances should be " +
+      s"greater than 0.0, but $weightSum.")
     val result = Vectors.dense(gradientSumArray.clone())
-    scal(1.0 / totalCnt, result)
+    scal(1.0 / weightSum, result)
     result
   }
 }
@@ -823,7 +863,7 @@ private class LogisticAggregator(
  * It's used in Breeze's convex optimization routines.
  */
 private class LogisticCostFun(
-    data: RDD[(Double, Vector)],
+    data: RDD[Instance],
     numClasses: Int,
     fitIntercept: Boolean,
     standardization: Boolean,
@@ -831,22 +871,23 @@ private class LogisticCostFun(
     featuresMean: Array[Double],
     regParamL2: Double) extends DiffFunction[BDV[Double]] {
 
-  override def calculate(weights: BDV[Double]): (Double, BDV[Double]) = {
+  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
     val numFeatures = featuresStd.length
-    val w = Vectors.fromBreeze(weights)
+    val w = Vectors.fromBreeze(coefficients)
 
-    val logisticAggregator = data.treeAggregate(new LogisticAggregator(w, numClasses, fitIntercept,
-      featuresStd, featuresMean))(
-        seqOp = (c, v) => (c, v) match {
-          case (aggregator, (label, features)) => aggregator.add(label, features)
-        },
-        combOp = (c1, c2) => (c1, c2) match {
-          case (aggregator1, aggregator2) => aggregator1.merge(aggregator2)
-        })
+    val logisticAggregator = {
+      val seqOp = (c: LogisticAggregator, instance: Instance) =>
+        c.add(instance.label, instance.features, instance.weight)
+      val combOp = (c1: LogisticAggregator, c2: LogisticAggregator) => c1.merge(c2)
+
+      data.treeAggregate(
+        new LogisticAggregator(w, numClasses, fitIntercept, featuresStd, featuresMean)
+      )(seqOp, combOp)
+    }
 
     val totalGradientArray = logisticAggregator.gradient.toArray
 
-    // regVal is the sum of weight squares excluding intercept for L2 regularization.
+    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
     val regVal = if (regParamL2 == 0.0) {
       0.0
     } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/be52faa7/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index e9e99ed..8049d51 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -42,7 +42,7 @@ private[shared] object SharedParamsCodeGen {
         Some("\"rawPrediction\"")),
       ParamDesc[String]("probabilityCol", "Column name for predicted class conditional" +
         " probabilities. Note: Not all models output well-calibrated probability estimates!" +
-        " These probabilities should be treated as confidences, not precise probabilities.",
+        " These probabilities should be treated as confidences, not precise probabilities",
         Some("\"probability\"")),
       ParamDesc[Double]("threshold",
         "threshold in binary classification prediction, in range [0, 1]", Some("0.5"),
@@ -65,10 +65,10 @@ private[shared] object SharedParamsCodeGen {
         "options may be added later.",
         isValid = "ParamValidators.inArray(Array(\"skip\", \"error\"))"),
       ParamDesc[Boolean]("standardization", "whether to standardize the training features" +
-        " before fitting the model.", Some("true")),
+        " before fitting the model", Some("true")),
       ParamDesc[Long]("seed", "random seed", Some("this.getClass.getName.hashCode.toLong")),
       ParamDesc[Double]("elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]." +
-        " For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.",
+        " For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty",
         isValid = "ParamValidators.inRange(0, 1)"),
       ParamDesc[Double]("tol", "the convergence tolerance for iterative algorithms"),
       ParamDesc[Double]("stepSize", "Step size to be used for each iteration of optimization."),

http://git-wip-us.apache.org/repos/asf/spark/blob/be52faa7/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index 3009217..aff47fc 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -127,10 +127,10 @@ private[ml] trait HasRawPredictionCol extends Params {
 private[ml] trait HasProbabilityCol extends Params {
 
   /**
-   * Param for Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities..
+   * Param for Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities.
    * @group param
    */
-  final val probabilityCol: Param[String] = new Param[String](this, "probabilityCol", "Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities.")
+  final val probabilityCol: Param[String] = new Param[String](this, "probabilityCol", "Column name for predicted class conditional probabilities. Note: Not all models output well-calibrated probability estimates! These probabilities should be treated as confidences, not precise probabilities")
 
   setDefault(probabilityCol, "probability")
 
@@ -270,10 +270,10 @@ private[ml] trait HasHandleInvalid extends Params {
 private[ml] trait HasStandardization extends Params {
 
   /**
-   * Param for whether to standardize the training features before fitting the model..
+   * Param for whether to standardize the training features before fitting the model.
    * @group param
    */
-  final val standardization: BooleanParam = new BooleanParam(this, "standardization", "whether to standardize the training features before fitting the model.")
+  final val standardization: BooleanParam = new BooleanParam(this, "standardization", "whether to standardize the training features before fitting the model")
 
   setDefault(standardization, true)
 
@@ -304,10 +304,10 @@ private[ml] trait HasSeed extends Params {
 private[ml] trait HasElasticNetParam extends Params {
 
   /**
-   * Param for the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty..
+   * Param for the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.
    * @group param
    */
-  final val elasticNetParam: DoubleParam = new DoubleParam(this, "elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty.", ParamValidators.inRange(0, 1))
+  final val elasticNetParam: DoubleParam = new DoubleParam(this, "elasticNetParam", "the ElasticNet mixing parameter, in range [0, 1]. For alpha = 0, the penalty is an L2 penalty. For alpha = 1, it is an L1 penalty", ParamValidators.inRange(0, 1))
 
   /** @group getParam */
   final def getElasticNetParam: Double = $(elasticNetParam)

http://git-wip-us.apache.org/repos/asf/spark/blob/be52faa7/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
index 51b713e..201333c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -23,16 +23,19 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}
 /**
  * :: DeveloperApi ::
  * MultivariateOnlineSummarizer implements [[MultivariateStatisticalSummary]] to compute the mean,
- * variance, minimum, maximum, counts, and nonzero counts for samples in sparse or dense vector
+ * variance, minimum, maximum, counts, and nonzero counts for instances in sparse or dense vector
  * format in a online fashion.
  *
  * Two MultivariateOnlineSummarizer can be merged together to have a statistical summary of
  * the corresponding joint dataset.
  *
- * A numerically stable algorithm is implemented to compute sample mean and variance:
+ * A numerically stable algorithm is implemented to compute the mean and variance of instances:
  * Reference: [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]
  * Zero elements (including explicit zero values) are skipped when calling add(),
  * to have time complexity O(nnz) instead of O(n) for each column.
+ *
+ * For weighted instances, the unbiased estimation of variance is defined by the reliability
+ * weights: [[https://en.wikipedia.org/wiki/Weighted_arithmetic_mean#Reliability_weights]].
  */
 @Since("1.1.0")
 @DeveloperApi
@@ -44,6 +47,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
   private var currM2: Array[Double] = _
   private var currL1: Array[Double] = _
   private var totalCnt: Long = 0
+  private var weightSum: Double = 0.0
+  private var weightSquareSum: Double = 0.0
   private var nnz: Array[Double] = _
   private var currMax: Array[Double] = _
   private var currMin: Array[Double] = _
@@ -55,10 +60,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    * @return This MultivariateOnlineSummarizer object.
    */
   @Since("1.1.0")
-  def add(sample: Vector): this.type = {
+  def add(sample: Vector): this.type = add(sample, 1.0)
+
+  private[spark] def add(instance: Vector, weight: Double): this.type = {
+    require(weight >= 0.0, s"sample weight, ${weight} has to be >= 0.0")
+    if (weight == 0.0) return this
+
     if (n == 0) {
-      require(sample.size > 0, s"Vector should have dimension larger than zero.")
-      n = sample.size
+      require(instance.size > 0, s"Vector should have dimension larger than zero.")
+      n = instance.size
 
       currMean = Array.ofDim[Double](n)
       currM2n = Array.ofDim[Double](n)
@@ -69,8 +79,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
       currMin = Array.fill[Double](n)(Double.MaxValue)
     }
 
-    require(n == sample.size, s"Dimensions mismatch when adding new sample." +
-      s" Expecting $n but got ${sample.size}.")
+    require(n == instance.size, s"Dimensions mismatch when adding new sample." +
+      s" Expecting $n but got ${instance.size}.")
 
     val localCurrMean = currMean
     val localCurrM2n = currM2n
@@ -79,7 +89,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
     val localNnz = nnz
     val localCurrMax = currMax
     val localCurrMin = currMin
-    sample.foreachActive { (index, value) =>
+    instance.foreachActive { (index, value) =>
       if (value != 0.0) {
         if (localCurrMax(index) < value) {
           localCurrMax(index) = value
@@ -90,15 +100,17 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
 
         val prevMean = localCurrMean(index)
         val diff = value - prevMean
-        localCurrMean(index) = prevMean + diff / (localNnz(index) + 1.0)
-        localCurrM2n(index) += (value - localCurrMean(index)) * diff
-        localCurrM2(index) += value * value
-        localCurrL1(index) += math.abs(value)
+        localCurrMean(index) = prevMean + weight * diff / (localNnz(index) + weight)
+        localCurrM2n(index) += weight * (value - localCurrMean(index)) * diff
+        localCurrM2(index) += weight * value * value
+        localCurrL1(index) += weight * math.abs(value)
 
-        localNnz(index) += 1.0
+        localNnz(index) += weight
       }
     }
 
+    weightSum += weight
+    weightSquareSum += weight * weight
     totalCnt += 1
     this
   }
@@ -112,10 +124,12 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   @Since("1.1.0")
   def merge(other: MultivariateOnlineSummarizer): this.type = {
-    if (this.totalCnt != 0 && other.totalCnt != 0) {
+    if (this.weightSum != 0.0 && other.weightSum != 0.0) {
       require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
         s"Expecting $n but got ${other.n}.")
       totalCnt += other.totalCnt
+      weightSum += other.weightSum
+      weightSquareSum += other.weightSquareSum
       var i = 0
       while (i < n) {
         val thisNnz = nnz(i)
@@ -138,13 +152,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
         nnz(i) = totalNnz
         i += 1
       }
-    } else if (totalCnt == 0 && other.totalCnt != 0) {
+    } else if (weightSum == 0.0 && other.weightSum != 0.0) {
       this.n = other.n
       this.currMean = other.currMean.clone()
       this.currM2n = other.currM2n.clone()
       this.currM2 = other.currM2.clone()
       this.currL1 = other.currL1.clone()
       this.totalCnt = other.totalCnt
+      this.weightSum = other.weightSum
+      this.weightSquareSum = other.weightSquareSum
       this.nnz = other.nnz.clone()
       this.currMax = other.currMax.clone()
       this.currMin = other.currMin.clone()
@@ -158,28 +174,28 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   @Since("1.1.0")
   override def mean: Vector = {
-    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    require(weightSum > 0, s"Nothing has been added to this summarizer.")
 
     val realMean = Array.ofDim[Double](n)
     var i = 0
     while (i < n) {
-      realMean(i) = currMean(i) * (nnz(i) / totalCnt)
+      realMean(i) = currMean(i) * (nnz(i) / weightSum)
       i += 1
     }
     Vectors.dense(realMean)
   }
 
   /**
-   * Sample variance of each dimension.
+   * Unbiased estimate of sample variance of each dimension.
    *
    */
   @Since("1.1.0")
   override def variance: Vector = {
-    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    require(weightSum > 0, s"Nothing has been added to this summarizer.")
 
     val realVariance = Array.ofDim[Double](n)
 
-    val denominator = totalCnt - 1.0
+    val denominator = weightSum - (weightSquareSum / weightSum)
 
     // Sample variance is computed, if the denominator is less than 0, the variance is just 0.
     if (denominator > 0.0) {
@@ -187,9 +203,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
       var i = 0
       val len = currM2n.length
       while (i < len) {
-        realVariance(i) =
-          currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
-        realVariance(i) /= denominator
+        realVariance(i) = (currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) *
+          (weightSum - nnz(i)) / weightSum) / denominator
         i += 1
       }
     }
@@ -209,7 +224,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   @Since("1.1.0")
   override def numNonzeros: Vector = {
-    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    require(weightSum > 0, s"Nothing has been added to this summarizer.")
 
     Vectors.dense(nnz)
   }
@@ -220,11 +235,11 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   @Since("1.1.0")
   override def max: Vector = {
-    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    require(weightSum > 0, s"Nothing has been added to this summarizer.")
 
     var i = 0
     while (i < n) {
-      if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
+      if ((nnz(i) < weightSum) && (currMax(i) < 0.0)) currMax(i) = 0.0
       i += 1
     }
     Vectors.dense(currMax)
@@ -236,11 +251,11 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   @Since("1.1.0")
   override def min: Vector = {
-    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    require(weightSum > 0, s"Nothing has been added to this summarizer.")
 
     var i = 0
     while (i < n) {
-      if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
+      if ((nnz(i) < weightSum) && (currMin(i) > 0.0)) currMin(i) = 0.0
       i += 1
     }
     Vectors.dense(currMin)
@@ -252,7 +267,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   @Since("1.2.0")
   override def normL2: Vector = {
-    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    require(weightSum > 0, s"Nothing has been added to this summarizer.")
 
     val realMagnitude = Array.ofDim[Double](n)
 
@@ -271,7 +286,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
    */
   @Since("1.2.0")
   override def normL1: Vector = {
-    require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+    require(weightSum > 0, s"Nothing has been added to this summarizer.")
 
     Vectors.dense(currL1)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/be52faa7/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
index cce39f3..f5219f9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.ml.classification
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.param.ParamsSuite
 import org.apache.spark.ml.util.MLTestingUtils
 import org.apache.spark.mllib.classification.LogisticRegressionSuite._
 import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.sql.{DataFrame, Row}
@@ -59,8 +62,7 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
 
       val testData = generateMultinomialLogisticInput(weights, xMean, xVariance, true, nPoints, 42)
 
-      sqlContext.createDataFrame(
-        generateMultinomialLogisticInput(weights, xMean, xVariance, true, nPoints, 42))
+      sqlContext.createDataFrame(sc.parallelize(testData, 4))
     }
   }
 
@@ -77,6 +79,7 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
     assert(lr.getPredictionCol === "prediction")
     assert(lr.getRawPredictionCol === "rawPrediction")
     assert(lr.getProbabilityCol === "probability")
+    assert(lr.getWeightCol === "")
     assert(lr.getFitIntercept)
     assert(lr.getStandardization)
     val model = lr.fit(dataset)
@@ -216,43 +219,65 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
   test("MultiClassSummarizer") {
     val summarizer1 = (new MultiClassSummarizer)
       .add(0.0).add(3.0).add(4.0).add(3.0).add(6.0)
-    assert(summarizer1.histogram.zip(Array[Long](1, 0, 0, 2, 1, 0, 1)).forall(x => x._1 === x._2))
+    assert(summarizer1.histogram === Array[Double](1, 0, 0, 2, 1, 0, 1))
     assert(summarizer1.countInvalid === 0)
     assert(summarizer1.numClasses === 7)
 
     val summarizer2 = (new MultiClassSummarizer)
       .add(1.0).add(5.0).add(3.0).add(0.0).add(4.0).add(1.0)
-    assert(summarizer2.histogram.zip(Array[Long](1, 2, 0, 1, 1, 1)).forall(x => x._1 === x._2))
+    assert(summarizer2.histogram === Array[Double](1, 2, 0, 1, 1, 1))
     assert(summarizer2.countInvalid === 0)
     assert(summarizer2.numClasses === 6)
 
     val summarizer3 = (new MultiClassSummarizer)
       .add(0.0).add(1.3).add(5.2).add(2.5).add(2.0).add(4.0).add(4.0).add(4.0).add(1.0)
-    assert(summarizer3.histogram.zip(Array[Long](1, 1, 1, 0, 3)).forall(x => x._1 === x._2))
+    assert(summarizer3.histogram === Array[Double](1, 1, 1, 0, 3))
     assert(summarizer3.countInvalid === 3)
     assert(summarizer3.numClasses === 5)
 
     val summarizer4 = (new MultiClassSummarizer)
       .add(3.1).add(4.3).add(2.0).add(1.0).add(3.0)
-    assert(summarizer4.histogram.zip(Array[Long](0, 1, 1, 1)).forall(x => x._1 === x._2))
+    assert(summarizer4.histogram === Array[Double](0, 1, 1, 1))
     assert(summarizer4.countInvalid === 2)
     assert(summarizer4.numClasses === 4)
 
     // small map merges large one
     val summarizerA = summarizer1.merge(summarizer2)
     assert(summarizerA.hashCode() === summarizer2.hashCode())
-    assert(summarizerA.histogram.zip(Array[Long](2, 2, 0, 3, 2, 1, 1)).forall(x => x._1 === x._2))
+    assert(summarizerA.histogram === Array[Double](2, 2, 0, 3, 2, 1, 1))
     assert(summarizerA.countInvalid === 0)
     assert(summarizerA.numClasses === 7)
 
     // large map merges small one
     val summarizerB = summarizer3.merge(summarizer4)
     assert(summarizerB.hashCode() === summarizer3.hashCode())
-    assert(summarizerB.histogram.zip(Array[Long](1, 2, 2, 1, 3)).forall(x => x._1 === x._2))
+    assert(summarizerB.histogram === Array[Double](1, 2, 2, 1, 3))
     assert(summarizerB.countInvalid === 5)
     assert(summarizerB.numClasses === 5)
   }
 
+  test("MultiClassSummarizer with weighted samples") {
+    val summarizer1 = (new MultiClassSummarizer)
+      .add(label = 0.0, weight = 0.2).add(3.0, 0.8).add(4.0, 3.2).add(3.0, 1.3).add(6.0, 3.1)
+    assert(Vectors.dense(summarizer1.histogram) ~==
+      Vectors.dense(Array(0.2, 0, 0, 2.1, 3.2, 0, 3.1)) absTol 1E-10)
+    assert(summarizer1.countInvalid === 0)
+    assert(summarizer1.numClasses === 7)
+
+    val summarizer2 = (new MultiClassSummarizer)
+      .add(1.0, 1.1).add(5.0, 2.3).add(3.0).add(0.0).add(4.0).add(1.0).add(2, 0.0)
+    assert(Vectors.dense(summarizer2.histogram) ~==
+      Vectors.dense(Array[Double](1.0, 2.1, 0.0, 1, 1, 2.3)) absTol 1E-10)
+    assert(summarizer2.countInvalid === 0)
+    assert(summarizer2.numClasses === 6)
+
+    val summarizer = summarizer1.merge(summarizer2)
+    assert(Vectors.dense(summarizer.histogram) ~==
+      Vectors.dense(Array(1.2, 2.1, 0.0, 3.1, 4.2, 2.3, 3.1)) absTol 1E-10)
+    assert(summarizer.countInvalid === 0)
+    assert(summarizer.numClasses === 7)
+  }
+
   test("binary logistic regression with intercept without regularization") {
     val trainer1 = (new LogisticRegression).setFitIntercept(true).setStandardization(true)
     val trainer2 = (new LogisticRegression).setFitIntercept(true).setStandardization(false)
@@ -713,7 +738,7 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
        b = \log{P(1) / P(0)} = \log{count_1 / count_0}
        }}}
      */
-    val interceptTheory = math.log(histogram(1).toDouble / histogram(0).toDouble)
+    val interceptTheory = math.log(histogram(1) / histogram(0))
     val weightsTheory = Vectors.dense(0.0, 0.0, 0.0, 0.0)
 
     assert(model1.intercept ~== interceptTheory relTol 1E-5)
@@ -781,4 +806,63 @@ class LogisticRegressionSuite extends SparkFunSuite with MLlibTestSparkContext {
         .forall(x => x(0) >= x(1)))
 
   }
+
+  test("binary logistic regression with weighted samples") {
+    val (dataset, weightedDataset) = {
+      val nPoints = 1000
+      val weights = Array(-0.57997, 0.912083, -0.371077, -0.819866, 2.688191)
+      val xMean = Array(5.843, 3.057, 3.758, 1.199)
+      val xVariance = Array(0.6856, 0.1899, 3.116, 0.581)
+      val testData = generateMultinomialLogisticInput(weights, xMean, xVariance, true, nPoints, 42)
+
+      // Let's over-sample the positive samples twice.
+      val data1 = testData.flatMap { case labeledPoint: LabeledPoint =>
+        if (labeledPoint.label == 1.0) {
+          Iterator(labeledPoint, labeledPoint)
+        } else {
+          Iterator(labeledPoint)
+        }
+      }
+
+      val rnd = new Random(8392)
+      val data2 = testData.flatMap { case LabeledPoint(label: Double, features: Vector) =>
+        if (rnd.nextGaussian() > 0.0) {
+          if (label == 1.0) {
+            Iterator(
+              Instance(label, 1.2, features),
+              Instance(label, 0.8, features),
+              Instance(0.0, 0.0, features))
+          } else {
+            Iterator(
+              Instance(label, 0.3, features),
+              Instance(1.0, 0.0, features),
+              Instance(label, 0.1, features),
+              Instance(label, 0.6, features))
+          }
+        } else {
+          if (label == 1.0) {
+            Iterator(Instance(label, 2.0, features))
+          } else {
+            Iterator(Instance(label, 1.0, features))
+          }
+        }
+      }
+
+      (sqlContext.createDataFrame(sc.parallelize(data1, 4)),
+        sqlContext.createDataFrame(sc.parallelize(data2, 4)))
+    }
+
+    val trainer1a = (new LogisticRegression).setFitIntercept(true)
+      .setRegParam(0.0).setStandardization(true)
+    val trainer1b = (new LogisticRegression).setFitIntercept(true).setWeightCol("weight")
+      .setRegParam(0.0).setStandardization(true)
+    val model1a0 = trainer1a.fit(dataset)
+    val model1a1 = trainer1a.fit(weightedDataset)
+    val model1b = trainer1b.fit(weightedDataset)
+    assert(model1a0.weights !~= model1a1.weights absTol 1E-3)
+    assert(model1a0.intercept !~= model1a1.intercept absTol 1E-3)
+    assert(model1a0.weights ~== model1b.weights absTol 1E-3)
+    assert(model1a0.intercept ~== model1b.intercept absTol 1E-3)
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/be52faa7/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
index 07efde4..b6d41db 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizerSuite.scala
@@ -218,4 +218,31 @@ class MultivariateOnlineSummarizerSuite extends SparkFunSuite {
     s0.merge(s1)
     assert(s0.mean(0) ~== 1.0 absTol 1e-14)
   }
+
+  test("merging summarizer with weighted samples") {
+    val summarizer = (new MultivariateOnlineSummarizer)
+      .add(instance = Vectors.sparse(3, Seq((0, -0.8), (1, 1.7))), weight = 0.1)
+      .add(Vectors.dense(0.0, -1.2, -1.7), 0.2).merge(
+        (new MultivariateOnlineSummarizer)
+          .add(Vectors.sparse(3, Seq((0, -0.7), (1, 0.01), (2, 1.3))), 0.15)
+          .add(Vectors.dense(-0.5, 0.3, -1.5), 0.05))
+
+    assert(summarizer.count === 4)
+
+    // The following values are hand calculated using the formula:
+    // [[https://en.wikipedia.org/wiki/Weighted_arithmetic_mean#Reliability_weights]]
+    // which defines the reliability weight used for computing the unbiased estimation of variance
+    // for weighted instances.
+    assert(summarizer.mean ~== Vectors.dense(Array(-0.42, -0.107, -0.44))
+      absTol 1E-10, "mean mismatch")
+    assert(summarizer.variance ~== Vectors.dense(Array(0.17657142857, 1.645115714, 2.42057142857))
+      absTol 1E-8, "variance mismatch")
+    assert(summarizer.numNonzeros ~== Vectors.dense(Array(0.3, 0.5, 0.4))
+      absTol 1E-10, "numNonzeros mismatch")
+    assert(summarizer.max ~== Vectors.dense(Array(0.0, 1.7, 1.3)) absTol 1E-10, "max mismatch")
+    assert(summarizer.min ~== Vectors.dense(Array(-0.8, -1.2, -1.7)) absTol 1E-10, "min mismatch")
+    assert(summarizer.normL2 ~== Vectors.dense(0.387298335, 0.762571308141, 0.9715966241192)
+      absTol 1E-8, "normL2 mismatch")
+    assert(summarizer.normL1 ~== Vectors.dense(0.21, 0.4265, 0.61) absTol 1E-10, "normL1 mismatch")
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/be52faa7/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 87b141c..46026c1 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -45,7 +45,15 @@ object MimaExcludes {
         excludePackage("org.apache.spark.sql.execution")
       ) ++
       MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
-      MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils")
+      MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++ 
+      Seq(
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.classification.LogisticCostFun.this"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.classification.LogisticAggregator.add"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.ml.classification.LogisticAggregator.count")
+      )
     case v if v.startsWith("1.5") =>
       Seq(
         MimaBuild.excludeSparkPackage("network"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message