From: mengxr
To: dev@spark.incubator.apache.org
Reply-To: dev@spark.incubator.apache.org
Subject: [GitHub] incubator-spark pull request: Principal Component Analysis
Message-Id: <20140224084214.3419889E800@tyr.zones.apache.org>
Date: Mon, 24 Feb 2014 08:42:14 +0000 (UTC)

Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/incubator-spark/pull/564#discussion_r9983939

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd.RDD
+
+import org.apache.spark.mllib.util._
+
+
+/**
+ * Class used to obtain principal components
+ */
+class PCA {
+  private var k: Int = 1
+
+  /**
+   * Set the number of top-k principal components to return
+   */
+  def setK(k: Int): PCA = {
+    this.k = k
+    this
+  }
+
+  /**
+   * Compute PCA using the current set parameters
+   */
+  def compute(matrix: DenseMatrix): DenseMatrix = {
+    computePCA(matrix, k)
+  }
+
+  /**
+   * Principal Component Analysis.
+   * Computes the top k principal component coefficients for the m-by-n data matrix X.
+   * Rows of X correspond to observations and columns correspond to variables.
+   * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
+   * for one principal component, and the columns are in descending
+   * order of component variance.
+   * This function centers the data and uses the
+   * singular value decomposition (SVD) algorithm.
+   *
+   * All input and output is expected in DenseMatrix format.
+   *
+   * @param matrix dense matrix to perform PCA on
+   * @param k Recover k principal components
+   * @return An n-by-k matrix of principal components
+   */
+  def computePCA(matrix: DenseMatrix, k: Int): DenseMatrix = {
+    val m = matrix.m
+    val n = matrix.n
+
+    if (m <= 0 || n <= 0) {
+      throw new IllegalArgumentException("Expecting a well-formed matrix")
+    }
+
+    // compute column sums and normalize matrix
+    val rawData = matrix.rows.flatMap{
+      x => Array.tabulate(x.data.size)(idx => MatrixEntry(x.i, idx, x.data(idx)))
+    }
+    val colSums = rawData.map(entry => (entry.j, entry.mval)).reduceByKey(_ + _)
+    val data = rawData.map(entry => (entry.j, (entry.i, entry.mval))).join(colSums).map{
+      case (col, ((row, mval), colsum)) =>
+        MatrixEntry(row, col, (mval - colsum / m.toDouble) / Math.sqrt(n-1)) }
--- End diff --

This creates many small objects and breaks the data continuity of each row. The following code should be adequate for computing the column sums:

~~~
def addi(a: Array[Double], b: Array[Double]): Array[Double] = ...

val colsums = matrix.rows.fold(new Array[Double](matrix.n))(addi)
~~~

I ran a small test on a 1000000x100 matrix, where this approach was about 50x faster than flat-mapping rows to entries and then reducing by column indices.
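For reference, a self-contained sketch of this fold-based column-sum idea, assuming a row type with an `i` index and a `data: Array[Double]` payload like the one the diff's `x.i` / `x.data` usage implies; the `MatrixRow` and `colSums` names here are illustrative, not part of the PR:

~~~
import org.apache.spark.rdd.RDD

object ColumnSumsSketch {

  // Illustrative row type mirroring the x.i / x.data fields used in the diff.
  case class MatrixRow(i: Long, data: Array[Double])

  // Element-wise in-place addition: accumulate b into a and return a, so the
  // fold reuses one accumulator array instead of allocating per row.
  def addi(a: Array[Double], b: Array[Double]): Array[Double] = {
    var j = 0
    while (j < a.length) {
      a(j) += b(j)
      j += 1
    }
    a
  }

  // Column sums of an m-by-n row matrix computed with a single fold over the
  // rows, rather than flat-mapping every entry and reducing by column index.
  def colSums(rows: RDD[MatrixRow], n: Int): Array[Double] =
    rows.map(_.data).fold(new Array[Double](n))(addi)
}
~~~

With the column sums computed this way, the centering step `(mval - colsum / m.toDouble) / Math.sqrt(n-1)` could then be applied in a single map over rows, keeping each row's data contiguous instead of materializing one MatrixEntry per element.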