tinkerpop-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ok...@apache.org
Subject [08/50] incubator-tinkerpop git commit: Have all the ReducingBarrierSteps no longer implement MapReduce and using MemoryComputeKeys for their reduction. GroupStep, FoldStep, and GroupStepV3d0 are not complete yet. Having the darndest time with GroupStep
Date Tue, 01 Mar 2016 14:26:19 GMT
Have all the ReducingBarrierSteps no longer implement MapReduce and instead use MemoryComputeKeys for their reduction. GroupStep, FoldStep, and GroupStepV3d0 are not complete yet. Having the darndest time with GroupStep -- once I get it, then the others will follow from it. Pushing to save work.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/65bbdaa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/65bbdaa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/65bbdaa3

Branch: refs/heads/master
Commit: 65bbdaa336b4034421f7e2599bb3f2c307aa4773
Parents: b03c1ad
Author: Marko A. Rodriguez <okrammarko@gmail.com>
Authored: Mon Feb 22 14:41:02 2016 -0700
Committer: Marko A. Rodriguez <okrammarko@gmail.com>
Committed: Mon Feb 22 14:41:02 2016 -0700

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       |  31 ++++--
 .../computer/traversal/TraverserExecutor.java   |  28 +++--
 .../traversal/step/map/CountGlobalStep.java     | 101 ++----------------
 .../process/traversal/step/map/FoldStep.java    |  41 ++++----
 .../traversal/step/map/GroupCountStep.java      |  94 +++--------------
 .../process/traversal/step/map/GroupStep.java   |  11 +-
 .../traversal/step/map/GroupStepV3d0.java       |   9 +-
 .../traversal/step/map/MaxGlobalStep.java       | 103 +++----------------
 .../traversal/step/map/MeanGlobalStep.java      | 102 ++----------------
 .../traversal/step/map/MinGlobalStep.java       | 100 +++---------------
 .../traversal/step/map/SumGlobalStep.java       |  98 ++----------------
 .../process/traversal/step/map/TreeStep.java    |  97 ++++-------------
 .../step/util/ReducingBarrierStep.java          |  29 +++---
 13 files changed, 176 insertions(+), 668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index ad71f8a..d5059a7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
@@ -80,7 +81,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
 
     // TODO: if not an adjacent traversal, use Local message scope -- a dual messaging system.
     private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
-    private static final Set<VertexComputeKey> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
+    private Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
+    private static final Set<VertexComputeKey> VERTEX_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
             VertexComputeKey.of(HALTED_TRAVERSERS, false),
             VertexComputeKey.of(TraversalSideEffects.SIDE_EFFECTS, false)));
 
@@ -110,11 +112,18 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         if (!this.traversal.get().isLocked())
             this.traversal.get().applyStrategies();
         this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
+        this.memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
         for (final MapReducer<?, ?, ?, ?, ?> mapReducer : TraversalHelper.getStepsOfAssignableClassRecursively(MapReducer.class, this.traversal.get())) {
             this.mapReducers.add(mapReducer.getMapReduce());
+            this.memoryComputeKeys.add(MemoryComputeKey.of(mapReducer.getMapReduce().getMemoryKey(), MemoryComputeKey.setOperator(), false));
         }
-        if (!(this.traversal.get().getEndStep() instanceof SideEffectCapStep) && !(this.traversal.get().getEndStep() instanceof ReducingBarrierStep))
+        if (!(this.traversal.get().getEndStep() instanceof SideEffectCapStep) && !(this.traversal.get().getEndStep() instanceof ReducingBarrierStep)) {
             this.mapReducers.add(new TraverserMapReduce(this.traversal.get()));
+            this.memoryComputeKeys.add(MemoryComputeKey.of(TraverserMapReduce.TRAVERSERS, MemoryComputeKey.setOperator(), false));
+        }
+        for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class, this.traversal.get())) {
+            this.memoryComputeKeys.add(MemoryComputeKey.of(ReducingBarrierStep.REDUCING, reducingBarrierStep.getBiOperator(), false));
+        }
     }
 
     @Override
