spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkbradley <...@git.apache.org>
Subject [GitHub] spark pull request #15413: [SPARK-17847][ML] Reduce shuffled data size of Ga...
Date Wed, 28 Dec 2016 22:58:20 GMT
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15413#discussion_r94083864
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
    @@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
       override def transformSchema(schema: StructType): StructType = {
         validateAndTransformSchema(schema)
       }
    +
    +  /**
    +   * Initialize weights and corresponding gaussian distributions at random.
    +   *
    +   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
    +   * using component variances derived from the samples.
    +   *
    +   * @param instances The training instances.
    +   * @param numClusters The number of clusters.
    +   * @param numFeatures The number of features of training instance.
    +   * @return The initialized weights and corresponding gaussian distributions. Note the
    +   *         covariance matrix of multivariate gaussian distribution is symmetric and
    +   *         we only save the upper triangular part as a dense vector.
    +   */
    +  private def initRandom(
    +      instances: RDD[Vector],
    +      numClusters: Int,
    +      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
    +    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
    +    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
    +    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
    +      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
    +      val mean = {
    +        val v = new DenseVector(new Array[Double](numFeatures))
    +        var i = 0
    +        while (i < numSamples) {
    +          BLAS.axpy(1.0, slice(i), v)
    +          i += 1
    +        }
    +        BLAS.scal(1.0 / numSamples, v)
    +        v
    +      }
    +      /*
    +         Construct matrix where diagonal entries are element-wise
    +         variance of input vectors (computes biased variance).
    +         Since the covariance matrix of multivariate gaussian distribution is symmetric,
    +         only the upper triangular part of the matrix will be saved as a dense vector
    +         in order to reduce the shuffled data size.
    +       */
    +      val cov = {
    +        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
    +        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
    +        val diagVec = Vectors.fromBreeze(ss)
    +        BLAS.scal(1.0 / numSamples, diagVec)
    +        val covVec = new DenseVector(Array.fill[Double](
    +          numFeatures * (numFeatures + 1) / 2)(0.0))
    +        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
    +          covVec.values(i + i * (i + 1) / 2) = v
    +        }
    +        covVec
    +      }
    +      (mean, cov)
    +    }
    +    (weights, gaussians)
    +  }
     }
     
     @Since("2.0.0")
     object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
     
       @Since("2.0.0")
       override def load(path: String): GaussianMixture = super.load(path)
    +
    +  /**
    +   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
    +   * numFeatures > 25 except for when numClusters is very small.
    +   *
    +   * @param numClusters  Number of clusters
    +   * @param numFeatures  Number of features
    +   */
    +  private[clustering] def shouldDistributeGaussians(
    +      numClusters: Int,
    +      numFeatures: Int): Boolean = {
    +    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
    +  }
    +
    +  /**
    +   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
    +   * into an n * n array representing the full symmetric matrix.
    +   *
    +   * @param n The order of the n by n matrix.
    +   * @param triangularValues The upper triangular part of the matrix packed in an array
    +   *                         (column major).
    +   * @return An array which represents the symmetric matrix in column major.
    +   */
    +  private[clustering] def unpackUpperTriangularMatrix(
    +      n: Int,
    +      triangularValues: Array[Double]): Array[Double] = {
    +    val symmetricValues = new Array[Double](n * n)
    +    var r = 0
    +    var i = 0
    +    while(i < n) {
    +      var j = 0
    +      while (j <= i) {
    +        symmetricValues(i * n + j) = triangularValues(r)
    +        symmetricValues(j * n + i) = triangularValues(r)
    +        r += 1
    +        j += 1
    +      }
    +      i += 1
    +    }
    +    symmetricValues
    +  }
    +
    +  /**
    +   * Update the weight, mean and covariance of gaussian distribution.
    +   *
    +   * @param mean The mean of the gaussian distribution.
    +   * @param cov The covariance matrix of the gaussian distribution. Note we only
    +   *            save the upper triangular part as a dense vector.
    +   * @param weight The weight of the gaussian distribution.
    +   * @param sumWeights The sum of weights of all clusters.
    +   * @return The updated weight, mean and covariance.
    +   */
    +  private[clustering] def updateWeightsAndGaussians(
    +      mean: DenseVector,
    +      cov: DenseVector,
    +      weight: Double,
    +      sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
    +    BLAS.scal(1.0 / weight, mean)
    +    BLAS.spr(-weight, mean, cov)
    +    BLAS.scal(1.0 / weight, cov)
    +    val newWeight = weight / sumWeights
    +    val newGaussian = (mean, cov)
    +    (newWeight, newGaussian)
    +  }
    +}
    +
    +/**
    + * ExpectationAggregator computes the partial expectation results.
    + *
    + * @param numFeatures The number of features.
    + * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
    + * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution
    + *                    in the mixture. Note only upper triangular part of the covariance
    + *                    matrix of each distribution is stored as dense vector in order to
    + *                    reduce shuffled data size.
    + */
    +private class ExpectationAggregator(
    +    numFeatures: Int,
    +    bcWeights: Broadcast[Array[Double]],
    +    bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
    +
    +  private val k: Int = bcWeights.value.length
    +  private var totalCnt: Long = 0L
    +  private var newLogLikelihood: Double = 0.0
    +  private val newWeights: Array[Double] = new Array[Double](k)
    +  private val newMeans: Array[DenseVector] = Array.fill(k)(
    +    new DenseVector(Array.fill[Double](numFeatures)(0.0)))
    +  private val newCovs: Array[DenseVector] = Array.fill(k)(
    +    new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0)))
    +
    +  @transient private lazy val oldGaussians = {
    +    bcGaussians.value.map { case (mean, covVec) =>
    +      val cov = new DenseMatrix(numFeatures, numFeatures,
    +        GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
    +      new MultivariateGaussian(mean, cov)
    +    }
    +  }
    +
    +  def count: Long = totalCnt
    +
    +  def logLikelihood: Double = newLogLikelihood
    +
    +  def weights: Array[Double] = newWeights
    +
    +  def means: Array[DenseVector] = newMeans
    +
    +  def covs: Array[DenseVector] = newCovs
    +
    +  /**
    +   * Add a new training instance to this ExpectationAggregator, update the weights,
    +   * means and covariances for each distribution, and update the log likelihood.
    +   *
    +   * @param instance The instance of data point to be added.
    +   * @return This ExpectationAggregator object.
    +   */
    +  def add(instance: Vector): this.type = {
    +    val localWeights = bcWeights.value
    +    val localOldGaussians = oldGaussians
    +
    +    val prob = new Array[Double](k)
    +    var probSum = 0.0
    +    var i = 0
    +    while(i < k) {
    +      val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
    +      prob(i) = p
    +      probSum += p
    +      i += 1
    +    }
    +
    +    newLogLikelihood += math.log(probSum)
    +    val localNewWeights = newWeights
    +    val localNewMeans = newMeans
    +    val localNewCovs = newCovs
    +    i = 0
    +    while(i < k) {
    +      prob(i) /= probSum
    +      localNewWeights(i) += prob(i)
    +      BLAS.axpy(prob(i), instance, localNewMeans(i))
    +      BLAS.spr(prob(i), instance, localNewCovs(i))
    +      i += 1
    +    }
    +
    +    totalCnt += 1
    +    this
    +  }
    +
    +  /**
    +   * Merge another ExpectationAggregator, update the weights, means and covariances
    +   * for each distribution, and update the log likelihood.
    +   * (Note that it's in place merging; as a result, `this` object will be modified.)
    +   *
    +   * @param other The other ExpectationAggregator to be merged.
    +   * @return This ExpectationAggregator object.
    +   */
    +  def merge(other: ExpectationAggregator): this.type = {
    +    if (other.count != 0) {
    +      totalCnt += other.totalCnt
    +
    +      val localThisNewWeights = this.newWeights
    --- End diff --
    
    Why do you need to make these local copies?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message