spark-commits mailing list archives

From ma...@apache.org
Subject git commit: SPARK-1786: Edge Partition Serialization
Date Mon, 12 May 2014 02:20:46 GMT
Repository: spark
Updated Branches:
  refs/heads/master f938a155b -> a6b02fb74


SPARK-1786: Edge Partition Serialization

This appears to address the issue with edge partition serialization. The solution appears
to be as simple as registering the `PrimitiveKeyOpenHashMap`. However, I noticed that we
appear to have forked that code in GraphX while retaining the same name (which is
confusing), so I also renamed our local copy to `GraphXPrimitiveKeyOpenHashMap`. We should
consider dropping that copy and using the one in Spark if possible.
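
For context, enabling the registrator in an application looks roughly like the
following minimal sketch (the app name is hypothetical; the two configuration
keys are the same ones used in the new EdgePartitionSuite test below):

    import org.apache.spark.{SparkConf, SparkContext}

    // Configure Kryo with the GraphX registrator so that EdgePartition and
    // its GraphXPrimitiveKeyOpenHashMap index serialize cleanly.
    val conf = new SparkConf()
      .setAppName("GraphXKryoExample") // hypothetical app name
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
    val sc = new SparkContext(conf)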

Author: Ankur Dave <ankurdave@gmail.com>
Author: Joseph E. Gonzalez <joseph.e.gonzalez@gmail.com>

Closes #724 from jegonzal/edge_partition_serialization and squashes the following commits:

b0a525a [Ankur Dave] Disable reference tracking to fix serialization test
bb7f548 [Ankur Dave] Add failing test for EdgePartition Kryo serialization
67dac22 [Joseph E. Gonzalez] Making EdgePartition serializable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6b02fb7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6b02fb7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6b02fb7

Branch: refs/heads/master
Commit: a6b02fb7486356493474c7f42bb714c9cce215ca
Parents: f938a15
Author: Ankur Dave <ankurdave@gmail.com>
Authored: Sun May 11 19:20:42 2014 -0700
Committer: Matei Zaharia <matei@databricks.com>
Committed: Sun May 11 19:20:42 2014 -0700

----------------------------------------------------------------------
 .../spark/graphx/GraphKryoRegistrator.scala     |   9 +-
 .../spark/graphx/impl/EdgePartition.scala       |  14 +-
 .../graphx/impl/EdgePartitionBuilder.scala      |   4 +-
 .../spark/graphx/impl/EdgeTripletIterator.scala |   2 +-
 .../graphx/impl/RoutingTablePartition.scala     |   4 +-
 .../graphx/impl/ShippableVertexPartition.scala  |   2 +-
 .../spark/graphx/impl/VertexPartition.scala     |   2 +-
 .../spark/graphx/impl/VertexPartitionBase.scala |   6 +-
 .../graphx/impl/VertexPartitionBaseOps.scala    |   4 +-
 .../GraphXPrimitiveKeyOpenHashMap.scala         | 153 +++++++++++++++++++
 .../collection/PrimitiveKeyOpenHashMap.scala    | 153 -------------------
 .../spark/graphx/impl/EdgePartitionSuite.scala  |  18 +++
 12 files changed, 196 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
index d295d01..f97f329 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala
@@ -24,6 +24,9 @@ import org.apache.spark.util.BoundedPriorityQueue
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx.impl._
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
+import org.apache.spark.util.collection.OpenHashSet
+
 
 /**
  * Registers GraphX classes with Kryo for improved performance.
@@ -43,8 +46,8 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[PartitionStrategy])
     kryo.register(classOf[BoundedPriorityQueue[Object]])
     kryo.register(classOf[EdgeDirection])
-
-    // This avoids a large number of hash table lookups.
-    kryo.setReferences(false)
+    kryo.register(classOf[GraphXPrimitiveKeyOpenHashMap[VertexId, Int]])
+    kryo.register(classOf[OpenHashSet[Int]])
+    kryo.register(classOf[OpenHashSet[Long]])
   }
 }
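
An application that needs its own classes registered as well can layer them on
top of the GraphX set above; a minimal sketch (the registrator and attribute
type names are hypothetical):

    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.serializer.KryoRegistrator
    import org.apache.spark.graphx.GraphKryoRegistrator

    case class MyVertexAttr(weight: Double) // hypothetical vertex attribute type

    // Reuse the GraphX registrations above, then register application classes
    // on the same Kryo instance.
    class MyAppKryoRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        new GraphKryoRegistrator().registerClasses(kryo)
        kryo.register(classOf[MyVertexAttr])
      }
    }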

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
index 871e81f..a5c9cd1 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A collection of edges stored in columnar format, along with any vertex attributes referenced. The
@@ -42,12 +42,12 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
 private[graphx]
 class EdgePartition[
     @specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag](
-    @transient val srcIds: Array[VertexId],
-    @transient val dstIds: Array[VertexId],
-    @transient val data: Array[ED],
-    @transient val index: PrimitiveKeyOpenHashMap[VertexId, Int],
-    @transient val vertices: VertexPartition[VD],
-    @transient val activeSet: Option[VertexSet] = None
+    val srcIds: Array[VertexId] = null,
+    val dstIds: Array[VertexId] = null,
+    val data: Array[ED] = null,
+    val index: GraphXPrimitiveKeyOpenHashMap[VertexId, Int] = null,
+    val vertices: VertexPartition[VD] = null,
+    val activeSet: Option[VertexSet] = None
   ) extends Serializable {
 
   /** Return a new `EdgePartition` with the specified edge data. */

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
index ecb49be..4520beb 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala
@@ -23,7 +23,7 @@ import scala.util.Sorting
 import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx]
 class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag](
@@ -41,7 +41,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: Cla
     val srcIds = new Array[VertexId](edgeArray.size)
     val dstIds = new Array[VertexId](edgeArray.size)
     val data = new Array[ED](edgeArray.size)
-    val index = new PrimitiveKeyOpenHashMap[VertexId, Int]
+    val index = new GraphXPrimitiveKeyOpenHashMap[VertexId, Int]
     // Copy edges into columnar structures, tracking the beginnings of source vertex id clusters and
     // adding them to the index
     if (edgeArray.length > 0) {

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
index ebb0b94..56f79a7 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.graphx.impl
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * The Iterator type returned when constructing edge triplets. This could be an anonymous class in

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index 927e32a..d02e923 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -25,7 +25,7 @@ import org.apache.spark.rdd.ShuffledRDD
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A message from the edge partition `pid` to the vertex partition containing `vid` specifying that
@@ -69,7 +69,7 @@ object RoutingTablePartition {
     : Iterator[RoutingTableMessage] = {
     // Determine which positions each vertex id appears in using a map where the low 2 bits
     // represent src and dst
-    val map = new PrimitiveKeyOpenHashMap[VertexId, Byte]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, Byte]
     edgePartition.srcIds.iterator.foreach { srcId =>
       map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
index f4e221d..dca54b8 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.{BitSet, PrimitiveVector}
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /** Stores vertex attributes to ship to an edge partition. */
 private[graphx]

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
index f1d1747..55c7a19 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala
@@ -22,7 +22,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartition {
   /** Construct a `VertexPartition` from the given vertices. */

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
index 8d9e020..34939b2 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 private[graphx] object VertexPartitionBase {
   /**
@@ -32,7 +32,7 @@ private[graphx] object VertexPartitionBase {
    */
   def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)])
     : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map(pair._1) = pair._2
     }
@@ -45,7 +45,7 @@ private[graphx] object VertexPartitionBase {
    */
   def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD)
     : (VertexIdToIndexMap, Array[VD], BitSet) = {
-    val map = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val map = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     iter.foreach { pair =>
       map.setMerge(pair._1, pair._2, mergeFunc)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index 21ff615..a4f769b 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -25,7 +25,7 @@ import org.apache.spark.Logging
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
+import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
 
 /**
  * A class containing additional operations for subclasses of VertexPartitionBase that provide
@@ -224,7 +224,7 @@ private[graphx] abstract class VertexPartitionBaseOps
    * Construct a new VertexPartition whose index contains only the vertices in the mask.
    */
   def reindex(): Self[VD] = {
-    val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
+    val hashMap = new GraphXPrimitiveKeyOpenHashMap[VertexId, VD]
     val arbitraryMerge = (a: VD, b: VD) => a
     for ((k, v) <- self.iterator) {
       hashMap.setMerge(k, v, arbitraryMerge)

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
new file mode 100644
index 0000000..57b01b6
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/GraphXPrimitiveKeyOpenHashMap.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.util.collection
+
+import org.apache.spark.util.collection.OpenHashSet
+
+import scala.reflect._
+
+/**
+ * A fast hash map implementation for primitive, non-null keys. This hash map supports
+ * insertions and updates, but not deletions. This map is about an order of magnitude
+ * faster than java.util.HashMap, while using much less space overhead.
+ *
+ * Under the hood, it uses our OpenHashSet implementation.
+ */
+private[graphx]
+class GraphXPrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
+                              @specialized(Long, Int, Double) V: ClassTag](
+    val keySet: OpenHashSet[K], var _values: Array[V])
+  extends Iterable[(K, V)]
+  with Serializable {
+
+  /**
+   * Allocate an OpenHashMap with a fixed initial capacity
+   */
+  def this(initialCapacity: Int) =
+    this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
+
+  /**
+   * Allocate an OpenHashMap with a default initial capacity, providing a true
+   * no-argument constructor.
+   */
+  def this() = this(64)
+
+  /**
+   * Allocate an OpenHashMap with a fixed initial capacity
+   */
+  def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
+
+  require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
+
+  private var _oldValues: Array[V] = null
+
+  override def size = keySet.size
+
+  /** Get the value for a given key */
+  def apply(k: K): V = {
+    val pos = keySet.getPos(k)
+    _values(pos)
+  }
+
+  /** Get the value for a given key, or returns elseValue if it doesn't exist. */
+  def getOrElse(k: K, elseValue: V): V = {
+    val pos = keySet.getPos(k)
+    if (pos >= 0) _values(pos) else elseValue
+  }
+
+  /** Set the value for a key */
+  def update(k: K, v: V) {
+    val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+    _values(pos) = v
+    keySet.rehashIfNeeded(k, grow, move)
+    _oldValues = null
+  }
+
+
+  /** Set the value for a key */
+  def setMerge(k: K, v: V, mergeF: (V, V) => V) {
+    val pos = keySet.addWithoutResize(k)
+    val ind = pos & OpenHashSet.POSITION_MASK
+    if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
+      _values(ind) = v
+    } else {
+      _values(ind) = mergeF(_values(ind), v)
+    }
+    keySet.rehashIfNeeded(k, grow, move)
+    _oldValues = null
+  }
+
+
+  /**
+   * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+   * set its value to mergeValue(oldValue).
+   *
+   * @return the newly updated value.
+   */
+  def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+    val pos = keySet.addWithoutResize(k)
+    if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
+      val newValue = defaultValue
+      _values(pos & OpenHashSet.POSITION_MASK) = newValue
+      keySet.rehashIfNeeded(k, grow, move)
+      newValue
+    } else {
+      _values(pos) = mergeValue(_values(pos))
+      _values(pos)
+    }
+  }
+
+  override def iterator = new Iterator[(K, V)] {
+    var pos = 0
+    var nextPair: (K, V) = computeNextPair()
+
+    /** Get the next value we should return from next(), or null if we're finished iterating */
+    def computeNextPair(): (K, V) = {
+      pos = keySet.nextPos(pos)
+      if (pos >= 0) {
+        val ret = (keySet.getValue(pos), _values(pos))
+        pos += 1
+        ret
+      } else {
+        null
+      }
+    }
+
+    def hasNext = nextPair != null
+
+    def next() = {
+      val pair = nextPair
+      nextPair = computeNextPair()
+      pair
+    }
+  }
+
+  // The following member variables are declared as protected instead of private for the
+  // specialization to work (specialized class extends the unspecialized one and needs access
+  // to the "private" variables).
+  // They also should have been val's. We use var's because there is a Scala compiler bug that
+  // would throw illegal access error at runtime if they are declared as val's.
+  protected var grow = (newCapacity: Int) => {
+    _oldValues = _values
+    _values = new Array[V](newCapacity)
+  }
+
+  protected var move = (oldPos: Int, newPos: Int) => {
+    _values(newPos) = _oldValues(oldPos)
+  }
+}
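
For reference, a minimal sketch exercising the map's main operations (a
hypothetical snippet; since the class is private[graphx], it has to live under
the org.apache.spark.graphx package):

    package org.apache.spark.graphx.util

    import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap

    // Hypothetical demo; keys must be Int or Long, per the require() above.
    object HashMapDemo {
      def main(args: Array[String]) {
        val map = new GraphXPrimitiveKeyOpenHashMap[Long, Int] // default capacity 64
        map(1L) = 10                                   // update: insert or overwrite
        map.setMerge(1L, 5, (a: Int, b: Int) => a + b) // key exists: merges 10 + 5
        map.changeValue(2L, 1, (v: Int) => v + 1)      // key absent: stores default 1
        assert(map.getOrElse(1L, 0) == 15)
        assert(map.getOrElse(2L, 0) == 1)
        map.iterator.foreach { case (k, v) => println(k + " -> " + v) }
      }
    }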

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala b/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
deleted file mode 100644
index 7b02e2e..0000000
--- a/graphx/src/main/scala/org/apache/spark/graphx/util/collection/PrimitiveKeyOpenHashMap.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.graphx.util.collection
-
-import org.apache.spark.util.collection.OpenHashSet
-
-import scala.reflect._
-
-/**
- * A fast hash map implementation for primitive, non-null keys. This hash map supports
- * insertions and updates, but not deletions. This map is about an order of magnitude
- * faster than java.util.HashMap, while using much less space overhead.
- *
- * Under the hood, it uses our OpenHashSet implementation.
- */
-private[graphx]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassTag,
-                              @specialized(Long, Int, Double) V: ClassTag](
-    val keySet: OpenHashSet[K], var _values: Array[V])
-  extends Iterable[(K, V)]
-  with Serializable {
-
-  /**
-   * Allocate an OpenHashMap with a fixed initial capacity
-   */
-  def this(initialCapacity: Int) =
-    this(new OpenHashSet[K](initialCapacity), new Array[V](initialCapacity))
-
-  /**
-   * Allocate an OpenHashMap with a default initial capacity, providing a true
-   * no-argument constructor.
-   */
-  def this() = this(64)
-
-  /**
-   * Allocate an OpenHashMap with a fixed initial capacity
-   */
-  def this(keySet: OpenHashSet[K]) = this(keySet, new Array[V](keySet.capacity))
-
-  require(classTag[K] == classTag[Long] || classTag[K] == classTag[Int])
-
-  private var _oldValues: Array[V] = null
-
-  override def size = keySet.size
-
-  /** Get the value for a given key */
-  def apply(k: K): V = {
-    val pos = keySet.getPos(k)
-    _values(pos)
-  }
-
-  /** Get the value for a given key, or returns elseValue if it doesn't exist. */
-  def getOrElse(k: K, elseValue: V): V = {
-    val pos = keySet.getPos(k)
-    if (pos >= 0) _values(pos) else elseValue
-  }
-
-  /** Set the value for a key */
-  def update(k: K, v: V) {
-    val pos = keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
-    _values(pos) = v
-    keySet.rehashIfNeeded(k, grow, move)
-    _oldValues = null
-  }
-
-
-  /** Set the value for a key */
-  def setMerge(k: K, v: V, mergeF: (V, V) => V) {
-    val pos = keySet.addWithoutResize(k)
-    val ind = pos & OpenHashSet.POSITION_MASK
-    if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) { // if first add
-      _values(ind) = v
-    } else {
-      _values(ind) = mergeF(_values(ind), v)
-    }
-    keySet.rehashIfNeeded(k, grow, move)
-    _oldValues = null
-  }
-
-
-  /**
-   * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
-   * set its value to mergeValue(oldValue).
-   *
-   * @return the newly updated value.
-   */
-  def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
-    val pos = keySet.addWithoutResize(k)
-    if ((pos & OpenHashSet.NONEXISTENCE_MASK) != 0) {
-      val newValue = defaultValue
-      _values(pos & OpenHashSet.POSITION_MASK) = newValue
-      keySet.rehashIfNeeded(k, grow, move)
-      newValue
-    } else {
-      _values(pos) = mergeValue(_values(pos))
-      _values(pos)
-    }
-  }
-
-  override def iterator = new Iterator[(K, V)] {
-    var pos = 0
-    var nextPair: (K, V) = computeNextPair()
-
-    /** Get the next value we should return from next(), or null if we're finished iterating */
-    def computeNextPair(): (K, V) = {
-      pos = keySet.nextPos(pos)
-      if (pos >= 0) {
-        val ret = (keySet.getValue(pos), _values(pos))
-        pos += 1
-        ret
-      } else {
-        null
-      }
-    }
-
-    def hasNext = nextPair != null
-
-    def next() = {
-      val pair = nextPair
-      nextPair = computeNextPair()
-      pair
-    }
-  }
-
-  // The following member variables are declared as protected instead of private for the
-  // specialization to work (specialized class extends the unspecialized one and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
-    _oldValues = _values
-    _values = new Array[V](newCapacity)
-  }
-
-  protected var move = (oldPos: Int, newPos: Int) => {
-    _values(newPos) = _oldValues(oldPos)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/a6b02fb7/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index d2e0c01..28fd112 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -22,6 +22,9 @@ import scala.util.Random
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.KryoSerializer
+
 import org.apache.spark.graphx._
 
 class EdgePartitionSuite extends FunSuite {
@@ -120,4 +123,19 @@ class EdgePartitionSuite extends FunSuite {
     assert(!ep.isActive(-1))
     assert(ep.numActives == Some(2))
   }
+
+  test("Kryo serialization") {
+    val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
+    val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
+    val conf = new SparkConf()
+      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+      .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
+    val s = new KryoSerializer(conf).newInstance()
+    val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+    assert(aSer.srcIds.toList === a.srcIds.toList)
+    assert(aSer.dstIds.toList === a.dstIds.toList)
+    assert(aSer.data.toList === a.data.toList)
+    assert(aSer.index != null)
+    assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+  }
 }