@@ -126,6 +135,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void setup(final Memory memory) {
         memory.set(VOTE_TO_HALT, true);
+        for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class, this.traversal.get())) {
+            memory.set(ReducingBarrierStep.REDUCING, reducingBarrierStep.getSeedSupplier().get());
+        }
     }
 
     @Override
@@ -162,9 +174,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         aliveTraverses.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix));
+            memory.add(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix, memory));
         } else {  // ITERATION 1+
-            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
+            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory));
         }
     }
 
@@ -172,6 +184,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     public boolean terminate(final Memory memory) {
         final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
         if (voteToHalt) {
+            if (memory.exists(ReducingBarrierStep.REDUCING))
+                memory.set(ReducingBarrierStep.REDUCING, FinalGet.tryFinalGet(memory.get(ReducingBarrierStep.REDUCING)));
             return true;
         } else {
             memory.set(VOTE_TO_HALT, true);
@@ -181,17 +195,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
 
     @Override
     public Set<VertexComputeKey> getVertexComputeKeys() {
-        return ELEMENT_COMPUTE_KEYS;
+        return VERTEX_COMPUTE_KEYS;
     }
 
     @Override
     public Set<MemoryComputeKey> getMemoryComputeKeys() {
-        final Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
-        memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
-        for (final MapReduce mapReduce : this.mapReducers) {
-            memoryComputeKeys.add(MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
-        }
-        return memoryComputeKeys;
+        return this.memoryComputeKeys;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 078e880..4660183 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -18,12 +18,14 @@
  */
 package org.apache.tinkerpop.gremlin.process.computer.traversal;
 
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -40,7 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public final class TraverserExecutor {
 
-    public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final TraversalMatrix<?, ?> traversalMatrix) {
+    public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final TraversalMatrix<?, ?> traversalMatrix, final Memory memory) {
 
         final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
@@ -69,11 +71,11 @@ public final class TraverserExecutor {
                 traversers.remove();
                 final Step<?, ?> currentStep = traversalMatrix.getStepById(traverser.getStepId());
                 if (!currentStep.getId().equals(previousStep.getId()))
-                    TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+                    TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers, memory);
                 currentStep.addStart((Traverser.Admin) traverser);
                 previousStep = currentStep;
             }
-            TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+            TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers, memory);
             assert toProcessTraversers.isEmpty();
             // process all the local objects and send messages or store locally again
             if (!aliveTraversers.isEmpty()) {
@@ -102,14 +104,18 @@ public final class TraverserExecutor {
         return voteToHalt.get();
     }
 
-    private static void drainStep(final Step<?, ?> step, final TraverserSet<?> aliveTraversers, final TraverserSet<?> haltedTraversers) {
-        step.forEachRemaining(traverser -> {
-            if (traverser.asAdmin().isHalted()) {
-                traverser.asAdmin().detach();
-                haltedTraversers.add((Traverser.Admin) traverser);
-            } else
-                aliveTraversers.add((Traverser.Admin) traverser);
-        });
+    private static void drainStep(final Step<?, ?> step, final TraverserSet<?> aliveTraversers, final TraverserSet<?> haltedTraversers, final Memory memory) {
+        if (step instanceof ReducingBarrierStep) {
+            memory.add(ReducingBarrierStep.REDUCING, step.next().get());
+        } else {
+            step.forEachRemaining(traverser -> {
+                if (traverser.asAdmin().isHalted()) {
+                    traverser.asAdmin().detach();
+                    haltedTraversers.add((Traverser.Admin) traverser);
+                } else
+                    aliveTraversers.add((Traverser.Admin) traverser);
+            });
+        }
     }
 
     private static Vertex getHostingVertex(final Object object) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
index 096580d..4d72146 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
@@ -18,128 +18,49 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> implements MapReducer {
+public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> {
 
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.BULK);
 
     public CountGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier(new ConstantSupplier<>(0L));
-        this.setBiFunction(CountBiFunction.<S>instance());
+        this.setReducingBiOperator(new CountBiOperator());
     }
 
-
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return REQUIREMENTS;
+    public Long projectTraverser(final Traverser.Admin<S> traverser) {
+        return traverser.bulk();
     }
 
+
     @Override
-    public MapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> getMapReduce() {
-        return CountGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
     }
 
     ///////////
 
-    private static class CountBiFunction<S> implements BiFunction<Long, Traverser<S>, Long>, Serializable {
-
-        private static final CountBiFunction INSTANCE = new CountBiFunction();
-
-        private CountBiFunction() {
-
-        }
+    private static class CountBiOperator implements BinaryOperator<Long>, Serializable {
 
         @Override
-        public Long apply(final Long mutatingSeed, final Traverser<S> traverser) {
-            return mutatingSeed + traverser.bulk();
-        }
-
-        public final static <S> CountBiFunction<S> instance() {
-            return INSTANCE;
+        public Long apply(final Long mutatingSeed, final Long count) {
+            return mutatingSeed + count;
         }
     }
 
-    ///////////
-
-    private static class CountGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> {
-
-        private static CountGlobalMapReduce INSTANCE = new CountGlobalMapReduce();
-
-        private CountGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Long> emitter) {
-            final Iterator<Long> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Long>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(),
-                    traverser -> traverser.get() * traverser.bulk());
-            long count = getCount(values);
-            if (count > 0)
-                emitter.emit(count);
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Long> values, final ReduceEmitter<NullObject, Long> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Long> values, final ReduceEmitter<NullObject, Long> emitter) {
-            long count = getCount(values);
-            if (count > 0)
-                emitter.emit(count);
-        }
-
-        private Long getCount(final Iterator<Long> longs) {
-            long count = 0l;
-            while (longs.hasNext()) {
-                count = count + longs.next();
-            }
-            return count;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Long generateFinalResult(final Iterator<KeyValue<NullObject, Long>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : 0L;
-        }
-
-        public static final CountGlobalMapReduce instance() {
-            return INSTANCE;
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
index c61f89f..ca45767 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
@@ -25,10 +25,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequire
 import org.apache.tinkerpop.gremlin.util.function.ArrayListSupplier;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
@@ -39,13 +41,18 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT);
 
     public FoldStep(final Traversal.Admin traversal) {
-        this(traversal, (Supplier) ArrayListSupplier.instance(), (BiFunction) ArrayListBiFunction.instance());
+        this(traversal, (Supplier) ArrayListSupplier.instance(), (BiFunction) new ListBiOperator<>());
+    }
+
+    @Override
+    public E projectTraverser(final Traverser.Admin<S> traverser) {
+        return (E) Collections.singletonList(traverser.get());
     }
 
     public FoldStep(final Traversal.Admin traversal, final Supplier<E> seed, final BiFunction<E, S, E> foldFunction) {
         super(traversal);
         this.setSeedSupplier(seed);
-        this.setBiFunction(new FoldBiFunction<>(foldFunction));
+        this.setReducingBiOperator(new FoldBiOperator<>(foldFunction));
     }
 
     @Override
@@ -55,41 +62,29 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
 
     /////////
 
-    private static class ArrayListBiFunction<S> implements BiFunction<ArrayList<S>, S, ArrayList<S>>, Serializable {
-
-        private static final ArrayListBiFunction INSTANCE = new ArrayListBiFunction();
-
-        private ArrayListBiFunction() {
-
-        }
+    private static class ListBiOperator<S> implements BinaryOperator<List<S>>, Serializable {
 
         @Override
-        public ArrayList<S> apply(final ArrayList<S> mutatingSeed, final S traverser) {
-            mutatingSeed.add(traverser);
+        public List<S> apply(final List<S> mutatingSeed, final List<S> list) {
+            mutatingSeed.addAll(list);
             return mutatingSeed;
         }
 
-        public final static <S> ArrayListBiFunction<S> instance() {
-            return INSTANCE;
-        }
     }
 
     ///////
 
-    public static class FoldBiFunction<S, E> implements BiFunction<E, Traverser<S>, E>, Serializable {
+    public static class FoldBiOperator<E> implements BinaryOperator<E>, Serializable {
 
-        private final BiFunction<E, S, E> biFunction;
+        private final BiFunction biFunction;
 
-        public FoldBiFunction(final BiFunction<E, S, E> biFunction) {
+        public FoldBiOperator(final BiFunction biFunction) {
             this.biFunction = biFunction;
         }
 
         @Override
-        public E apply(E seed, final Traverser<S> traverser) {
-            for (int i = 0; i < traverser.bulk(); i++) {
-                seed = this.biFunction.apply(seed, traverser.get());
-            }
-            return seed;
+        public E apply(E seed, E other) {
+            return (E) this.biFunction.apply(seed, other);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
index 6c7762c..e669bca 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
@@ -18,44 +18,36 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.MapHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Long>> implements MapReducer, TraversalParent, ByModulating {
+public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Long>> implements TraversalParent, ByModulating {
 
     private Traversal.Admin<S, E> groupTraversal = null;
 
     public GroupCountStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier(HashMapSupplier.instance());
-        this.setBiFunction(new GroupCountBiFunction(this));
+        this.setReducingBiOperator(new GroupCountBiOperator());
     }
 
 
@@ -69,14 +61,15 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
         return null == this.groupTraversal ? Collections.emptyList() : Collections.singletonList(this.groupTraversal);
     }
 
-    @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
+    public Map<E, Long> projectTraverser(final Traverser.Admin<S> traverser) {
+        final Map<E, Long> map = new HashMap<>(); // TODO: make singleton map
+        map.put(TraversalUtil.applyNullable(traverser, this.groupTraversal), traverser.bulk());
+        return map;
     }
 
     @Override
-    public MapReduce<E, Long, E, Long, Map<E, Long>> getMapReduce() {
-        return GroupCountMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
     }
 
     @Override
@@ -84,7 +77,6 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
         final GroupCountStep<S, E> clone = (GroupCountStep<S, E>) super.clone();
         if (null != this.groupTraversal)
             clone.groupTraversal = clone.integrateChild(this.groupTraversal.clone());
-        clone.setBiFunction(new GroupCountBiFunction<>(clone));
         return clone;
     }
 
@@ -104,74 +96,12 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
 
     ///////////
 
-    private static class GroupCountBiFunction<S, E> implements BiFunction<Map<E, Long>, Traverser<S>, Map<E, Long>>, Serializable {
-
-        private final GroupCountStep<S, E> groupCountStep;
-
-        private GroupCountBiFunction(final GroupCountStep<S, E> groupCountStep) {
-            this.groupCountStep = groupCountStep;
-
-        }
+    private static class GroupCountBiOperator<E> implements BinaryOperator<Map<E, Long>>, Serializable {
 
         @Override
-        public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Traverser<S> traverser) {
-            MapHelper.incr(mutatingSeed, TraversalUtil.applyNullable(traverser.asAdmin(), this.groupCountStep.groupTraversal), traverser.bulk());
+        public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Map<E, Long> map) {
+            map.forEach((k, v) -> MapHelper.incr(mutatingSeed, k, v));
             return mutatingSeed;
         }
     }
-
-    ///////////
-
-    public static final class GroupCountMapReduce<E> extends StaticMapReduce<E, Long, E, Long, Map<E, Long>> {
-
-        private static final GroupCountMapReduce INSTANCE = new GroupCountMapReduce();
-
-        private GroupCountMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<E, Long> emitter) {
-            final Map<E, Long> groupCount = new HashMap<>();
-            vertex.<TraverserSet<Map<E, Long>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet ->
-                    traverserSet.forEach(traverser ->
-                            traverser.get().forEach((k, v) -> MapHelper.incr(groupCount, k, (v * traverser.bulk())))));
-            groupCount.forEach(emitter::emit);
-        }
-
-        @Override
-        public void reduce(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
-            long counter = 0;
-            while (values.hasNext()) {
-                counter = counter + values.next();
-            }
-            emitter.emit(key, counter);
-        }
-
-        @Override
-        public void combine(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
-            reduce(key, values, emitter);
-        }
-
-        @Override
-        public Map<E, Long> generateFinalResult(final Iterator<KeyValue<E, Long>> keyValues) {
-            final Map<E, Long> map = new HashMap<>();
-            keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
-            return map;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        public static final <E> GroupCountMapReduce<E> instance() {
-            return INSTANCE;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index de3fd28..a673fea 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -70,7 +70,12 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
     public GroupStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) new GroupStepHelper.GroupMapSupplier());
-        this.setBiFunction(new GroupBiFunction(this));
+        //this.setBiFunction(new GroupBiFunction(this));
+    }
+
+    @Override
+    public Map<K, V> projectTraverser(Traverser.Admin<S> traverser) {
+        return new HashMap<>();
     }
 
     @Override
@@ -90,7 +95,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
     }
 
     @Override
-    public void modulateBy(final Traversal.Admin<?,?> kvTraversal) {
+    public void modulateBy(final Traversal.Admin<?, ?> kvTraversal) {
         if ('k' == this.state) {
             this.keyTraversal = this.integrateChild(kvTraversal);
             this.state = 'v';
@@ -118,7 +123,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         clone.valueReduceTraversal = clone.integrateChild(this.valueReduceTraversal.clone());
         clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
         clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
-        clone.setBiFunction(new GroupBiFunction<>((GroupStep) clone));
+        //clone.setBiFunction(new GroupBiFunction<>((GroupStep) clone));
         return clone;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
index d28d4e3..a2b7f5e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
@@ -69,7 +69,12 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
     public GroupStepV3d0(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) new GroupMapSupplierV3d0());
-        this.setBiFunction(new GroupBiFunction(this));
+       // this.setBiFunction(new GroupBiFunction(this));
+    }
+
+    @Override
+    public Map<K, R> projectTraverser(Traverser.Admin<S> traverser) {
+        return null;
     }
 
     @Override
@@ -123,7 +128,7 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
             clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
         if (null != this.reduceTraversal)
             clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
-        clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
+       // clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
         return clone;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
index c552bab..7db28e5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
@@ -18,124 +18,49 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.max;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
+
+
+    private static final Double NAN = Double.valueOf(Double.NaN);
 
     public MaxGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
-        this.setSeedSupplier(new ConstantSupplier<>(null));
-        this.setBiFunction(MaxGlobalBiFunction.<S>instance());
+        this.setSeedSupplier(new ConstantSupplier<>((S) NAN));
+        this.setReducingBiOperator(new MaxGlobalBiOperator<>());
     }
 
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return Collections.singleton(TraverserRequirement.OBJECT);
+    public S projectTraverser(final Traverser.Admin<S> traverser) {
+        return traverser.get();
     }
 
     @Override
-    public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
-        return MaxGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return Collections.singleton(TraverserRequirement.OBJECT);
     }
 
     /////
 
-    private static class MaxGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final MaxGlobalBiFunction INSTANCE = new MaxGlobalBiFunction();
-
-        private MaxGlobalBiFunction() {
-
-        }
-
+    private static class MaxGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
         @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            final S value = traverser.get();
-            return mutatingSeed != null ? (S) max(mutatingSeed, traverser.get()) : value;
-        }
-
-        public static <S extends Number> MaxGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static class MaxGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
-        private static final MaxGlobalMapReduce INSTANCE = new MaxGlobalMapReduce();
-
-        private MaxGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
-            final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(), Traverser.Admin::get);
-            if (values.hasNext())
-                emitter.emit(getMax(values));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            if (values.hasNext())
-                emitter.emit(getMax(values));
-        }
-
-        private Number getMax(final Iterator<Number> numbers) {
-            Number max = null;
-            while (numbers.hasNext()) {
-                final Number value = numbers.next();
-                max = max != null ? max(value, max) : value;
-            }
-            return max;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : Double.NaN;
-
-        }
-
-        public static MaxGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return !NAN.equals(mutatingSeed) ? (S) max(mutatingSeed, number) : number;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
index a4e4272..53688cd 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
@@ -18,29 +18,20 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.MeanNumberSupplier;
 
 import java.io.Serializable;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
-import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.add;
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.div;
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
 
@@ -48,109 +39,36 @@ import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  * @author Daniel Kuppitz (http://gremlin.guru)
  */
-public final class MeanGlobalStep<S extends Number, E extends Number> extends ReducingBarrierStep<S, E> implements MapReducer {
+public final class MeanGlobalStep<S extends Number, E extends Number> extends ReducingBarrierStep<S, E> {
 
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT, TraverserRequirement.BULK);
 
     public MeanGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) MeanNumberSupplier.instance());
-        this.setBiFunction((BiFunction) MeanGlobalBiFunction.instance());
+        this.setReducingBiOperator(new MeanGlobalBiOperator<>());
     }
 
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return REQUIREMENTS;
+    public E projectTraverser(final Traverser.Admin<S> traverser) {
+        return (E) new MeanNumber(traverser.get(), traverser.bulk());
     }
 
     @Override
-    public MapReduce<Number, Long, Number, Long, Number> getMapReduce() {
-        return MeanGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
     }
 
     /////
 
-    private static class MeanGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final MeanGlobalBiFunction INSTANCE = new MeanGlobalBiFunction();
-
-        private MeanGlobalBiFunction() {
-
-        }
-
-        @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            return (S) ((MeanNumber) mutatingSeed).add(traverser.get(), traverser.bulk());
-        }
-
-        public static <S extends Number> MeanGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static final class MeanGlobalMapReduce extends StaticMapReduce<Number, Long, Number, Long, Number> {
-
-        private static final MeanGlobalMapReduce INSTANCE = new MeanGlobalMapReduce();
-
-        private MeanGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<Number, Long> emitter) {
-            vertex.<TraverserSet<Number>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> emitter.emit(traverser.get(), traverser.bulk())));
-        }
-
-        @Override
-        public void combine(final Number key, final Iterator<Long> values, final ReduceEmitter<Number, Long> emitter) {
-            this.reduce(key, values, emitter);
-        }
+    private static class MeanGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
 
         @Override
-        public void reduce(final Number key, final Iterator<Long> values, final ReduceEmitter<Number, Long> emitter) {
-            long counter = 0;
-            while (values.hasNext()) {
-                counter = counter + values.next();
-            }
-            emitter.emit(key, counter);
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<Number, Long>> keyValues) {
-            if (keyValues.hasNext()) {
-                KeyValue<Number, Long> pair = keyValues.next();
-                long counter = pair.getValue();
-                Number result = mul(pair.getKey(), counter);
-                while (keyValues.hasNext()) {
-                    long incr = pair.getValue();
-                    pair = keyValues.next();
-                    result = add(result, mul(pair.getKey(), incr));
-                    counter += incr;
-                }
-                return div(result, counter, true);
-            }
-            return Double.NaN;
-        }
-
-        public static MeanGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return (S) (number instanceof MeanNumber ? ((MeanNumber) mutatingSeed).add((MeanNumber) number) : ((MeanNumber) mutatingSeed).add(number, 1l));
         }
     }
 
