spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: [SPARK-1991] Support custom storage levels for vertices and edges
Date Tue, 03 Jun 2014 21:54:29 GMT
Repository: spark
Updated Branches:
  refs/heads/master 894ecde04 -> b1feb6020


[SPARK-1991] Support custom storage levels for vertices and edges

This PR adds support for specifying custom storage levels for the vertices and edges of a
graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK
and then repartitioning the graph to use many small partitions, each of which does fit in
memory. Spark will then automatically load partitions from disk as needed.

The user specifies the desired vertex and edge storage levels when building the graph by passing
them to the graph constructor. These are then stored in the `targetStorageLevel` attribute
of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD
(because it plans to use it more than once, for example), it uses the specified target storage
level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using
their target storage levels.

In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD
operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD`
and `withTargetStorageLevel` methods.

I tested this change by running PageRank and triangle count on a severely memory-constrained
cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms
used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level,
they succeed.

Author: Ankur Dave <ankurdave@gmail.com>

Closes #946 from ankurdave/SPARK-1991 and squashes the following commits:

ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString
ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores
c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks"
34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks
6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1feb602
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1feb602
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1feb602

Branch: refs/heads/master
Commit: b1feb60209174433262de2a26d39616ba00edcc8
Parents: 894ecde
Author: Ankur Dave <ankurdave@gmail.com>
Authored: Tue Jun 3 14:54:26 2014 -0700
Committer: Reynold Xin <rxin@apache.org>
Committed: Tue Jun 3 14:54:26 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/StorageLevel.scala | 21 +++++
 .../scala/org/apache/spark/graphx/EdgeRDD.scala | 67 +++++++++++++++-
 .../scala/org/apache/spark/graphx/Graph.scala   | 34 +++++---
 .../org/apache/spark/graphx/GraphLoader.scala   | 12 ++-
 .../org/apache/spark/graphx/VertexRDD.scala     | 49 +++++++++---
 .../apache/spark/graphx/impl/GraphImpl.scala    | 55 ++++++-------
 .../graphx/impl/ReplicatedVertexView.scala      |  6 +-
 .../org/apache/spark/graphx/lib/Analytics.scala | 82 ++++++++++----------
 8 files changed, 229 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 363de93..2d8ff11 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -149,6 +149,27 @@ object StorageLevel {
 
   /**
    * :: DeveloperApi ::
+   * Return the StorageLevel object with the specified name.
+   */
+  @DeveloperApi
+  def fromString(s: String): StorageLevel = s match {
+    case "NONE" => NONE
+    case "DISK_ONLY" => DISK_ONLY
+    case "DISK_ONLY_2" => DISK_ONLY_2
+    case "MEMORY_ONLY" => MEMORY_ONLY
+    case "MEMORY_ONLY_2" => MEMORY_ONLY_2
+    case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
+    case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
+    case "MEMORY_AND_DISK" => MEMORY_AND_DISK
+    case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
+    case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
+    case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
+    case "OFF_HEAP" => OFF_HEAP
+    case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
+  }
+
+  /**
+   * :: DeveloperApi ::
    * Create a new StorageLevel object without setting useOffHeap.
    */
   @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index a8fc095..899a3cb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.spark.graphx.impl.EdgePartition
+import org.apache.spark.graphx.impl.EdgePartitionBuilder
 
 /**
  * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
  * `impl.ReplicatedVertexView`.
  */
 class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD)))
{
 
   partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
+  /**
+   * Persists the edge partitions at the specified storage level, ignoring any existing target
+   * storage level.
+   */
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
     this
   }
 
+  /** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY.
*/
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
   private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
       f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2,
VD2] = {
-    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+    this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
       if (iter.hasNext) {
         val (pid, ep) = iter.next()
         Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
       (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true)
{
       (thisIter, otherIter) =>
         val (pid, thisEPart) = thisIter.next()
         val (_, otherEPart) = otherIter.next()
         Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
     })
   }
+
+  /** Replaces the vertex partitions while preserving all other properties of the VertexRDD.
*/
+  private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
+      partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD(partitionsRDD, this.targetStorageLevel)
+  }
+
+  /**
+   * Changes the target storage level while preserving all other properties of the
+   * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
+   *
+   * This does not actually trigger a cache; to do this, call
+   * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
+   */
+  private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
+    new EdgeRDD(this.partitionsRDD, targetStorageLevel)
+  }
+
+}
+
+object EdgeRDD {
+  /**
+   * Creates an EdgeRDD from a set of edges.
+   *
+   * @tparam ED the edge attribute type
+   * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+   */
+  def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
+    val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[ED, VD]
+      iter.foreach { e =>
+        builder.add(e.srcId, e.dstId, e.attr)
+      }
+      Iterator((pid, builder.toEdgePartition))
+    }
+    EdgeRDD.fromEdgePartitions(edgePartitions)
+  }
+
+  /**
+   * Creates an EdgeRDD from already-constructed edge partitions.
+   *
+   * @tparam ED the edge attribute type
+   * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
+   */
+  def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
+      edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
+    new EdgeRDD(edgePartitions)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index dc5dac4..c4f9d65 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   @transient val triplets: RDD[EdgeTriplet[VD, ED]]
 
   /**
-   * Caches the vertices and edges associated with this graph at the specified storage level.
+   * Caches the vertices and edges associated with this graph at the specified storage level,
+   * ignoring any target storage levels previously set.
    *
    * @param newLevel the level at which to cache the graph.
    *
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
   def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]
 
   /**
-   * Caches the vertices and edges associated with this graph. This is used to
-   * pin a graph in memory enabling multiple queries to reuse the same
-   * construction process.
+   * Caches the vertices and edges associated with this graph at the previously-specified
target
+   * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory
enabling
+   * multiple queries to reuse the same construction process.
    */
   def cache(): Graph[VD, ED]
 
@@ -358,9 +359,12 @@ object Graph {
    * Construct a graph from a collection of edges encoded as vertex id pairs.
    *
    * @param rawEdges a collection of edges in (src, dst) form
+   * @param defaultValue the vertex attributes with which to create vertices referenced by
the edges
    * @param uniqueEdges if multiple identical edges are found they are combined and the edge
    * attribute is set to the sum.  Otherwise duplicate edges are treated as separate. To
enable
    * `uniqueEdges`, a [[PartitionStrategy]] must be provided.
+   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the vertices if
necessary
    *
    * @return a graph with edge attributes containing either the count of duplicate edges
or 1
    * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each
vertex.
@@ -368,10 +372,12 @@ object Graph {
   def fromEdgeTuples[VD: ClassTag](
       rawEdges: RDD[(VertexId, VertexId)],
       defaultValue: VD,
-      uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+      uniqueEdges: Option[PartitionStrategy] = None,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
   {
     val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
-    val graph = GraphImpl(edges, defaultValue)
+    val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
     uniqueEdges match {
       case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
       case None => graph
@@ -383,14 +389,18 @@ object Graph {
    *
    * @param edges the RDD containing the set of edges in the graph
    * @param defaultValue the default vertex attribute to use for each vertex
+   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the vertices if
necessary
    *
    * @return a graph with edge attributes described by `edges` and vertices
    *         given by all vertices in `edges` with value `defaultValue`
    */
   def fromEdges[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultValue: VD): Graph[VD, ED] = {
-    GraphImpl(edges, defaultValue)
+      defaultValue: VD,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+    GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
   }
 
   /**
@@ -405,12 +415,16 @@ object Graph {
    * @param edges the collection of edges in the graph
    * @param defaultVertexAttr the default vertex attribute to use for vertices that are
    *                          mentioned in edges but not in vertices
+   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
+   * @param vertexStorageLevel the desired storage level at which to cache the vertices if
necessary
    */
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
-    GraphImpl(vertices, edges, defaultVertexAttr)
+      defaultVertexAttr: VD = null.asInstanceOf[VD],
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+    GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
index 389490c..2e814e3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx
 
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}
 
@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
    * @param canonicalOrientation whether to orient edges in the positive
    *        direction
    * @param minEdgePartitions the number of partitions for the edge RDD
+   * @param edgeStorageLevel the desired storage level for the edge partitions. To set the
vertex
+   *        storage level, call [[org.apache.spark.graphx.Graph#persistVertices]].
    */
   def edgeListFile(
       sc: SparkContext,
       path: String,
       canonicalOrientation: Boolean = false,
-      minEdgePartitions: Int = 1)
+      minEdgePartitions: Int = 1,
+      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
     : Graph[Int, Int] =
   {
     val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
+    }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
 
-    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+    GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
+      vertexStorageLevel = vertexStorageLevel)
   } // end of edgeListFile
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index 8b910fb..f1b6df9 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
  * @tparam VD the vertex attribute associated with each vertex in the set.
  */
 class VertexRDD[@specialized VD: ClassTag](
-    val partitionsRDD: RDD[ShippableVertexPartition[VD]])
+    val partitionsRDD: RDD[ShippableVertexPartition[VD]],
+    val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
   extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD)))
{
 
   require(partitionsRDD.partitioner.isDefined)
@@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag](
    * VertexRDD will be based on a different index and can no longer be quickly joined with
this
    * RDD.
    */
-  def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
+  def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))
 
   override val partitioner = partitionsRDD.partitioner
 
@@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag](
   }
   setName("VertexRDD")
 
+  /**
+   * Persists the vertex partitions at the specified storage level, ignoring any existing
target
+   * storage level.
+   */
   override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
@@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag](
     this
   }
 
+  /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY.
*/
+  override def cache(): this.type = {
+    partitionsRDD.persist(targetStorageLevel)
+    this
+  }
+
   /** The number of vertices in the RDD. */
   override def count(): Long = {
     partitionsRDD.map(_.size).reduce(_ + _)
@@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag](
       f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
     : VertexRDD[VD2] = {
     val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning =
true)
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
 
@@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.diff(otherPart))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.leftJoin(otherPart)(f))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
       case other: VertexRDD[_] =>
         leftZipJoin(other)(f)
       case _ =>
-        new VertexRDD[VD3](
+        this.withPartitionsRDD[VD3](
           partitionsRDD.zipPartitions(
             other.copartitionWithVertices(this.partitioner.get), preservesPartitioning =
true) {
             (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
@@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag](
       val otherPart = otherIter.next()
       Iterator(thisPart.innerJoin(otherPart)(f))
     }
-    new VertexRDD(newPartitionsRDD)
+    this.withPartitionsRDD(newPartitionsRDD)
   }
 
   /**
@@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
       case other: VertexRDD[_] =>
         innerZipJoin(other)(f)
       case _ =>
-        new VertexRDD(
+        this.withPartitionsRDD(
           partitionsRDD.zipPartitions(
             other.copartitionWithVertices(this.partitioner.get), preservesPartitioning =
true) {
             (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
@@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag](
     val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
       thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
     }
-    new VertexRDD[VD2](parts)
+    this.withPartitionsRDD[VD2](parts)
   }
 
   /**
@@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag](
           if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
         partIter.map(_.withRoutingTable(routingTable))
     }
-    new VertexRDD(vertexPartitions)
+    this.withPartitionsRDD(vertexPartitions)
+  }
+
+  /** Replaces the vertex partitions while preserving all other properties of the VertexRDD.
*/
+  private[graphx] def withPartitionsRDD[VD2: ClassTag](
+      partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
+    new VertexRDD(partitionsRDD, this.targetStorageLevel)
+  }
+
+  /**
+   * Changes the target storage level while preserving all other properties of the
+   * VertexRDD. Operations on the returned VertexRDD will preserve this storage level.
+   *
+   * This does not actually trigger a cache; to do this, call
+   * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
+   */
+  private[graphx] def withTargetStorageLevel(
+      targetStorageLevel: StorageLevel): VertexRDD[VD] = {
+    new VertexRDD(this.partitionsRDD, targetStorageLevel)
   }
 
   /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions.
*/

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
index 1649b24..59d9a88 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala
@@ -61,7 +61,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
     this
   }
 
-  override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY)
+  override def cache(): Graph[VD, ED] = {
+    vertices.cache()
+    replicatedVertexView.edges.cache()
+    this
+  }
 
   override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = {
     vertices.unpersist(blocking)
@@ -70,10 +74,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
   }
 
   override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = {
-    val numPartitions = replicatedVertexView.edges.partitions.size
+    val numPartitions = edges.partitions.size
     val edTag = classTag[ED]
     val vdTag = classTag[VD]
-    val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e =>
+    val newEdges = edges.withPartitionsRDD(edges.map { e =>
       val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions)
 
       // Should we be using 3-tuple or an optimized class
@@ -256,24 +260,33 @@ object GraphImpl {
   /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
   def apply[VD: ClassTag, ED: ClassTag](
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
   }
 
   /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`.
