 Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/OnlineSummarizer.scala 
@@ 0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat
+
+import breeze.linalg.{DenseVector => BDV}
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * OnlineSummarizer implements [[MultivariateStatisticalSummary]] to compute the mean, variance,
+ * minimum, maximum, counts, and nonzero counts for samples in sparse or dense vector format in
+ * a streaming fashion.
+ *
+ * Two OnlineSummarizers can be merged together to obtain a statistical summary of the combined
+ * dataset.
+ *
+ * A numerically stable algorithm is implemented to compute sample mean and variance:
+ * Reference: [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]
+ * Zero elements (including explicit zero values) are skipped when calling add(),
+ * to have time complexity O(nnz) instead of O(n) for each column.
+ */
+@DeveloperApi
+class OnlineSummarizer extends MultivariateStatisticalSummary with Serializable {
+
+ private var n = 0 // Dimension of the samples; fixed by the first sample passed to add().
+ private var currMean: BDV[Double] = _ // Per-column running mean of the nonzero values seen.
+ private var currM2n: BDV[Double] = _ // Per-column sum of squared deviations of nonzero values.
+ private var totalCnt: Long = 0 // Number of samples added, including merged summarizers.
+ private var nnz: BDV[Double] = _ // Per-column count of nonzero values seen so far.
+ private var currMax: BDV[Double] = _ // Per-column maximum over nonzero values.
+ private var currMin: BDV[Double] = _ // Per-column minimum over nonzero values.
+
+  /**
+   * Add a new sample to this summarizer, and update the statistical summary in place.
+   *
+   * The first sample fixes the dimension of this summarizer; every later sample must have the
+   * same dimension. Zero-valued entries are skipped, so the per-column mean, M2n, maximum and
+   * minimum maintained here only cover the nonzero values seen so far.
+   *
+   * @param sample The sample in dense/sparse vector format to be added into this summarizer.
+   * @return This OnlineSummarizer object.
+   * @throws IllegalArgumentException if the first sample has dimension zero, or if the sample's
+   *                                  dimension differs from the dimension already recorded.
+   */
+  def add(sample: Vector): OnlineSummarizer = {
+    // Convert to Breeze once and reuse it for the dimension checks and the iteration.
+    val values = sample.toBreeze
+    val dim = values.length
+
+    if (n == 0) {
+      require(dim > 0, s"Vector should have dimension larger than zero.")
+      n = dim
+
+      currMean = BDV.zeros[Double](n)
+      currM2n = BDV.zeros[Double](n)
+      nnz = BDV.zeros[Double](n)
+      // Sentinels: any real value replaces them on first comparison.
+      currMax = BDV.fill(n)(Double.MinValue)
+      currMin = BDV.fill(n)(Double.MaxValue)
+    }
+
+    require(n == dim, s"Dimensions mismatch when adding new sample." +
+      s" Expecting $n but got $dim.")
+
+    values.activeIterator.foreach {
+      case (_, 0.0) => // Skip explicit zero elements.
+      case (i, value) =>
+        if (currMax(i) < value) {
+          currMax(i) = value
+        }
+        if (currMin(i) > value) {
+          currMin(i) = value
+        }
+
+        // Incremental (Welford-style) update: adjust the mean by the scaled deviation instead
+        // of rebuilding it from mean * count, which loses precision as the count grows.
+        val prevMean = currMean(i)
+        currMean(i) = prevMean + (value - prevMean) / (nnz(i) + 1.0)
+        // M2n uses the deviation from both the new and the previous mean.
+        currM2n(i) += (value - currMean(i)) * (value - prevMean)
+
+        nnz(i) += 1.0
+    }
+
+    totalCnt += 1
+    this
+  }
+
+  /**
+   * Merge another OnlineSummarizer, and update the statistical summary. (Note that it's
+   * in place merging; as a result, this OnlineSummarizer object will be modified.)
+   *
+   * If this summarizer has not seen any sample yet, it takes over a copy of the other
+   * summarizer's state, so the two objects never share their underlying vectors. If the other
+   * summarizer is empty, this summarizer is left untouched.
+   *
+   * @param other The other OnlineSummarizer to be merged.
+   * @return This OnlineSummarizer object.
+   * @throws IllegalArgumentException if both summarizers are non-empty and their dimensions
+   *                                  differ.
+   */
+  def add(other: OnlineSummarizer): OnlineSummarizer = {
+    if (other.totalCnt == 0) {
+      // Nothing to merge.
+    } else if (totalCnt == 0) {
+      // Adopt the other summary by value so the merge really happens in place and later
+      // updates to either object cannot leak into the other one.
+      n = other.n
+      currMean = other.currMean.copy
+      currM2n = other.currM2n.copy
+      totalCnt = other.totalCnt
+      nnz = other.nnz.copy
+      currMax = other.currMax.copy
+      currMin = other.currMin.copy
+    } else {
+      require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
+        s"Expecting $n but got ${other.n}.")
+
+      totalCnt += other.totalCnt
+
+      var i = 0
+      while (i < n) {
+        val thisNnz = nnz(i)
+        val otherNnz = other.nnz(i)
+        val totalNnz = thisNnz + otherNnz
+
+        // A column with no nonzero value on either side keeps its zero mean and M2n; the
+        // guard also avoids dividing by zero.
+        if (totalNnz != 0.0) {
+          // The delta must be taken before the mean is overwritten below.
+          val deltaMean = other.currMean(i) - currMean(i)
+          // Merge the means. This is keyed on the counts, not on the other mean being
+          // nonzero: nonzero values may legitimately average to 0.0.
+          currMean(i) += deltaMean * otherNnz / totalNnz
+          // Merge M2n using the pairwise combination formula.
+          currM2n(i) += other.currM2n(i) + deltaMean * deltaMean * thisNnz * otherNnz / totalNnz
+        }
+
+        if (currMax(i) < other.currMax(i)) {
+          currMax(i) = other.currMax(i)
+        }
+        if (currMin(i) > other.currMin(i)) {
+          currMin(i) = other.currMin(i)
+        }
+
+        nnz(i) = totalNnz
+        i += 1
+      }
+    }
+    this
+  }
+
+ /**
+ * Return the mean of the samples.
+ *
+ * @return The vector of the mean.
+ */
+ override def mean: Vector = {
 End diff 
We don't need doc for overridden methods.