-    ///
-
     public static final class MeanNumber extends Number implements Comparable<Number>, FinalGet<Number> {
 
         private long count;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
index 88dd46f..a1bc2c4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
@@ -18,122 +18,48 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.min;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
+
+    private static final Double NAN = Double.valueOf(Double.NaN);
 
     public MinGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
-        this.setSeedSupplier(new ConstantSupplier<>(null));
-        this.setBiFunction(MinGlobalBiFunction.instance());
+        this.setSeedSupplier(new ConstantSupplier<>((S) NAN));
+        this.setReducingBiOperator(new MinGlobalBiOperator<>());
     }
 
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return Collections.singleton(TraverserRequirement.OBJECT);
+    public S projectTraverser(final Traverser.Admin<S> traverser) {
+        return traverser.get();
     }
 
     @Override
-    public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
-        return MinGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return Collections.singleton(TraverserRequirement.OBJECT);
     }
 
     /////
 
-    private static class MinGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final MinGlobalBiFunction INSTANCE = new MinGlobalBiFunction();
-
-        private MinGlobalBiFunction() {
-
-        }
-
+    private static class MinGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
         @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            final S value = traverser.get();
-            return mutatingSeed != null ? (S) min(mutatingSeed, traverser.get()) : value;
-        }
-
-        public static <S extends Number> MinGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static class MinGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
-        private static final MinGlobalMapReduce INSTANCE = new MinGlobalMapReduce();
-
-        private MinGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
-            final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(), Traverser.Admin::get);
-            if (values.hasNext())
-                emitter.emit(getMin(values));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            if (values.hasNext()) emitter.emit(getMin(values));
-        }
-
-        private Number getMin(final Iterator<Number> numbers) {
-            Number min = null;
-            while (numbers.hasNext()) {
-                final Number value = numbers.next();
-                min = min != null ? min(value, min) : value;
-            }
-            return min;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : Double.NaN;
-        }
-
-        public static MinGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return !NAN.equals(mutatingSeed) ? (S) min(mutatingSeed, number) : number;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
index 8c13192..bea7717 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
@@ -18,25 +18,16 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.add;
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
@@ -44,7 +35,7 @@ import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
 
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
             TraverserRequirement.BULK,