*/
   def fromEdgePartitions[VD: ClassTag, ED: ClassTag](
       edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    fromEdgeRDD(EdgeRDD.fromEdgePartitions(edgePartitions), defaultVertexAttr, edgeStorageLevel,
+      vertexStorageLevel)
   }
 
   /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`.
*/
   def apply[VD: ClassTag, ED: ClassTag](
       vertices: RDD[(VertexId, VD)],
       edges: RDD[Edge[ED]],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache()
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
+      .withTargetStorageLevel(edgeStorageLevel).cache()
     val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
+      .withTargetStorageLevel(vertexStorageLevel).cache()
     GraphImpl(vertexRDD, edgeRDD)
   }
 
@@ -309,23 +322,13 @@ object GraphImpl {
    */
   private def fromEdgeRDD[VD: ClassTag, ED: ClassTag](
       edges: EdgeRDD[ED, VD],
-      defaultVertexAttr: VD): GraphImpl[VD, ED] = {
-    edges.cache()
-    val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr)
-    fromExistingRDDs(vertices, edges)
-  }
-
-  /** Create an EdgeRDD from a set of edges. */
-  private def createEdgeRDD[ED: ClassTag, VD: ClassTag](
-      edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
-    val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[ED, VD]
-      iter.foreach { e =>
-        builder.add(e.srcId, e.dstId, e.attr)
-      }
-      Iterator((pid, builder.toEdgePartition))
-    }
-    new EdgeRDD(edgePartitions)
+      defaultVertexAttr: VD,
+      edgeStorageLevel: StorageLevel,
+      vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
+    val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache()
+    val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr)
+      .withTargetStorageLevel(vertexStorageLevel)
+    fromExistingRDDs(vertices, edgesCached)
   }
 
 } // end of object GraphImpl

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
index 3a0bba1..86b366e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala
@@ -69,7 +69,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
           .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format(
             includeSrc, includeDst, shipSrc, shipDst))
           .partitionBy(edges.partitioner.get)
-      val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+      val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts)
{
         (ePartIter, shippedVertsIter) => ePartIter.map {
           case (pid, edgePartition) =>
             (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
@@ -91,7 +91,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
       .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)")
       .partitionBy(edges.partitioner.get)
 
-    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) {
+    val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedActives)
{
       (ePartIter, shippedActivesIter) => ePartIter.map {
         case (pid, edgePartition) =>
           (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator)))
@@ -111,7 +111,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag](
         hasSrcId, hasDstId))
       .partitionBy(edges.partitioner.get)
 
-    val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
+    val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts)
{
       (ePartIter, shippedVertsIter) => ePartIter.map {
         case (pid, edgePartition) =>
           (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))

http://git-wip-us.apache.org/repos/asf/spark/blob/b1feb602/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
index 069e042..c1513a0 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.mutable
 import org.apache.spark._
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.graphx._
 import org.apache.spark.graphx.PartitionStrategy._
 
@@ -28,18 +30,20 @@ object Analytics extends Logging {
 
   def main(args: Array[String]): Unit = {
     if (args.length < 2) {
-      System.err.println("Usage: Analytics <taskType> <file> [other options]")
+      System.err.println(
+        "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions>
[other options]")
       System.exit(1)
     }
 
     val taskType = args(0)
     val fname = args(1)
-    val options =  args.drop(2).map { arg =>
+    val optionsList = args.drop(2).map { arg =>
       arg.dropWhile(_ == '-').split('=') match {
         case Array(opt, v) => (opt -> v)
         case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
       }
     }
+    val options = mutable.Map(optionsList: _*)
 
     def pickPartitioner(v: String): PartitionStrategy = {
       // TODO: Use reflection rather than listing all the partitioning strategies here.
@@ -57,20 +61,24 @@ object Analytics extends Logging {
       .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
       .set("spark.locality.wait", "100000")
 
+    val numEPart = options.remove("numEPart").map(_.toInt).getOrElse {
+      println("Set the number of edge partitions using --numEPart.")
+      sys.exit(1)
+    }
+    val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy")
+      .map(pickPartitioner(_))
+    val edgeStorageLevel = options.remove("edgeStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+    val vertexStorageLevel = options.remove("vertexStorageLevel")
+      .map(StorageLevel.fromString(_)).getOrElse(StorageLevel.MEMORY_ONLY)
+
     taskType match {
       case "pagerank" =>
-        var tol: Float = 0.001F
-        var outFname = ""
-        var numEPart = 4
-        var partitionStrategy: Option[PartitionStrategy] = None
-        var numIterOpt: Option[Int] = None
-
-        options.foreach{
-          case ("tol", v) => tol = v.toFloat
-          case ("output", v) => outFname = v
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
-          case ("numIter", v) => numIterOpt = Some(v.toInt)
+        val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F)
+        val outFname = options.remove("output").getOrElse("")
+        val numIterOpt = options.remove("numIter").map(_.toInt)
+
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
@@ -81,7 +89,9 @@ object Analytics extends Logging {
         val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")"))
 
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart).cache()
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
         println("GRAPHX: Number of vertices " + graph.vertices.count)
@@ -102,32 +112,19 @@ object Analytics extends Logging {
         sc.stop()
 
       case "cc" =>
-        var numIter = Int.MaxValue
-        var numVPart = 4
-        var numEPart = 4
-        var isDynamic = false
-        var partitionStrategy: Option[PartitionStrategy] = None
-
-        options.foreach{
-          case ("numIter", v) => numIter = v.toInt
-          case ("dynamic", v) => isDynamic = v.toBoolean
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("numVPart", v) => numVPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v))
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
 
-        if (!isDynamic && numIter == Int.MaxValue) {
-          println("Set number of iterations!")
-          sys.exit(1)
-        }
         println("======================================")
         println("|      Connected Components          |")
         println("======================================")
 
         val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")"))
         val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname,
-          minEdgePartitions = numEPart).cache()
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel).cache()
         val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_))
 
         val cc = ConnectedComponents.run(graph)
@@ -135,24 +132,25 @@ object Analytics extends Logging {
         sc.stop()
 
       case "triangles" =>
-        var numEPart = 4
-        // TriangleCount requires the graph to be partitioned
-        var partitionStrategy: PartitionStrategy = RandomVertexCut
-
-        options.foreach{
-          case ("numEPart", v) => numEPart = v.toInt
-          case ("partStrategy", v) => partitionStrategy = pickPartitioner(v)
+        options.foreach {
           case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt)
         }
+
         println("======================================")
         println("|      Triangle Count                |")
         println("======================================")
+
         val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")"))
-        val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true,
-          minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache()
+        val graph = GraphLoader.edgeListFile(sc, fname,
+          canonicalOrientation = true,
+          minEdgePartitions = numEPart,
+          edgeStorageLevel = edgeStorageLevel,
+          vertexStorageLevel = vertexStorageLevel)
+        // TriangleCount requires the graph to be partitioned
+          .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache()
         val triangles = TriangleCount.run(graph)
         println("Triangles: " + triangles.vertices.map {
-          case (vid,data) => data.toLong
+          case (vid, data) => data.toLong
         }.reduce(_ + _) / 3)
         sc.stop()
 


Mime
View raw message