Return-Path: X-Original-To: apmail-spark-reviews-archive@minotaur.apache.org Delivered-To: apmail-spark-reviews-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5A0201045A for ; Tue, 28 Apr 2015 22:46:08 +0000 (UTC) Received: (qmail 94048 invoked by uid 500); 28 Apr 2015 22:46:08 -0000 Delivered-To: apmail-spark-reviews-archive@spark.apache.org Received: (qmail 94028 invoked by uid 500); 28 Apr 2015 22:46:08 -0000 Mailing-List: contact reviews-help@spark.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list reviews@spark.apache.org Received: (qmail 94016 invoked by uid 99); 28 Apr 2015 22:46:08 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Apr 2015 22:46:08 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E5394E0913; Tue, 28 Apr 2015 22:46:07 +0000 (UTC) From: jkbradley To: reviews@spark.apache.org Reply-To: reviews@spark.apache.org References: In-Reply-To: Subject: [GitHub] spark pull request: [SPARK-5563][mllib] online lda initial checkin Content-Type: text/plain Message-Id: <20150428224607.E5394E0913@git1-us-west.apache.org> Date: Tue, 28 Apr 2015 22:46:07 +0000 (UTC) Github user jkbradley commented on a diff in the pull request: https://github.com/apache/spark/pull/4419#discussion_r29296395 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala --- @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{ new DistributedLDAModel(this, iterationTimes) } } + + +/** + * :: Experimental :: + * + * An online optimizer for LDA. The Optimizer implements the Online LDA algorithm, which + * processes a subset of the corpus by each call to next, and update the term-topic + * distribution adaptively. + * + * References: + * Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet Allocation." NIPS, 2010. + */ +@Experimental +class OnlineLDAOptimizer extends LDAOptimizer { + + // LDA common parameters + private var k: Int = 0 + private var D: Int = 0 + private var vocabSize: Int = 0 + private var alpha: Double = 0 + private var eta: Double = 0 + private var randomSeed: Long = 0 + + // Online LDA specific parameters + private var tau_0: Double = -1 + private var kappa: Double = -1 + private var batchSize: Int = -1 + + // internal data structure + private var docs: RDD[(Long, Vector)] = null + private var lambda: BDM[Double] = null + private var Elogbeta: BDM[Double]= null + private var expElogbeta: BDM[Double] = null + + // count of invocation to next, used to help deciding the weight for each iteration + private var iteration = 0 + + /** + * A (positive) learning parameter that downweights early iterations + */ + def getTau_0: Double = { + if (this.tau_0 == -1) { + 1024 + } else { + this.tau_0 + } + } + + /** + * A (positive) learning parameter that downweights early iterations + * Automatic setting of parameter: + * - default = 1024, which follows the recommendation from OnlineLDA paper. + */ + def setTau_0(tau_0: Double): this.type = { + require(tau_0 > 0 || tau_0 == -1.0, s"LDA tau_0 must be positive, but was set to $tau_0") + this.tau_0 = tau_0 + this + } + + /** + * Learning rate: exponential decay rate + */ + def getKappa: Double = { + if (this.kappa == -1) { + 0.5 + } else { + this.kappa + } + } + + /** + * Learning rate: exponential decay rate---should be between + * (0.5, 1.0] to guarantee asymptotic convergence. + * - default = 0.5, which follows the recommendation from OnlineLDA paper. + */ + def setKappa(kappa: Double): this.type = { + require(kappa >= 0 || kappa == -1.0, + s"Online LDA kappa must be nonnegative (or -1 for auto), but was set to $kappa") + this.kappa = kappa + this + } + + /** + * Mini-batch size, which controls how many documents are used in each iteration + */ + def getBatchSize: Int = { + if (this.batchSize == -1) { + D / 100 + } else { + this.batchSize + } + } + + /** + * Mini-batch size, which controls how many documents are used in each iteration + * default = 1% from total documents. + */ + def setBatchSize(batchSize: Int): this.type = { + this.batchSize = batchSize + this + } + + private[clustering] override def initialize(docs: RDD[(Long, Vector)], lda: LDA): LDAOptimizer = { + + this.k = lda.getK + this.D = docs.count().toInt + this.vocabSize = docs.first()._2.size + this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else lda.getDocConcentration + this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else lda.getTopicConcentration + this.randomSeed = randomSeed + + this.docs = docs + + // Initialize the variational distribution q(beta|lambda) + this.lambda = getGammaMatrix(k, vocabSize) + this.Elogbeta = dirichlet_expectation(lambda) + this.expElogbeta = exp(Elogbeta) + this.iteration = 0 + this + } + + /** + * Submit a a subset (like 1%, decide by the batchSize) of the corpus to the Online LDA model, + * and it will update the topic distribution adaptively for the terms appearing in the subset. + * + * @return Inferred LDA model + */ + private[clustering] override def next(): OnlineLDAOptimizer = { + iteration += 1 + val batchSize = getBatchSize + val batch = docs.sample(true, batchSize.toDouble / D, randomSeed).cache() + if(batch.isEmpty()) return this + + val k = this.k + val vocabSize = this.vocabSize + val expElogbeta = this.expElogbeta + val alpha = this.alpha + + val stats = batch.mapPartitions(docs =>{ + val stat = BDM.zeros[Double](k, vocabSize) + docs.foreach(doc =>{ + val termCounts = doc._2 + val (ids, cts) = termCounts match { + case v: DenseVector => (((0 until v.size).toList), v.values) + case v: SparseVector => (v.indices.toList, v.values) + case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass) + } + + // Initialize the variational distribution q(theta|gamma) for the mini-batch + var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * K + var Elogthetad = digamma(gammad) - digamma(sum(gammad)) // 1 * K + var expElogthetad = exp(Elogthetad) // 1 * K + val expElogbetad = expElogbeta(::, ids).toDenseMatrix // K * ids + + var phinorm = expElogthetad * expElogbetad + 1e-100 // 1 * ids + var meanchange = 1D + val ctsVector = new BDV[Double](cts).t // 1 * ids + + // Iterate between gamma and phi until convergence + while (meanchange > 1e-5) { + val lastgamma = gammad + // 1*K 1 * ids ids * k + gammad = (expElogthetad :* ((ctsVector / phinorm) * (expElogbetad.t))) + alpha + Elogthetad = digamma(gammad) - digamma(sum(gammad)) + expElogthetad = exp(Elogthetad) + phinorm = expElogthetad * expElogbetad + 1e-100 + meanchange = sum(abs(gammad - lastgamma)) / k + } + + val m1 = expElogthetad.t.toDenseMatrix.t + val m2 = (ctsVector / phinorm).t.toDenseMatrix + val outerResult = kron(m1, m2) // K * ids + for (i <- 0 until ids.size) { + stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i)) + } + stat + }) + Iterator(stat) + }) + + val batchResult = stats.reduce(_ += _) + update(batchResult, iteration, batchSize) + batch.unpersist() + this + } + + private[clustering] override def getLDAModel(iterationTimes: Array[Double]): LDAModel = { + new LocalLDAModel(Matrices.fromBreeze(lambda).transpose) + } + + private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={ --- End diff -- Please add a little doc, even though it's an internal method --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastructure@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org