@@ -54,94 +45,27 @@ public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S
     public SumGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier(new ConstantSupplier<>((S) Integer.valueOf(0)));
-        this.setBiFunction(SumGlobalBiFunction.instance());
+        this.setReducingBiOperator(new SumGlobalBiOperator<S>());
     }
 
-
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return REQUIREMENTS;
+    public S projectTraverser(final Traverser.Admin<S> traverser) {
+        return (S) mul(traverser.get(), traverser.bulk());
     }
 
+
     @Override
-    public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
-        return SumGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
     }
 
     /////
 
-    private static class SumGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final SumGlobalBiFunction INSTANCE = new SumGlobalBiFunction<>();
-
-        private SumGlobalBiFunction() {
-
-        }
+    private static class SumGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
 
         @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            return (S) add(mutatingSeed, mul(traverser.get(), traverser.bulk()));
-        }
-
-        public static <S extends Number> SumGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static class SumGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
-        private static final SumGlobalMapReduce INSTANCE = new SumGlobalMapReduce();
-
-        private SumGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
-            final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(),
-                    traverser -> mul(traverser.get(), traverser.bulk()));
-            if (values.hasNext())
-                emitter.emit(getSum(values));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            if (values.hasNext())
-                emitter.emit(getSum(values));
-        }
-
-        private Number getSum(final Iterator<Number> numbers) {
-            Number sum = numbers.next();
-            while (numbers.hasNext()) {
-                sum = add(sum, numbers.next());
-            }
-            return sum;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : 0;
-        }
-
-        public static SumGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return (S) add(mutatingSeed, number);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
index 07d8112..fa41210 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
@@ -18,45 +18,37 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalRing;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.TreeSupplier;
 
 import java.io.Serializable;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements MapReducer, TraversalParent, ByModulating, PathProcessor {
+public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements TraversalParent, ByModulating, PathProcessor {
 
     private TraversalRing<Object, Object> traversalRing = new TraversalRing<>();
 
     public TreeStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) TreeSupplier.instance());
