From: yliang@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-21108][ML] convert LinearSVC to aggregator framework
Date: Fri, 25 Aug 2017 02:22:33 +0000 (UTC)
Message-Id: <26ab9d59ed234d288e7b70cccb073230@git.apache.org>

Repository: spark
Updated Branches:
  refs/heads/master 05af2de0f -> f3676d639


[SPARK-21108][ML] convert LinearSVC to aggregator framework

## What changes were proposed in this pull request?

Convert LinearSVC to the new aggregator framework.

## How was this patch tested?

Existing unit tests.

Author: Yuhao Yang

Closes #18315 from hhbyyh/svcAggregator.
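For orientation: both the removed LinearSVCCostFun path and the new HingeAggregator/RDDLossFunction path below minimize the same weighted, L2-regularized hinge objective over features scaled by their standard deviations. This is a reading of the code in the patch, not text from it; with lambda = regParam, labels y_i in {0, 1}, and instance weights w_i:

```latex
\min_{\beta,\, b}\ \frac{1}{\sum_i w_i}\sum_i w_i\,\max\!\bigl(0,\ 1 - s_i(\beta^\top \tilde{x}_i + b)\bigr)
\;+\; \frac{\lambda}{2}\,\lVert\beta\rVert_2^2,
\qquad s_i = 2y_i - 1,\quad \tilde{x}_{ij} = x_{ij}/\hat{\sigma}_j
```

When `standardization` is false, the comments in the deleted code explain that the penalty is instead rescaled per coordinate by 1/sigma_j^2, so the optimum matches the unstandardized problem even though training still runs on standardized features.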
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3676d63
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3676d63
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3676d63

Branch: refs/heads/master
Commit: f3676d63913e0706e071b71e1742b8d57b102fba
Parents: 05af2de
Author: Yuhao Yang
Authored: Fri Aug 25 10:22:27 2017 +0800
Committer: Yanbo Liang
Committed: Fri Aug 25 10:22:27 2017 +0800

----------------------------------------------------------------------
 .../spark/ml/classification/LinearSVC.scala     | 204 ++-----------------
 .../ml/optim/aggregator/HingeAggregator.scala   | 105 ++++++++++
 .../ml/classification/LinearSVCSuite.scala      |   7 +-
 .../optim/aggregator/HingeAggregatorSuite.scala | 163 +++++++++++++++
 .../aggregator/LogisticAggregatorSuite.scala    |   2 -
 5 files changed, 286 insertions(+), 195 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
index 8d556de..3b0666c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala
@@ -25,11 +25,11 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.SparkException
 import org.apache.spark.annotation.{Experimental, Since}
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.feature.Instance
 import org.apache.spark.ml.linalg._
-import org.apache.spark.ml.linalg.BLAS._
+import org.apache.spark.ml.optim.aggregator.HingeAggregator
+import org.apache.spark.ml.optim.loss.{L2Regularization, RDDLossFunction}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -214,10 +214,20 @@ class LinearSVC @Since("2.2.0") (
     }
 
     val featuresStd = summarizer.variance.toArray.map(math.sqrt)
+    val getFeaturesStd = (j: Int) => featuresStd(j)
     val regParamL2 = $(regParam)
     val bcFeaturesStd = instances.context.broadcast(featuresStd)
-    val costFun = new LinearSVCCostFun(instances, $(fitIntercept),
-      $(standardization), bcFeaturesStd, regParamL2, $(aggregationDepth))
+    val regularization = if (regParamL2 != 0.0) {
+      val shouldApply = (idx: Int) => idx >= 0 && idx < numFeatures
+      Some(new L2Regularization(regParamL2, shouldApply,
+        if ($(standardization)) None else Some(getFeaturesStd)))
+    } else {
+      None
+    }
+
+    val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)
+    val costFun = new RDDLossFunction(instances, getAggregatorFunc, regularization,
+      $(aggregationDepth))
 
     def regParamL1Fun = (index: Int) => 0D
     val optimizer = new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))
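The pivotal line in the hunk above is `val getAggregatorFunc = new HingeAggregator(bcFeaturesStd, $(fitIntercept))(_)`: partially applying the curried constructor yields a `Broadcast[Vector] => HingeAggregator` factory, which RDDLossFunction can invoke with a fresh broadcast of coefficients at every optimizer step. A minimal, self-contained sketch of that pattern; `Agg` and the plain arrays are illustrative stand-ins, not Spark's private API:

```scala
// Toy stand-ins: `Agg` plays the role of DifferentiableLossAggregator,
// plain arrays play the role of Broadcast[Array[Double]] / Broadcast[Vector].
class Agg(featuresStd: Array[Double], fitIntercept: Boolean)(coefficients: Array[Double]) {
  var lossSum = 0.0
  def add(label: Double, features: Array[Double]): this.type = {
    // Margin over standardized features, mirroring HingeAggregator.add.
    val base = features.indices.map(j => coefficients(j) * features(j) / featuresStd(j)).sum
    val margin = if (fitIntercept) base + coefficients.last else base
    lossSum += math.max(0.0, 1.0 - (2 * label - 1.0) * margin)
    this
  }
}

object CurriedAggregatorDemo extends App {
  // Partially applying the first parameter list yields the factory that the
  // loss function can call once per optimizer step with fresh coefficients.
  val makeAgg: Array[Double] => Agg = new Agg(Array(1.0, 2.0), true)(_)
  val agg = makeAgg(Array(0.5, -0.25, 0.1))
  println(agg.add(1.0, Array(1.0, 2.0)).lossSum) // hinge loss of one instance: 0.65
}
```

The design choice this buys: the per-step data (coefficients) is separated from the per-fit data (feature standard deviations, intercept flag), so the framework never needs to know a particular loss's constructor arguments.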
@@ -372,189 +382,3 @@ object LinearSVCModel extends MLReadable[LinearSVCModel] {
     }
   }
 }
-
-/**
- * LinearSVCCostFun implements Breeze's DiffFunction[T] for hinge loss function
- */
-private class LinearSVCCostFun(
-    instances: RDD[Instance],
-    fitIntercept: Boolean,
-    standardization: Boolean,
-    bcFeaturesStd: Broadcast[Array[Double]],
-    regParamL2: Double,
-    aggregationDepth: Int) extends DiffFunction[BDV[Double]] {
-
-  override def calculate(coefficients: BDV[Double]): (Double, BDV[Double]) = {
-    val coeffs = Vectors.fromBreeze(coefficients)
-    val bcCoeffs = instances.context.broadcast(coeffs)
-    val featuresStd = bcFeaturesStd.value
-    val numFeatures = featuresStd.length
-
-    val svmAggregator = {
-      val seqOp = (c: LinearSVCAggregator, instance: Instance) => c.add(instance)
-      val combOp = (c1: LinearSVCAggregator, c2: LinearSVCAggregator) => c1.merge(c2)
-
-      instances.treeAggregate(
-        new LinearSVCAggregator(bcCoeffs, bcFeaturesStd, fitIntercept)
-      )(seqOp, combOp, aggregationDepth)
-    }
-
-    val totalGradientArray = svmAggregator.gradient.toArray
-    // regVal is the sum of coefficients squares excluding intercept for L2 regularization.
-    val regVal = if (regParamL2 == 0.0) {
-      0.0
-    } else {
-      var sum = 0.0
-      coeffs.foreachActive { case (index, value) =>
-        // We do not apply regularization to the intercepts
-        if (index != numFeatures) {
-          // The following code will compute the loss of the regularization; also
-          // the gradient of the regularization, and add back to totalGradientArray.
-          sum += {
-            if (standardization) {
-              totalGradientArray(index) += regParamL2 * value
-              value * value
-            } else {
-              if (featuresStd(index) != 0.0) {
-                // If `standardization` is false, we still standardize the data
-                // to improve the rate of convergence; as a result, we have to
-                // perform this reverse standardization by penalizing each component
-                // differently to get effectively the same objective function when
-                // the training dataset is not standardized.
-                val temp = value / (featuresStd(index) * featuresStd(index))
-                totalGradientArray(index) += regParamL2 * temp
-                value * temp
-              } else {
-                0.0
-              }
-            }
-          }
-        }
-      }
-      0.5 * regParamL2 * sum
-    }
-    bcCoeffs.destroy(blocking = false)
-
-    (svmAggregator.loss + regVal, new BDV(totalGradientArray))
-  }
-}
-
-/**
- * LinearSVCAggregator computes the gradient and loss for hinge loss function, as used
- * in binary classification for instances in sparse or dense vector in an online fashion.
- *
- * Two LinearSVCAggregator can be merged together to have a summary of loss and gradient of
- * the corresponding joint dataset.
- *
- * This class standardizes feature values during computation using bcFeaturesStd.
- *
- * @param bcCoefficients The coefficients corresponding to the features.
- * @param fitIntercept Whether to fit an intercept term.
- * @param bcFeaturesStd The standard deviation values of the features.
- */
-private class LinearSVCAggregator(
-    bcCoefficients: Broadcast[Vector],
-    bcFeaturesStd: Broadcast[Array[Double]],
-    fitIntercept: Boolean) extends Serializable {
-
-  private val numFeatures: Int = bcFeaturesStd.value.length
-  private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
-  private var weightSum: Double = 0.0
-  private var lossSum: Double = 0.0
-  @transient private lazy val coefficientsArray = bcCoefficients.value match {
-    case DenseVector(values) => values
-    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
-      s" but got type ${bcCoefficients.value.getClass}.")
-  }
-  private lazy val gradientSumArray = new Array[Double](numFeaturesPlusIntercept)
-
-  /**
-   * Add a new training instance to this LinearSVCAggregator, and update the loss and gradient
-   * of the objective function.
-   *
-   * @param instance The instance of data point to be added.
-   * @return This LinearSVCAggregator object.
-   */
-  def add(instance: Instance): this.type = {
-    instance match { case Instance(label, weight, features) =>
-
-      if (weight == 0.0) return this
-      val localFeaturesStd = bcFeaturesStd.value
-      val localCoefficients = coefficientsArray
-      val localGradientSumArray = gradientSumArray
-
-      val dotProduct = {
-        var sum = 0.0
-        features.foreachActive { (index, value) =>
-          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-            sum += localCoefficients(index) * value / localFeaturesStd(index)
-          }
-        }
-        if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
-        sum
-      }
-      // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
-      // Therefore the gradient is -(2y - 1)*x
-      val labelScaled = 2 * label - 1.0
-      val loss = if (1.0 > labelScaled * dotProduct) {
-        weight * (1.0 - labelScaled * dotProduct)
-      } else {
-        0.0
-      }
-
-      if (1.0 > labelScaled * dotProduct) {
-        val gradientScale = -labelScaled * weight
-        features.foreachActive { (index, value) =>
-          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
-            localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
-          }
-        }
-        if (fitIntercept) {
-          localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
-        }
-      }
-
-      lossSum += loss
-      weightSum += weight
-      this
-    }
-  }
-
-  /**
-   * Merge another LinearSVCAggregator, and update the loss and gradient
-   * of the objective function.
-   * (Note that it's in place merging; as a result, `this` object will be modified.)
-   *
-   * @param other The other LinearSVCAggregator to be merged.
-   * @return This LinearSVCAggregator object.
-   */
-  def merge(other: LinearSVCAggregator): this.type = {
-
-    if (other.weightSum != 0.0) {
-      weightSum += other.weightSum
-      lossSum += other.lossSum
-
-      var i = 0
-      val localThisGradientSumArray = this.gradientSumArray
-      val localOtherGradientSumArray = other.gradientSumArray
-      val len = localThisGradientSumArray.length
-      while (i < len) {
-        localThisGradientSumArray(i) += localOtherGradientSumArray(i)
-        i += 1
-      }
-    }
-    this
-  }
-
-  def loss: Double = if (weightSum != 0) lossSum / weightSum else 0.0
-
-  def gradient: Vector = {
-    if (weightSum != 0) {
-      val result = Vectors.dense(gradientSumArray.clone())
-      scal(1.0 / weightSum, result)
-      result
-    } else {
-      Vectors.dense(new Array[Double](numFeaturesPlusIntercept))
-    }
-  }
-}


http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
new file mode 100644
index 0000000..0300500
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/optim/aggregator/HingeAggregator.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg._
+
+/**
+ * HingeAggregator computes the gradient and loss for Hinge loss function as used in
+ * binary classification for instances in sparse or dense vector in an online fashion.
+ *
+ * Two HingeAggregators can be merged together to have a summary of loss and gradient of
+ * the corresponding joint dataset.
+ *
+ * This class standardizes feature values during computation using bcFeaturesStd.
+ *
+ * @param bcCoefficients The coefficients corresponding to the features.
+ * @param fitIntercept Whether to fit an intercept term.
+ * @param bcFeaturesStd The standard deviation values of the features.
+ */
+private[ml] class HingeAggregator(
+    bcFeaturesStd: Broadcast[Array[Double]],
+    fitIntercept: Boolean)(bcCoefficients: Broadcast[Vector])
+  extends DifferentiableLossAggregator[Instance, HingeAggregator] {
+
+  private val numFeatures: Int = bcFeaturesStd.value.length
+  private val numFeaturesPlusIntercept: Int = if (fitIntercept) numFeatures + 1 else numFeatures
+  @transient private lazy val coefficientsArray = bcCoefficients.value match {
+    case DenseVector(values) => values
+    case _ => throw new IllegalArgumentException(s"coefficients only supports dense vector" +
+      s" but got type ${bcCoefficients.value.getClass}.")
+  }
+  protected override val dim: Int = numFeaturesPlusIntercept
+
+  /**
+   * Add a new training instance to this HingeAggregator, and update the loss and gradient
+   * of the objective function.
+   *
+   * @param instance The instance of data point to be added.
+   * @return This HingeAggregator object.
+   */
+  def add(instance: Instance): this.type = {
+    instance match { case Instance(label, weight, features) =>
+      require(numFeatures == features.size, s"Dimensions mismatch when adding new instance." +
+        s" Expecting $numFeatures but got ${features.size}.")
+      require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0")
+
+      if (weight == 0.0) return this
+      val localFeaturesStd = bcFeaturesStd.value
+      val localCoefficients = coefficientsArray
+      val localGradientSumArray = gradientSumArray
+
+      val dotProduct = {
+        var sum = 0.0
+        features.foreachActive { (index, value) =>
+          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+            sum += localCoefficients(index) * value / localFeaturesStd(index)
+          }
+        }
+        if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1)
+        sum
+      }
+      // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x)))
+      // Therefore the gradient is -(2y - 1)*x
+      val labelScaled = 2 * label - 1.0
+      val loss = if (1.0 > labelScaled * dotProduct) {
+        (1.0 - labelScaled * dotProduct) * weight
+      } else {
+        0.0
+      }
+
+      if (1.0 > labelScaled * dotProduct) {
+        val gradientScale = -labelScaled * weight
+        features.foreachActive { (index, value) =>
+          if (localFeaturesStd(index) != 0.0 && value != 0.0) {
+            localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index)
+          }
+        }
+        if (fitIntercept) {
+          localGradientSumArray(localGradientSumArray.length - 1) += gradientScale
+        }
+      }
+
+      lossSum += loss
+      weightSum += weight
+      this
+    }
+  }
+}
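The two-line comment inside add() ("Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))); therefore the gradient is -(2y - 1)*x") compresses the calculus. Spelled out for one instance, with s = 2y - 1, standardized features x-tilde, and margin m = beta^T x-tilde + b:

```latex
\ell = \max(0,\ 1 - s\,m), \qquad
\frac{\partial\ell}{\partial\beta_j} =
\begin{cases} -s\,\tilde{x}_j & \text{if } s\,m < 1 \\ 0 & \text{otherwise} \end{cases}
\qquad
\frac{\partial\ell}{\partial b} =
\begin{cases} -s & \text{if } s\,m < 1 \\ 0 & \text{otherwise} \end{cases}
```

The hinge is not differentiable at s m = 1; the strict `1.0 > labelScaled * dotProduct` check selects the subgradient 0 at that kink, and each instance's contribution is further scaled by its weight before being accumulated into lossSum and gradientSumArray.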
+ + s" Expecting $numFeatures but got ${features.size}.") + require(weight >= 0.0, s"instance weight, $weight has to be >= 0.0") + + if (weight == 0.0) return this + val localFeaturesStd = bcFeaturesStd.value + val localCoefficients = coefficientsArray + val localGradientSumArray = gradientSumArray + + val dotProduct = { + var sum = 0.0 + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + sum += localCoefficients(index) * value / localFeaturesStd(index) + } + } + if (fitIntercept) sum += localCoefficients(numFeaturesPlusIntercept - 1) + sum + } + // Our loss function with {0, 1} labels is max(0, 1 - (2y - 1) (f_w(x))) + // Therefore the gradient is -(2y - 1)*x + val labelScaled = 2 * label - 1.0 + val loss = if (1.0 > labelScaled * dotProduct) { + (1.0 - labelScaled * dotProduct) * weight + } else { + 0.0 + } + + if (1.0 > labelScaled * dotProduct) { + val gradientScale = -labelScaled * weight + features.foreachActive { (index, value) => + if (localFeaturesStd(index) != 0.0 && value != 0.0) { + localGradientSumArray(index) += value * gradientScale / localFeaturesStd(index) + } + } + if (fitIntercept) { + localGradientSumArray(localGradientSumArray.length - 1) += gradientScale + } + } + + lossSum += loss + weightSum += weight + this + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala index f2b00d0..41a5d22 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/classification/LinearSVCSuite.scala @@ -25,7 +25,8 @@ import org.apache.spark.SparkFunSuite import org.apache.spark.ml.classification.LinearSVCSuite._ import org.apache.spark.ml.feature.{Instance, LabeledPoint} import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors} -import org.apache.spark.ml.param.{ParamMap, ParamsSuite} +import org.apache.spark.ml.optim.aggregator.HingeAggregator +import org.apache.spark.ml.param.ParamsSuite import org.apache.spark.ml.util.{DefaultReadWriteTest, MLTestingUtils} import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -170,10 +171,10 @@ class LinearSVCSuite extends SparkFunSuite with MLlibTestSparkContext with Defau assert(model2.intercept !== 0.0) } - test("sparse coefficients in SVCAggregator") { + test("sparse coefficients in HingeAggregator") { val bcCoefficients = spark.sparkContext.broadcast(Vectors.sparse(2, Array(0), Array(1.0))) val bcFeaturesStd = spark.sparkContext.broadcast(Array(1.0)) - val agg = new LinearSVCAggregator(bcCoefficients, bcFeaturesStd, true) + val agg = new HingeAggregator(bcFeaturesStd, true)(bcCoefficients) val thrown = withClue("LinearSVCAggregator cannot handle sparse coefficients") { intercept[IllegalArgumentException] { agg.add(Instance(1.0, 1.0, Vectors.dense(1.0))) http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala 
http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
new file mode 100644
index 0000000..61b48ff
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/HingeAggregatorSuite.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.ml.optim.aggregator
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.ml.feature.Instance
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors}
+import org.apache.spark.ml.util.TestingUtils._
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+class HingeAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
+
+  import DifferentiableLossAggregatorSuite.getClassificationSummarizers
+
+  @transient var instances: Array[Instance] = _
+  @transient var instancesConstantFeature: Array[Instance] = _
+  @transient var instancesConstantFeatureFiltered: Array[Instance] = _
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    instances = Array(
+      Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+      Instance(1.0, 0.5, Vectors.dense(1.5, 1.0)),
+      Instance(0.0, 0.3, Vectors.dense(4.0, 0.5))
+    )
+    instancesConstantFeature = Array(
+      Instance(0.0, 0.1, Vectors.dense(1.0, 2.0)),
+      Instance(1.0, 0.5, Vectors.dense(1.0, 1.0)),
+      Instance(1.0, 0.3, Vectors.dense(1.0, 0.5))
+    )
+    instancesConstantFeatureFiltered = Array(
+      Instance(0.0, 0.1, Vectors.dense(2.0)),
+      Instance(1.0, 0.5, Vectors.dense(1.0)),
+      Instance(2.0, 0.3, Vectors.dense(0.5))
+    )
+  }
+
+  /** Get summary statistics for some data and create a new HingeAggregator. */
+  private def getNewAggregator(
+      instances: Array[Instance],
+      coefficients: Vector,
+      fitIntercept: Boolean): HingeAggregator = {
+    val (featuresSummarizer, ySummarizer) =
+      DifferentiableLossAggregatorSuite.getClassificationSummarizers(instances)
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val bcFeaturesStd = spark.sparkContext.broadcast(featuresStd)
+    val bcCoefficients = spark.sparkContext.broadcast(coefficients)
+    new HingeAggregator(bcFeaturesStd, fitIntercept)(bcCoefficients)
+  }
+
+  test("aggregator add method input size") {
+    val coefArray = Array(1.0, 2.0)
+    val interceptArray = Array(2.0)
+    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+      fitIntercept = true)
+    withClue("HingeAggregator features dimension must match coefficients dimension") {
+      intercept[IllegalArgumentException] {
+        agg.add(Instance(1.0, 1.0, Vectors.dense(2.0)))
+      }
+    }
+  }
+
+  test("negative weight") {
+    val coefArray = Array(1.0, 2.0)
+    val interceptArray = Array(2.0)
+    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ interceptArray),
+      fitIntercept = true)
+    withClue("HingeAggregator does not support negative instance weights") {
+      intercept[IllegalArgumentException] {
+        agg.add(Instance(1.0, -1.0, Vectors.dense(2.0, 1.0)))
+      }
+    }
+  }
+
+  test("check sizes") {
+    val rng = new scala.util.Random
+    val numFeatures = instances.head.features.size
+    val coefWithIntercept = Vectors.dense(Array.fill(numFeatures + 1)(rng.nextDouble))
+    val coefWithoutIntercept = Vectors.dense(Array.fill(numFeatures)(rng.nextDouble))
+    val aggIntercept = getNewAggregator(instances, coefWithIntercept, fitIntercept = true)
+    val aggNoIntercept = getNewAggregator(instances, coefWithoutIntercept,
+      fitIntercept = false)
+    instances.foreach(aggIntercept.add)
+    instances.foreach(aggNoIntercept.add)
+
+    assert(aggIntercept.gradient.size === numFeatures + 1)
+    assert(aggNoIntercept.gradient.size === numFeatures)
+  }
+
+  test("check correctness") {
+    val coefArray = Array(1.0, 2.0)
+    val intercept = 1.0
+    val numFeatures = instances.head.features.size
+    val (featuresSummarizer, _) = getClassificationSummarizers(instances)
+    val featuresStd = featuresSummarizer.variance.toArray.map(math.sqrt)
+    val weightSum = instances.map(_.weight).sum
+
+    val agg = getNewAggregator(instances, Vectors.dense(coefArray ++ Array(intercept)),
+      fitIntercept = true)
+    instances.foreach(agg.add)
+
+    // compute the loss
+    val stdCoef = coefArray.indices.map(i => coefArray(i) / featuresStd(i)).toArray
+    val lossSum = instances.map { case Instance(l, w, f) =>
+      val margin = BLAS.dot(Vectors.dense(stdCoef), f) + intercept
+      val labelScaled = 2 * l - 1.0
+      if (1.0 > labelScaled * margin) {
+        (1.0 - labelScaled * margin) * w
+      } else {
+        0.0
+      }
+    }.sum
+    val loss = lossSum / weightSum
+
+    // compute the gradients
+    val gradientCoef = new Array[Double](numFeatures)
+    var gradientIntercept = 0.0
+    instances.foreach { case Instance(l, w, f) =>
+      val margin = BLAS.dot(f, Vectors.dense(coefArray)) + intercept
+      if (1.0 > (2 * l - 1.0) * margin) {
+        gradientCoef.indices.foreach { i =>
+          gradientCoef(i) += f(i) * -(2 * l - 1.0) * w / featuresStd(i)
+        }
+        gradientIntercept += -(2 * l - 1.0) * w
+      }
+    }
+    val gradient = Vectors.dense((gradientCoef ++ Array(gradientIntercept)).map(_ / weightSum))
+
+    assert(loss ~== agg.loss relTol 0.01)
+    assert(gradient ~== agg.gradient relTol 0.01)
+  }
+
+  test("check with zero standard deviation") {
+    val binaryCoefArray = Array(1.0, 2.0)
+    val intercept = 1.0
+    val aggConstantFeatureBinary = getNewAggregator(instancesConstantFeature,
+      Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true)
+    instancesConstantFeature.foreach(aggConstantFeatureBinary.add)
+
+    val aggConstantFeatureBinaryFiltered = getNewAggregator(instancesConstantFeatureFiltered,
+      Vectors.dense(binaryCoefArray ++ Array(intercept)), fitIntercept = true)
+    instancesConstantFeatureFiltered.foreach(aggConstantFeatureBinaryFiltered.add)
+
+    // constant features should not affect gradient
+    assert(aggConstantFeatureBinary.gradient(0) === 0.0)
+    assert(aggConstantFeatureBinary.gradient(1) == aggConstantFeatureBinaryFiltered.gradient(0))
+  }
+
+}
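The zero-standard-deviation test leans on a guard in HingeAggregator.add rather than anything the suite sets up: a feature with zero variance fails the `localFeaturesStd(index) != 0.0` check, so it is skipped in both the margin and the gradient accumulation, and its gradient entry stays exactly 0.0. A minimal standalone restatement of that guard (a sketch, not the Spark method itself):

```scala
// Standardized dot product with the same zero-variance guard as HingeAggregator.add.
def standardizedDot(coefs: Array[Double], std: Array[Double], x: Array[Double]): Double = {
  var sum = 0.0
  var j = 0
  while (j < x.length) {
    // A constant (zero-variance) feature contributes neither to the margin
    // here nor, by the same guard, to the gradient later.
    if (std(j) != 0.0 && x(j) != 0.0) sum += coefs(j) * x(j) / std(j)
    j += 1
  }
  sum
}
```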
http://git-wip-us.apache.org/repos/asf/spark/blob/f3676d63/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
index 16ef4af..4c7913d 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/LogisticAggregatorSuite.scala
@@ -217,8 +217,6 @@ class LogisticAggregatorSuite extends SparkFunSuite with MLlibTestSparkContext {
     }.sum
     val loss = lossSum / weightSum
-
-    // compute the gradients
     val gradientCoef = new Array[Double](numFeatures)
     var gradientIntercept = 0.0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org