tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [09/50] incubator-tinkerpop git commit: Got GroupStep working --- buts its a hack unfortunately. Pushing to save work.
Date Tue, 01 Mar 2016 14:26:20 GMT
Got GroupStep working --- buts its a hack unfortunately. Pushing to save work.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/33891225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/33891225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/33891225

Branch: refs/heads/master
Commit: 3389122591a788ceb566f9bdf5a1ba6b72789faa
Parents: 65bbdaa
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Mon Feb 22 15:15:21 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Mon Feb 22 15:15:21 2016 -0700

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       |  12 +-
 .../process/traversal/step/map/GroupStep.java   | 180 ++++---------------
 2 files changed, 45 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/33891225/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index d5059a7..92ea9eb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -36,6 +36,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
@@ -184,8 +185,15 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     public boolean terminate(final Memory memory) {
         final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
         if (voteToHalt) {
-            if (memory.exists(ReducingBarrierStep.REDUCING))
-                memory.set(ReducingBarrierStep.REDUCING, FinalGet.tryFinalGet(memory.get(ReducingBarrierStep.REDUCING)));
+            for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class,
this.traversal.get())) {
+                if (memory.exists(ReducingBarrierStep.REDUCING)) {
+                    if (reducingBarrierStep instanceof GroupStep)
+                        memory.set(ReducingBarrierStep.REDUCING, ((GroupStep) reducingBarrierStep).getReducedMap(memory.get(ReducingBarrierStep.REDUCING)));
+                    else
+                        memory.set(ReducingBarrierStep.REDUCING, FinalGet.tryFinalGet(memory.get(ReducingBarrierStep.REDUCING)));
+                }
+            }
+
             return true;
         } else {
             memory.set(VOTE_TO_HALT, true);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/33891225/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index a673fea..1051c01 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -18,46 +18,32 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.GroupStepHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Supplier;
+import java.util.function.BinaryOperator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
implements MapReducer, GraphComputing, TraversalParent, ByModulating {
+public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
implements TraversalParent, ByModulating {
 
     private char state = 'k';
 
@@ -65,22 +51,26 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S,
Map<K, V>>
     private Traversal.Admin<S, ?> valueTraversal = this.integrateChild(__.identity().asAdmin());
  // used in OLAP
     private Traversal.Admin<?, V> reduceTraversal = this.integrateChild(__.fold().asAdmin());
     // used in OLAP
     private Traversal.Admin<S, V> valueReduceTraversal = this.integrateChild(__.fold().asAdmin());
// used in OLTP
-    private boolean byPass = false;
 
     public GroupStep(final Traversal.Admin traversal) {
         super(traversal);
-        this.setSeedSupplier((Supplier) new GroupStepHelper.GroupMapSupplier());
-        //this.setBiFunction(new GroupBiFunction(this));
+        this.setSeedSupplier(HashMapSupplier.instance());
+        this.setReducingBiOperator(new GroupBiOperator<>());
     }
 
     @Override
-    public Map<K, V> projectTraverser(Traverser.Admin<S> traverser) {
-        return new HashMap<>();
-    }
-
-    @Override
-    public void onGraphComputer() {
-        this.byPass = true;
+    public Map<K, V> projectTraverser(final Traverser.Admin<S> traverser) {
+        final K key = TraversalUtil.applyNullable(traverser, this.keyTraversal);
+        this.valueTraversal.addStart(traverser);
+        final TraverserSet<?> value = new TraverserSet<>();
+        this.valueTraversal.forEachRemaining(t -> {
+            Traverser.Admin x = traverser.split(t, (Step) this);
+            x.setBulk(1l);
+            value.add(x);
+        });
+        final Map<K, V> map = new HashMap<>();
+        map.put(key, (V) value);
+        return map;
     }
 
     @Override
@@ -123,7 +113,6 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S,
Map<K, V>>
         clone.valueReduceTraversal = clone.integrateChild(this.valueReduceTraversal.clone());
         clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
         clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
-        //clone.setBiFunction(new GroupBiFunction<>((GroupStep) clone));
         return clone;
     }
 
@@ -134,139 +123,40 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S,
Map<K, V>>
         return result;
     }
 
-    @Override
-    public MapReduce<K, Collection<?>, K, V, Map<K, V>> getMapReduce()
{
-        return new GroupMapReduce<>(this);
-    }
-
-    @Override
-    public Traverser<Map<K, V>> processNextStart() {
-        if (this.byPass) {
-            final Traverser.Admin<S> traverser = this.starts.next();
-            final K key = TraversalUtil.applyNullable(traverser, this.keyTraversal);
-            this.valueTraversal.addStart(traverser);
-            final BulkSet<?> value = this.valueTraversal.toBulkSet();
-            return traverser.asAdmin().split(new Object[]{key, value}, (Step) this);
-        } else {
-            return super.processNextStart();
-        }
-    }
 
     @Override
     public String toString() {
         return StringFactory.stepString(this, this.keyTraversal, this.valueReduceTraversal);
     }
 
-    ///////////
-
-    private static class GroupBiFunction<S, K, V> implements BiFunction<Map<K,
Traversal.Admin<S, V>>, Traverser.Admin<S>, Map<K, Traversal.Admin<S,
V>>>, Serializable {
-
-        private final GroupStep<S, K, V> groupStep;
-        private Map<K, Integer> counters = new HashMap<>();
-
-        private GroupBiFunction(final GroupStep<S, K, V> groupStep) {
-            this.groupStep = groupStep;
-        }
-
-        @Override
-        public Map<K, Traversal.Admin<S, V>> apply(final Map<K, Traversal.Admin<S,
V>> mutatingSeed, final Traverser.Admin<S> traverser) {
-            final K key = TraversalUtil.applyNullable(traverser, this.groupStep.keyTraversal);
-            Traversal.Admin<S, V> traversal = mutatingSeed.get(key);
-            if (null == traversal) {
-                traversal = this.groupStep.valueReduceTraversal.clone();
-                this.counters.put(key, 0);
-                mutatingSeed.put(key, traversal);
-            }
-
-            traversal.addStart(traverser);
-            final int count = this.counters.compute(key, (k, i) -> ++i);
-            if (count > 10000) {
-                this.counters.put(key, 0);
-                TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, traversal).ifPresent(Barrier::processAllStarts);
-            }
-            return mutatingSeed;
+    public Map<K, V> getReducedMap(final Map<K, TraverserSet> traverserMap) {
+        final Map<K, V> reducedMap = new HashMap<>();
+        for (final K key : traverserMap.keySet()) {
+            final Traversal.Admin<?, V> reduceClone = this.reduceTraversal.clone();
+            reduceClone.addStarts(traverserMap.get(key).iterator());
+            reducedMap.put(key, reduceClone.next());
         }
+        return reducedMap;
     }
 
     ///////////
 
-    public static final class GroupMapReduce<S, K, V> implements MapReduce<K, Collection<?>,
K, V, Map<K, V>> {
-
-        public static final String GROUP_BY_STEP_STEP_ID = "gremlin.groupStep.stepId";
-
-        private String groupStepId;
-        private Traversal.Admin<?, V> reduceTraversal;
-
-        private GroupMapReduce() {
-
-        }
-
-        public GroupMapReduce(final GroupStep<S, K, V> step) {
-            this.groupStepId = step.getId();
-            this.reduceTraversal = step.reduceTraversal.clone();
-        }
-
-        @Override
-        public void storeState(final Configuration configuration) {
-            MapReduce.super.storeState(configuration);
-            configuration.setProperty(GROUP_BY_STEP_STEP_ID, this.groupStepId);
-        }
-
-        @Override
-        public void loadState(final Graph graph, final Configuration configuration) {
-            this.groupStepId = configuration.getString(GROUP_BY_STEP_STEP_ID);
-            this.reduceTraversal = ((GroupStep) new TraversalMatrix<>(TraversalVertexProgram.getTraversal(graph,
configuration)).getStepById(this.groupStepId)).reduceTraversal.clone();
-        }
-
-        @Override
-        public boolean doStage(final Stage stage) {
-            return !stage.equals(Stage.COMBINE);
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<K, Collection<?>>
emitter) {
-            vertex.<TraverserSet<Object[]>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet
-> traverserSet.forEach(traverser -> {
-                final Object[] objects = traverser.get();
-                emitter.emit((K) objects[0], (Collection<?>) objects[1]);
-            }));
-        }
-
-        @Override
-        public void reduce(final K key, final Iterator<Collection<?>> values,
final ReduceEmitter<K, V> emitter) {
-            Traversal.Admin<?, V> reduceTraversalClone = this.reduceTraversal.clone();
-            while (values.hasNext()) {
-                reduceTraversalClone.addStarts(reduceTraversalClone.getTraverserGenerator().generateIterator(values.next().iterator(),
(Step) reduceTraversalClone.getStartStep(), 1l));
-                TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, reduceTraversalClone).ifPresent(Barrier::processAllStarts);
-            }
-            emitter.emit(key, reduceTraversalClone.next());
-        }
+    private static class GroupBiOperator<S, K, V> implements BinaryOperator<Map<K,
V>>, Serializable {
 
-        @Override
-        public Map<K, V> generateFinalResult(final Iterator<KeyValue<K, V>>
keyValues) {
-            final Map<K, V> map = new HashMap<>();
-            keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
-            return map;
+        private GroupBiOperator() {
         }
 
         @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public GroupMapReduce<S, K, V> clone() {
-            try {
-                final GroupMapReduce<S, K, V> clone = (GroupMapReduce<S, K, V>)
super.clone();
-                clone.reduceTraversal = this.reduceTraversal.clone();
-                return clone;
-            } catch (final CloneNotSupportedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
+        public Map<K, V> apply(final Map<K, V> mutatingSeed, final Map<K,
V> map) {
+            for (final K key : map.keySet()) {
+                TraverserSet<?> traverserSet = (TraverserSet) mutatingSeed.get(key);
+                if (null == traverserSet) {
+                    traverserSet = new TraverserSet<>();
+                    mutatingSeed.put(key, (V) traverserSet);
+                }
+                traverserSet.addAll((TraverserSet) map.get(key));
             }
-        }
-
-        @Override
-        public String toString() {
-            return StringFactory.mapReduceString(this, this.getMemoryKey());
+            return mutatingSeed;
         }
     }
 }
\ No newline at end of file


Mime
View raw message