tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [01/17] tinkerpop git commit: Made it so MemoryComputeKey implements Cloneable. This is actually really important: we have NOT been cloning the BiOperators of OrderGlobalStep and GroupStep. We have just been 'getting lucky' in that Spark and Giraph use Se
Date Mon, 09 Jan 2017 14:54:13 GMT
Repository: tinkerpop
Updated Branches:
  refs/heads/tp32 c3e6ed903 -> 2d824cf29


Made it so MemoryComputeKey implements Cloneable. This is actually really important: we have
NOT been cloning the BiOperators of OrderGlobalStep and GroupStep. We have just been 'getting
lucky' in that Spark and Giraph use Serialization and thus we get a clone for free. However,
for parallelization within a JVM, we would have had issues, except we never realized because we
had a single global Memory for TinkerGraph. Now we don't, and clone()ing bi operators works.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8deca706
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8deca706
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8deca706

Branch: refs/heads/tp32
Commit: 8deca70680e8c89b31fea1cc99300740f45eec56
Parents: 1ac003d
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Wed Jan 4 04:55:12 2017 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Thu Jan 5 16:59:45 2017 -0700

----------------------------------------------------------------------
 .../process/computer/MemoryComputeKey.java      | 24 ++++++++++++++++++--
 .../process/traversal/step/map/GroupStep.java   | 18 +++++++++++++--
 .../traversal/step/map/OrderGlobalStep.java     | 13 ++++++++++-
 .../util/function/ChainedComparator.java        | 18 +++++++++++++--
 .../process/computer/TinkerWorkerMemory.java    |  9 +-------
 5 files changed, 67 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
index 94ca675..70adf3d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MemoryComputeKey.java
@@ -22,6 +22,8 @@ package org.apache.tinkerpop.gremlin.process.computer;
 import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
 
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.util.function.BinaryOperator;
 
 /**
@@ -32,10 +34,10 @@ import java.util.function.BinaryOperator;
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MemoryComputeKey<A> implements Serializable {
+public final class MemoryComputeKey<A> implements Serializable, Cloneable {
 
     private final String key;
-    private final BinaryOperator<A> reducer;
+    private BinaryOperator<A> reducer;
     private final boolean isTransient;
     private final boolean isBroadcast;
 
@@ -73,7 +75,25 @@ public final class MemoryComputeKey<A> implements Serializable {
         return object instanceof MemoryComputeKey && ((MemoryComputeKey) object).key.equals(this.key);
     }
 
+    @Override
+    public MemoryComputeKey<A> clone() {
+        try {
+            final MemoryComputeKey<A> clone = (MemoryComputeKey<A>) super.clone();
+            for (final Method method : this.reducer.getClass().getMethods()) {
+                if (method.getName().equals("clone") && 0 == method.getParameterCount())
{
+                    clone.reducer = (BinaryOperator<A>) method.invoke(this.reducer);
+                    break;
+                }
+            }
+            return clone;
+        } catch (final IllegalAccessException | InvocationTargetException | CloneNotSupportedException
e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
     public static <A> MemoryComputeKey<A> of(final String key, final BinaryOperator<A>
reducer, final boolean isBroadcast, final boolean isTransient) {
         return new MemoryComputeKey<>(key, reducer, isBroadcast, isTransient);
     }
+
+
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index de4e223..d6ce421 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -95,7 +95,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S,
Map<K, V>>
             final TraverserSet traverserSet = new TraverserSet<>();
             this.preTraversal.reset();
             this.preTraversal.addStart(traverser);
-            while(this.preTraversal.hasNext()) {
+            while (this.preTraversal.hasNext()) {
                 traverserSet.add(this.preTraversal.nextTraverser());
             }
             map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
@@ -158,7 +158,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S,
Map<K, V>>
 
     ///////////////////////
 
-    public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K,
V>>, Serializable {
+    public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K,
V>>, Serializable, Cloneable {
 
         // size limit before Barrier.processAllStarts() to lazy reduce
         private static final int SIZE_LIMIT = 1000;
@@ -182,6 +182,20 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S,
Map<K, V>>
         }
 
         @Override
+        public GroupBiOperator<K, V> clone() {
+            try {
+                final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) super.clone();
+                if (null != this.valueTraversal) {
+                    clone.valueTraversal = this.valueTraversal.clone();
+                    clone.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class,
clone.valueTraversal).orElse(null);
+                }
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public Map<K, V> apply(final Map<K, V> mapA, final Map<K, V> mapB)
{
             for (final K key : mapB.keySet()) {
                 Object objectA = mapA.get(key);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
index 60be2d6..a7d21b2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
@@ -144,7 +144,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends
CollectingBa
 
     ////////////////
 
-    public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>,
Serializable {
+    public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>,
Serializable, Cloneable {
 
         private ChainedComparator chainedComparator;
         private long limit;
@@ -159,6 +159,17 @@ public final class OrderGlobalStep<S, C extends Comparable> extends
CollectingBa
         }
 
         @Override
+        public OrderBiOperator<S> clone() {
+            try {
+                final OrderBiOperator<S> clone = (OrderBiOperator<S>) super.clone();
+                clone.chainedComparator = this.chainedComparator.clone();
+                return clone;
+            } catch (final CloneNotSupportedException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+            }
+        }
+
+        @Override
         public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S>
setB) {
             setA.addAll(setB);
             if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit)
{

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
index 44a994b..bdb2e6d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/ChainedComparator.java
@@ -34,9 +34,9 @@ import java.util.stream.Collectors;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>,
Serializable {
+public final class ChainedComparator<S, C extends Comparable> implements Comparator<S>,
Serializable, Cloneable {
 
-    private final List<Pair<Traversal.Admin<S, C>, Comparator<C>>>
comparators = new ArrayList<>();
+    private List<Pair<Traversal.Admin<S, C>, Comparator<C>>> comparators
= new ArrayList<>();
     private final boolean isShuffle;
     private final boolean traversers;
 
@@ -66,4 +66,18 @@ public final class ChainedComparator<S, C extends Comparable> implements
Compara
         }
         return 0;
     }
+
+    @Override
+    public ChainedComparator<S, C> clone() {
+        try {
+            final ChainedComparator<S, C> clone = (ChainedComparator<S, C>) super.clone();
+            clone.comparators = new ArrayList<>();
+            for (final Pair<Traversal.Admin<S, C>, Comparator<C>> comparator
: this.comparators) {
+                clone.comparators.add(new Pair<>(comparator.getValue0().clone(), comparator.getValue1()));
+            }
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8deca706/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
index 081e4fa..1afa27e 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerMemory.java
@@ -21,9 +21,7 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.computer;
 
 import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
-import org.apache.tinkerpop.gremlin.util.Serializer;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -41,12 +39,7 @@ public final class TinkerWorkerMemory implements Memory.Admin {
     public TinkerWorkerMemory(final TinkerMemory mainMemory) {
         this.mainMemory = mainMemory;
         for (final MemoryComputeKey key : this.mainMemory.memoryKeys.values()) {
-            try {
-                final MemoryComputeKey clone = Serializer.cloneObject(key);
-                this.reducers.put(clone.getKey(), clone.getReducer());
-            } catch (final IOException | ClassNotFoundException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+            this.reducers.put(key.getKey(), key.clone().getReducer());
         }
     }
 


Mime
View raw message