-        this.setBiFunction(new TreeBiFunction(this));
+        this.setReducingBiOperator(new TreeBiOperator());
     }
 
 
@@ -76,16 +68,26 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
     }
 
     @Override
-    public MapReduce<MapReduce.NullObject, Tree, MapReduce.NullObject, Tree, Tree> getMapReduce() {
-        return TreeMapReduce.instance();
+    public Tree projectTraverser(final Traverser.Admin<S> traverser) {
+        final Tree topTree = new Tree();
+        Tree depth = topTree;
+        final Path path = traverser.path();
+        for (int i = 0; i < path.size(); i++) {
+            final Object object = TraversalUtil.apply(path.<Object>get(i), this.traversalRing.next());
+            if (!depth.containsKey(object))
+                depth.put(object, new Tree<>());
+            depth = (Tree) depth.get(object);
+        }
+        this.traversalRing.reset();
+        return topTree;
     }
 
+
     @Override
     public TreeStep<S> clone() {
         final TreeStep<S> clone = (TreeStep<S>) super.clone();
         clone.traversalRing = this.traversalRing.clone();
         clone.getLocalChildren().forEach(clone::integrateChild);
-        clone.setBiFunction(new TreeBiFunction<>(clone));
         return clone;
     }
 
@@ -107,73 +109,12 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
 
     ///////////
 
-    private static class TreeBiFunction<S> implements BiFunction<Tree, Traverser<S>, Tree>, Serializable {
-
-        private final TreeStep<S> treeStep;
-
-        private TreeBiFunction(final TreeStep<S> treeStep) {
-            this.treeStep = treeStep;
-        }
+    private static class TreeBiOperator implements BinaryOperator<Tree>, Serializable {
 
         @Override
-        public Tree apply(final Tree mutatingSeed, final Traverser<S> traverser) {
-            Tree depth = mutatingSeed;
-            final Path path = traverser.path();
-            for (int i = 0; i < path.size(); i++) {
-                final Object object = TraversalUtil.apply(path.<Object>get(i), this.treeStep.traversalRing.next());
-                if (!depth.containsKey(object))
-                    depth.put(object, new Tree<>());
-                depth = (Tree) depth.get(object);
-            }
-            this.treeStep.traversalRing.reset();
+        public Tree apply(final Tree mutatingSeed, final Tree tree) {
+            mutatingSeed.addTree(tree);
             return mutatingSeed;
         }
     }
-
-    ///////////
-
-    public static final class TreeMapReduce extends StaticMapReduce<MapReduce.NullObject, Tree, MapReduce.NullObject, Tree, Tree> {
-
-        private static final TreeMapReduce INSTANCE = new TreeMapReduce();
-
-        private TreeMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Tree> emitter) {
-            vertex.<TraverserSet<Tree>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> emitter.emit(traverser.get())));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Tree> values, final ReduceEmitter<NullObject, Tree> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Tree> values, final ReduceEmitter<NullObject, Tree> emitter) {
-            final Tree tree = new Tree();
-            values.forEachRemaining(tree::addTree);
-            emitter.emit(tree);
-        }
-
-        @Override
-        public Tree generateFinalResult(final Iterator<KeyValue<NullObject, Tree>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : new Tree();
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        public static final TreeMapReduce instance() {
-            return INSTANCE;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
index ea94499..921385c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
@@ -28,7 +28,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -37,21 +36,21 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import java.util.Iterator;
 import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 
-public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> implements MapReducer, Barrier {
+public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> implements Barrier {
 
 
     public static final String REDUCING = Graph.Hidden.hide("reducing");
 
     protected Supplier<E> seedSupplier;
-    protected BiFunction<E, Traverser<S>, E> reducingBiFunction;
+    protected BinaryOperator<E> reducingBiOperator;
     private boolean done = false;
-
     private E seed = null;
 
     public ReducingBarrierStep(final Traversal.Admin traversal) {
@@ -62,8 +61,18 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
         this.seedSupplier = seedSupplier;
     }
 
-    public void setBiFunction(final BiFunction<E, Traverser<S>, E> reducingBiFunction) {
-        this.reducingBiFunction = reducingBiFunction;
+    public Supplier<E> getSeedSupplier() {
+        return this.seedSupplier;
+    }
+
+    public abstract E projectTraverser(final Traverser.Admin<S> traverser);
+
+    public void setReducingBiOperator(final BinaryOperator<E> reducingBiOperator) {
+        this.reducingBiOperator = reducingBiOperator;
+    }
+
+    public BinaryOperator<E> getBiOperator() {
+        return this.reducingBiOperator;
     }
 
     public void reset() {
@@ -90,7 +99,7 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
     public void processAllStarts() {
         if (this.seed == null) this.seed = this.seedSupplier.get();
         while (this.starts.hasNext())
-            this.seed = this.reducingBiFunction.apply(this.seed, this.starts.next());
+            this.seed = this.reducingBiOperator.apply(this.seed, this.projectTraverser(this.starts.next()));
     }
 
     @Override
@@ -112,11 +121,6 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
         return clone;
     }
 
-    @Override
-    public MapReduce getMapReduce() {
-        return new DefaultMapReduce(this.seedSupplier, this.reducingBiFunction);
-    }
-
     ///////
 
     public static class DefaultMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Object> {
@@ -163,7 +167,6 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
         @Override
         public Object generateFinalResult(final Iterator keyValues) {
             return ((KeyValue) keyValues.next()).getValue();
-
         }
 
         @Override


Mime
View raw message