spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From javadba <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-4259][MLlib]: Add Power Iteration Clust...
Date Wed, 28 Jan 2015 22:13:51 GMT
Github user javadba commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4254#discussion_r23728755
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala ---
    @@ -0,0 +1,433 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
    +import org.apache.log4j.Logger
    +import org.apache.spark.SparkContext
    +import org.apache.spark.graphx._
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +import scala.language.existentials
    +
    +/**
    + * Implements the scalable graph clustering algorithm Power Iteration Clustering (see
    + * www.icml2010.org/papers/387.pdf).  From the abstract:
    + *
    + * The input data is first transformed to a normalized Affinity Matrix via Gaussian pairwise
    + * distance calculations. Power iteration is then used to find a dimensionality-reduced
    + * representation.  The resulting pseudo-eigenvector provides effective clustering -
as
    + * performed by Parallel KMeans.
    + */
    +object PIClustering {
    +
    +  private val logger = Logger.getLogger(getClass.getName())
    +
    +  type LabeledPoint = (VertexId, BDV[Double])
    +  type Points = Seq[LabeledPoint]
    +  type DGraph = Graph[Double, Double]
    +  type IndexedVector[Double] = (Long, BDV[Double])
    +
    +  // Terminate iteration when norm changes by less than this value
    +  private[mllib] val DefaultMinNormChange: Double = 1e-11
    +
    +  // Default σ for Gaussian Distance calculations
    +  private[mllib] val DefaultSigma = 1.0
    +
    +  // Default number of iterations for PIC loop
    +  private[mllib] val DefaultIterations: Int = 20
    +
    +  // Default minimum affinity between points - lower than this it is considered
    +  // zero and no edge will be created
    +  private[mllib] val DefaultMinAffinity = 1e-11
    +
    +  // Do not allow divide by zero: change to this value instead
    +  val DefaultDivideByZeroVal: Double = 1e-15
    +
    +  // Default number of runs by the KMeans.run() method
    +  val DefaultKMeansRuns = 10
    +
    +  /**
    +   *
    +   * Run a Power Iteration Clustering
    +   *
    +   * @param sc  Spark Context
    +   * @param points  Input Points in format of [(VertexId,(x,y)]
    +   *                where VertexId is a Long
    +   * @param nClusters  Number of clusters to create
    +   * @param nIterations Number of iterations of the PIC algorithm
    +   *                    that calculates primary PseudoEigenvector and Eigenvalue
    +   * @param sigma   Sigma for Gaussian distribution calculation according to
    +   *                [1/2 *sqrt(pi*sigma)] exp (- (x-y)**2 / 2sigma**2
    +   * @param minAffinity  Minimum Affinity between two Points in the input dataset: below
    +   *                     this threshold the affinity will be considered "close to" zero
and
    +   *                     no Edge will be created between those Points in the sparse matrix
    +   * @param nRuns  Number of runs for the KMeans clustering
    +   * @return Tuple of (Seq[(Cluster Id,Cluster Center)],
    +   *         Seq[(VertexId, ClusterID Membership)]
    +   */
    +  def run(sc: SparkContext,
    +          points: Points,
    +          nClusters: Int,
    +          nIterations: Int = DefaultIterations,
    +          sigma: Double = DefaultSigma,
    +          minAffinity: Double = DefaultMinAffinity,
    +          nRuns: Int = DefaultKMeansRuns)
    +  : (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
    +    val vidsRdd = sc.parallelize(points.map(_._1).sorted)
    +    val nVertices = points.length
    +
    +    val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
    +    val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
    +    if (logger.isDebugEnabled) {
    +      logger.debug(s"Vt(0)=${
    +        printVector(new BDV(initialVt.map {
    +          _._2
    +        }.toArray))
    +      }")
    +    }
    +    val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
    +    val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
    +    if (logger.isDebugEnabled) {
    +      logger.debug(printMatrixFromEdges(G.edges))
    +    }
    +    val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
    +    // TODO: avoid local collect and then sc.parallelize.
    +    val localVt = vt.collect.sortBy(_._1)
    +    val vectRdd = sc.parallelize(localVt.map(v => (v._1, Vectors.dense(v._2))))
    +    vectRdd.cache()
    +    val model = KMeans.train(vectRdd.map {
    +      _._2
    +    }, nClusters, nRuns)
    +    vectRdd.unpersist()
    +    if (logger.isDebugEnabled) {
    +      logger.debug(s"Eigenvalue = $lambda EigenVector: ${localVt.mkString(",")}")
    +    }
    +    val estimates = vectRdd.zip(model.predict(vectRdd.map(_._2)))
    +    if (logger.isDebugEnabled) {
    +      logger.debug(s"lambda=$lambda  eigen=${localVt.mkString(",")}")
    +    }
    +    val ccs = (0 until model.clusterCenters.length).zip(model.clusterCenters)
    +    if (logger.isDebugEnabled) {
    +      logger.debug(s"Kmeans model cluster centers: ${ccs.mkString(",")}")
    +    }
    +    val estCollected = estimates.collect.sortBy(_._1._1)
    +    if (logger.isDebugEnabled) {
    +      val clusters = estCollected.map(_._2)
    +      val counts = estCollected.groupBy(_._2).mapValues {
    +        _.length
    +      }
    +      logger.debug(s"Cluster counts: Counts: ${counts.mkString(",")}"
    +        + s"\nCluster Estimates: ${estCollected.mkString(",")}")
    +    }
    +    (ccs, estCollected)
    +  }
    +
    +  /**
    +   * Read Points from an input file in the following format:
    +   * Vertex1Id Coord11 Coord12 CoordX13 .. Coord1D
    +   * Vertex2Id Coord21 Coord22 CoordX23 .. Coord2D
    +   * ..
    +   * VertexNId CoordN1 CoordN2 CoordN23 .. CoordND
    +   *
    +   * Where N is the number of observations, each a D-dimension point
    +   *
    +   * E.g.
    +   *
    +   * 19	1.8035177495	0.7460582552	0.2361611395	-0.8645567427	-0.8613062
    +   * 10	0.5534111111	1.0456386879	1.7045663273	0.7281759816	1.0807487792
    +   * 911	1.200749626	1.8962364439	2.5117192131	-0.4034737281	-0.9069696484
    +   *
    +   * Which represents three 5-dimensional input Points with VertexIds 19,10, and 911
    +   * @param verticesFile Local filesystem path to the Points input file
    +   * @return Set of Vertices in format appropriate for consumption by the PIC algorithm
    +   */
    +  def readVerticesfromFile(verticesFile: String): Points = {
    --- End diff --
    
    OK


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message