spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkbradley <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin
Date Tue, 28 Apr 2015 22:46:07 GMT
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296395
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which
    + * processes a subset of the corpus by each call to next, and update the term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    --- End diff --
    
    Please add a little doc, even though it's an internal method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message