tinkerpop-commits mailing list archives

From ok...@apache.org
Subject [1/4] tinkerpop git commit: TINKERPOP-1389 Support Spark 2.0
Date Mon, 12 Sep 2016 17:15:52 GMT
Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1389 [created] deb26060a


TINKERPOP-1389 Support Spark 2.0


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/a49ce627
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/a49ce627
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/a49ce627

Branch: refs/heads/TINKERPOP-1389
Commit: a49ce62761d99e2d232f9fffe9703a4b315f35de
Parents: 1e0d7de
Author: yucx <yucx@cn.ibm.com>
Authored: Thu Sep 1 23:11:58 2016 -0700
Committer: yucx <yucx@cn.ibm.com>
Committed: Thu Sep 1 23:11:58 2016 -0700

----------------------------------------------------------------------
 giraph-gremlin/pom.xml                          |  2 +-
 hadoop-gremlin/pom.xml                          |  2 +-
 spark-gremlin/pom.xml                           | 13 +++++-
 .../spark/process/computer/SparkExecutor.java   | 16 ++++----
 .../SparkStarBarrierInterceptor.java            | 10 ++---
 .../spark/structure/io/gryo/GryoSerializer.java | 43 ++++++++++++++++----
 6 files changed, 59 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a49ce627/giraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/giraph-gremlin/pom.xml b/giraph-gremlin/pom.xml
index 2e83f89..190078d 100644
--- a/giraph-gremlin/pom.xml
+++ b/giraph-gremlin/pom.xml
@@ -127,7 +127,7 @@ limitations under the License.
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
-            <version>3.0.1</version>
+            <version>3.1.0</version>
         </dependency>
         <!-- TEST -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a49ce627/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index 88133eb..2f9c1ca 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -128,7 +128,7 @@ limitations under the License.
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>javax.servlet-api</artifactId>
-            <version>3.0.1</version>
+            <version>3.1.0</version>
         </dependency>
         <!-- TEST -->
         <dependency>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a49ce627/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
index fe8988c..be8ac5e 100644
--- a/spark-gremlin/pom.xml
+++ b/spark-gremlin/pom.xml
@@ -30,6 +30,11 @@
     <name>Apache TinkerPop :: Spark Gremlin</name>
     <dependencies>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>14.0.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.tinkerpop</groupId>
             <artifactId>gremlin-core</artifactId>
             <version>${project.version}</version>
@@ -55,6 +60,10 @@
                     <artifactId>servlet-api</artifactId>
                 </exclusion>
                 <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>javax.servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
                     <groupId>com.sun.jersey</groupId>
                     <artifactId>jersey-core</artifactId>
                 </exclusion>
@@ -104,7 +113,7 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-core_2.10</artifactId>
-            <version>1.6.1</version>
+            <version>2.0.0</version>
             <exclusions>
                 <!-- self conflicts -->
                 <exclusion>
@@ -210,7 +219,7 @@
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
-            <version>2.4.4</version>
+            <version>2.6.5</version>
         </dependency>
         <dependency>
             <groupId>commons-lang</groupId>

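For context on the pom changes above: the bumps track what Spark 2.0.0 itself ships with — spark-core_2.10 moves to 2.0.0, jackson-databind to 2.6.5 (the Jackson line Spark 2.0 builds against), and Guava is pinned at 14.0.1 (presumably to match the version the Spark-related dependencies expect); the added javax.servlet-api exclusion avoids carrying a duplicate servlet API alongside the 3.1.0 version declared elsewhere. One quick way to confirm the alignment at runtime is to print where the conflict-prone classes are loaded from. This is an illustrative standalone check, not part of the commit:

    // ClasspathCheck.java -- illustrative helper, not commit code.
    // Prints the jar each conflict-prone class is loaded from, so a
    // mismatched Guava or Jackson version on the driver surfaces immediately.
    public final class ClasspathCheck {
        public static void main(final String[] args) throws Exception {
            final String[] probes = {
                    "com.google.common.collect.ImmutableList",       // Guava
                    "com.fasterxml.jackson.databind.ObjectMapper"};  // Jackson
            for (final String name : probes) {
                final Class<?> clazz = Class.forName(name);
                System.out.println(name + " -> "
                        + clazz.getProtectionDomain().getCodeSource().getLocation());
            }
        }
    }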
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a49ce627/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8dd2381..8d32b36 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -18,7 +18,7 @@
  */
 package org.apache.tinkerpop.gremlin.spark.process.computer;
 
-import com.google.common.base.Optional;
+import org.apache.spark.api.java.Optional;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.Function2;
@@ -65,7 +65,7 @@ public final class SparkExecutor {
     public static JavaPairRDD<Object, VertexWritable> applyGraphFilter(final JavaPairRDD<Object, VertexWritable> graphRDD, final GraphFilter graphFilter) {
         return graphRDD.mapPartitionsToPair(partitionIterator -> {
             final GraphFilter gFilter = graphFilter.clone();
-            return () -> IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
+            return IteratorUtils.filter(partitionIterator, tuple -> (tuple._2().get().applyGraphFilter(gFilter)).isPresent());
         }, true);
     }
 
@@ -95,7 +95,7 @@ public final class SparkExecutor {
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
-                    return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+                    return IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                         final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
                         final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : memory.isInitialIteration() ? new ArrayList<>() : Collections.emptyList();
@@ -133,7 +133,7 @@ public final class SparkExecutor {
         /////////////////////////////////////////////////////////////
         /////////////////////////////////////////////////////////////
         final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
-                tuple -> () -> IteratorUtils.concat(
+                tuple -> IteratorUtils.concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),     // emit the view payload
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
@@ -172,7 +172,7 @@ public final class SparkExecutor {
         newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
                     KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-                }); // need to complete a task so its BSP and the memory for this iteration is updated
+                }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;
     }
 
@@ -207,7 +207,7 @@ public final class SparkExecutor {
             final Configuration graphComputerConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getMapKeySort().isPresent())
             mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -218,7 +218,7 @@ public final class SparkExecutor {
                                                                     final Configuration graphComputerConfiguration) {
         return mapRDD.mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
     }
 
@@ -227,7 +227,7 @@ public final class SparkExecutor {
             final Configuration graphComputerConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
             KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
-            return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
+            return new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
         });
         if (mapReduce.getReduceKeySort().isPresent())
             reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);

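Most of the SparkExecutor diff above follows from two Spark 2.0 API changes rather than any TinkerPop logic change. First, Spark 2.0 stopped exposing Guava's Optional in its Java API, hence the switch to org.apache.spark.api.java.Optional. Second, the Java functional interfaces behind flatMap/mapPartitions (FlatMapFunction, PairFlatMapFunction, and friends) now return Iterator<R> instead of Iterable<R>, which is why every "return () -> someIterator" thunk — a lambda standing in for an Iterable whose iterator() yields someIterator — collapses to "return someIterator". A minimal sketch of the new shape (a standalone example, not code from the commit):

    import org.apache.spark.api.java.function.FlatMapFunction;
    import java.util.Arrays;
    import java.util.Iterator;

    public final class FlatMapShape {
        // Spark 1.6: call(T) returned Iterable<R>, so an Iterator had to be
        // wrapped: line -> () -> Arrays.asList(line.split(" ")).iterator()
        // Spark 2.0: call(T) returns Iterator<R> directly, so the wrapper goes away.
        static final FlatMapFunction<String, String> TOKENIZE =
                line -> Arrays.asList(line.split(" ")).iterator();
    }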
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a49ce627/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 8585e0d..dc22d47 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -85,13 +85,11 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
                 .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
                 .flatMap(vertexWritable -> {
                     if (identityTraversal)                          // g.V.count()-style (identity)
-                        return () -> IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
+                        return IteratorUtils.of(traversal.getTraverserGenerator().generate(vertexWritable.get(), (Step) graphStep, 1l));
                     else {                                          // add the vertex to head of the traversal
-                        return () -> {                              // and iterate it for its results
-                            final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
+                        final Traversal.Admin<Vertex, ?> clone = traversal.clone(); // need a unique clone for each vertex to isolate the computation
                             clone.getStartStep().addStart(clone.getTraverserGenerator().generate(vertexWritable.get(), graphStep, 1l));
                             return (Step) clone.getEndStep();
-                        };
                     }
                 });
         // USE SPARK DSL FOR THE RESPECTIVE END REDUCING BARRIER STEP OF THE TRAVERSAL
@@ -133,14 +131,14 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
             result = ((GroupStep) endStep).generateFinalResult(nextRDD.
                     mapPartitions(partitions -> {
                         final GroupStep<Object, Object, Object> clone = (GroupStep) endStep.clone();
-                        return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+                        return IteratorUtils.map(partitions, clone::projectTraverser);
                     }).fold(((GroupStep<Object, Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply));
         } else if (endStep instanceof GroupCountStep) {
             final GroupCountStep.GroupCountBiOperator<Object> biOperator = GroupCountStep.GroupCountBiOperator.instance();
             result = nextRDD
                     .mapPartitions(partitions -> {
                         final GroupCountStep<Object, Object> clone = (GroupCountStep) endStep.clone();
-                        return () -> IteratorUtils.map(partitions, clone::projectTraverser);
+                        return IteratorUtils.map(partitions, clone::projectTraverser);
                     })
                     .fold(((GroupCountStep<Object, Object>) endStep).getSeedSupplier().get(), biOperator::apply);
         } else

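The same Iterator-returning contract explains why the interceptor can now hand the traversal's end step straight back from flatMap: in TinkerPop, Step implements Iterator, so once Spark stops demanding an Iterable, the step itself is a valid return value, and the per-vertex traversal clone still isolates each vertex's computation. A generic illustration of the pattern (not commit code — any Iterator-implementing stage can be returned as-is):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // A trivial Iterator-implementing "stage"; under Spark 2.0 an instance of
    // this could be returned directly from a flatMap lambda.
    final class RepeatN implements Iterator<String> {
        private final String value;
        private int remaining;
        RepeatN(final String value, final int n) { this.value = value; this.remaining = n; }
        public boolean hasNext() { return remaining > 0; }
        public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            remaining--;
            return value;
        }
    }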
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a49ce627/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
index 28a4d55..6735fe5 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/gryo/GryoSerializer.java
@@ -24,7 +24,7 @@ import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.python.PythonBroadcast;
-import org.apache.spark.broadcast.HttpBroadcast;
+import org.apache.spark.broadcast.TorrentBroadcast;
 import org.apache.spark.network.util.ByteUnit;
 import org.apache.spark.scheduler.CompressedMapStatus;
 import org.apache.spark.scheduler.HighlyCompressedMapStatus;
@@ -48,24 +48,32 @@ import scala.Tuple3;
 import scala.collection.mutable.WrappedArray;
 import scala.runtime.BoxedUnit;
 
+import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GryoSerializer extends Serializer {
+public final class GryoSerializer extends Serializer implements Serializable {
 
     //private final Option<String> userRegistrator;
     private final int bufferSize;
     private final int maxBufferSize;
+    private final int poolSize;
+    private final ArrayList<String> ioRegList = new ArrayList<>();
+    private final boolean referenceTracking;
+    private final boolean registrationRequired;
 
-    private final GryoPool gryoPool;
+
+    private transient GryoPool gryoPool;
 
     public GryoSerializer(final SparkConf sparkConfiguration) {
         final long bufferSizeKb = sparkConfiguration.getSizeAsKb("spark.kryoserializer.buffer", "64k");
         final long maxBufferSizeMb = sparkConfiguration.getSizeAsMb("spark.kryoserializer.buffer.max", "64m");
-        final boolean referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
-        final boolean registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
+        referenceTracking = sparkConfiguration.getBoolean("spark.kryo.referenceTracking", true);
+        registrationRequired = sparkConfiguration.getBoolean("spark.kryo.registrationRequired", false);
         if (bufferSizeKb >= ByteUnit.GiB.toKiB(2L)) {
             throw new IllegalArgumentException("spark.kryoserializer.buffer must be less than 2048 mb, got: " + bufferSizeKb + " mb.");
         } else {
@@ -77,9 +85,19 @@ public final class GryoSerializer extends Serializer {
                 //this.userRegistrator = sparkConfiguration.getOption("spark.kryo.registrator");
             }
         }
-        this.gryoPool = GryoPool.build().
-                poolSize(sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT)).
-                ioRegistries(makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList())).
+        poolSize = sparkConfiguration.getInt(GryoPool.CONFIG_IO_GRYO_POOL_SIZE, GryoPool.CONFIG_IO_GRYO_POOL_SIZE_DEFAULT);
+        List<Object> list = makeApacheConfiguration(sparkConfiguration).getList(GryoPool.CONFIG_IO_REGISTRY, Collections.emptyList());
+        list.forEach(c -> {
+                    ioRegList.add(c.toString());
+                }
+        );
+    }
+
+    private GryoPool createPool(){
+        List<Object> list = new ArrayList<>(ioRegList);
+        return GryoPool.build().
+                poolSize(poolSize).
+                ioRegistries(list).
                 initializeMapper(builder -> {
                     try {
                         builder.addCustom(Tuple2.class, new Tuple2Serializer())
@@ -91,7 +109,7 @@ public final class GryoSerializer extends Serializer {
                                 .addCustom(CompressedMapStatus.class)
                                 .addCustom(BlockManagerId.class)
                                 .addCustom(HighlyCompressedMapStatus.class, new ExternalizableSerializer())   // externalizable implemented so its okay
-                                .addCustom(HttpBroadcast.class)
+                                .addCustom(TorrentBroadcast.class)
                                 .addCustom(PythonBroadcast.class)
                                 .addCustom(BoxedUnit.class)
                                 .addCustom(Class.forName("scala.reflect.ClassTag$$anon$1"), new JavaSerializer())
@@ -118,6 +136,13 @@ public final class GryoSerializer extends Serializer {
     }
 
     public GryoPool getGryoPool() {
+        if (gryoPool == null) {
+            synchronized (this) {
+                if (gryoPool == null) {
+                    gryoPool = createPool();
+                }
+            }
+        }
         return this.gryoPool;
     }
 

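Two Spark 2.0 facts drive the GryoSerializer changes. HttpBroadcast was removed in Spark 2.0, leaving TorrentBroadcast as the broadcast implementation to register with Gryo. And the commit making the class Serializable — storing the pool's construction inputs (pool size, IO-registry class names, the Kryo flags) as serializable fields while marking the non-serializable GryoPool transient and rebuilding it lazily in getGryoPool() — suggests Spark 2.0 ships the configured serializer instance to executors rather than reconstructing it from configuration on each JVM. One caveat: for double-checked locking to be safe under the Java memory model the lazily-set field should be volatile, which the patch's plain "transient GryoPool gryoPool" is not. A generic sketch of the textbook-safe variant of the idiom (illustrative, not commit code):

    import java.io.Serializable;

    // LazyHolder.java -- the rebuild-after-deserialization idiom in isolation:
    // a Serializable holder whose expensive, non-serializable member is
    // reconstructed lazily in whatever JVM the holder lands in.
    public final class LazyHolder<T> implements Serializable {
        public interface SerializableSupplier<T> extends Serializable {
            T get();
        }

        private final SerializableSupplier<T> factory; // serializable recipe
        private transient volatile T value;            // rebuilt per JVM; volatile for safe publication

        public LazyHolder(final SerializableSupplier<T> factory) {
            this.factory = factory;
        }

        public T get() {
            T result = value;
            if (result == null) {
                synchronized (this) {
                    result = value;
                    if (result == null)
                        value = result = factory.get();
                }
            }
            return result;
        }
    }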
