beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/8] incubator-beam git commit: Move GroupByKey expansion into DirectPipelineRunner
Date Tue, 05 Apr 2016 20:56:35 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 2b5544531 -> 6c34f3a34


Move GroupByKey expansion into DirectPipelineRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/589ef8a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/589ef8a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/589ef8a0

Branch: refs/heads/master
Commit: 589ef8a09336f3363280e47039592b7d3afbc8f8
Parents: c1de175
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Mar 24 15:26:37 2016 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu Mar 24 19:23:01 2016 -0700

----------------------------------------------------------------------
 .../FlinkBatchTransformTranslators.java         |  13 +-
 .../spark/translation/TransformTranslator.java  |  21 +-
 .../sdk/runners/DirectPipelineRunner.java       | 248 +++++++++++++++
 .../inprocess/GroupByKeyEvaluatorFactory.java   |   2 +-
 .../dataflow/sdk/transforms/GroupByKey.java     | 310 +------------------
 .../cloud/dataflow/sdk/util/GroupByKeyOnly.java |  43 +++
 .../cloud/dataflow/sdk/util/ReduceFnRunner.java |   1 -
 .../sdk/util/ReifyTimestampsAndWindows.java     |  48 +++
 .../GroupByKeyEvaluatorFactoryTest.java         |   4 +-
 9 files changed, 365 insertions(+), 325 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
index 48c783d..b09d033 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java
@@ -53,6 +53,7 @@ import com.google.cloud.dataflow.sdk.transforms.join.CoGbkResultSchema;
 import com.google.cloud.dataflow.sdk.transforms.join.CoGroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.join.KeyedPCollectionTuple;
 import com.google.cloud.dataflow.sdk.transforms.join.RawUnionValue;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollectionView;
@@ -96,7 +97,7 @@ public class FlinkBatchTransformTranslators {
   // --------------------------------------------------------------------------------------------
   //  Transform Translator Registry
   // --------------------------------------------------------------------------------------------
-  
+
   @SuppressWarnings("rawtypes")
   private static final Map<Class<? extends PTransform>, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<>();
 
@@ -112,7 +113,7 @@ public class FlinkBatchTransformTranslators {
 
     TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslatorBatch());
 
-    TRANSLATORS.put(GroupByKey.GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());
+    TRANSLATORS.put(GroupByKeyOnly.class, new GroupByKeyOnlyTranslatorBatch());
     // TODO we're currently ignoring windows here but that has to change in the future
     TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslatorBatch());
 
@@ -302,10 +303,10 @@ public class FlinkBatchTransformTranslators {
     }
   }
 
-  private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKey.GroupByKeyOnly<K, V>> {
+  private static class GroupByKeyOnlyTranslatorBatch<K, V> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<GroupByKeyOnly<K, V>> {
 
     @Override
-    public void translateNode(GroupByKey.GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
+    public void translateNode(GroupByKeyOnly<K, V> transform, FlinkBatchTranslationContext context) {
       DataSet<KV<K, V>> inputDataSet = context.getInputDataSet(context.getInput(transform));
       GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> groupReduceFunction = new FlinkKeyedListAggregationFunction<>();
 
@@ -406,7 +407,7 @@ public class FlinkBatchTransformTranslators {
 //      context.setOutputDataSet(transform.getOutput(), outputDataSet);
 //    }
 //  }
-  
+
   private static class ParDoBoundTranslatorBatch<IN, OUT> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<ParDo.Bound<IN, OUT>> {
     private static final Logger LOG = LoggerFactory.getLogger(ParDoBoundTranslatorBatch.class);
 
@@ -589,6 +590,6 @@ public class FlinkBatchTransformTranslators {
   // --------------------------------------------------------------------------------------------
   //  Miscellaneous
   // --------------------------------------------------------------------------------------------
-  
+
   private FlinkBatchTransformTranslators() {}
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
index 0bd047c..adb6e68 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java
@@ -18,11 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
-import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
-
 import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.Arrays;
@@ -30,6 +25,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputDirectory;
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFilePrefix;
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.getOutputFileTemplate;
+import static org.apache.beam.runners.spark.io.hadoop.ShardNameBuilder.replaceShardCount;
+
 import com.google.api.client.util.Maps;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -40,7 +40,6 @@ import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Create;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.Flatten;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.View;
@@ -49,6 +48,7 @@ import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
 import com.google.cloud.dataflow.sdk.util.AssignWindowsDoFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -80,6 +80,7 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFunction;
+
 import scala.Tuple2;
 
 /**
@@ -128,10 +129,10 @@ public final class TransformTranslator {
     };
   }
 
-  private static <K, V> TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>> gbk() {
-    return new TransformEvaluator<GroupByKey.GroupByKeyOnly<K, V>>() {
+  private static <K, V> TransformEvaluator<GroupByKeyOnly<K, V>> gbk() {
+    return new TransformEvaluator<GroupByKeyOnly<K, V>>() {
       @Override
-      public void evaluate(GroupByKey.GroupByKeyOnly<K, V> transform, EvaluationContext context) {
+      public void evaluate(GroupByKeyOnly<K, V> transform, EvaluationContext context) {
         @SuppressWarnings("unchecked")
         JavaRDDLike<WindowedValue<KV<K, V>>, ?> inRDD =
             (JavaRDDLike<WindowedValue<KV<K, V>>, ?>) context.getInputRDD(transform);
@@ -768,7 +769,7 @@ public final class TransformTranslator {
     EVALUATORS.put(HadoopIO.Write.Bound.class, writeHadoop());
     EVALUATORS.put(ParDo.Bound.class, parDo());
     EVALUATORS.put(ParDo.BoundMulti.class, multiDo());
-    EVALUATORS.put(GroupByKey.GroupByKeyOnly.class, gbk());
+    EVALUATORS.put(GroupByKeyOnly.class, gbk());
     EVALUATORS.put(Combine.GroupedValues.class, grouped());
     EVALUATORS.put(Combine.Globally.class, combineGlobally());
     EVALUATORS.put(Combine.PerKey.class, combinePerKey());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
index 872cfef..629be83 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/DirectPipelineRunner.java
@@ -16,6 +16,7 @@
 
 package com.google.cloud.dataflow.sdk.runners;
 
+import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
@@ -24,6 +25,9 @@ import com.google.cloud.dataflow.sdk.Pipeline.PipelineVisitor;
 import com.google.cloud.dataflow.sdk.PipelineResult;
 import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
 import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.CoderException;
+import com.google.cloud.dataflow.sdk.coders.IterableCoder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.ListCoder;
 import com.google.cloud.dataflow.sdk.io.AvroIO;
 import com.google.cloud.dataflow.sdk.io.FileBasedSink;
@@ -38,19 +42,26 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Combine;
 import com.google.cloud.dataflow.sdk.transforms.Combine.KeyedCombineFn;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.Partition;
 import com.google.cloud.dataflow.sdk.transforms.Partition.PartitionFn;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.util.AppliedCombineFn;
+import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
+import com.google.cloud.dataflow.sdk.util.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
 import com.google.cloud.dataflow.sdk.util.MapAggregatorValues;
 import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunner;
 import com.google.cloud.dataflow.sdk.util.PerKeyCombineFnRunners;
+import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.SerializableUtils;
+import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
 import com.google.cloud.dataflow.sdk.util.TestCredential;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.util.common.Counter;
 import com.google.cloud.dataflow.sdk.util.common.CounterSet;
 import com.google.cloud.dataflow.sdk.values.KV;
@@ -71,8 +82,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -138,6 +151,8 @@ public class DirectPipelineRunner
     }
   }
 
+  /////////////////////////////////////////////////////////////////////////////
+
   /**
    * Records that instances of the specified PTransform class
    * should be evaluated by the corresponding TransformEvaluator.
@@ -243,6 +258,8 @@ public class DirectPipelineRunner
       return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input);
     } else if (transform instanceof AvroIO.Write.Bound) {
       return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input);
+    } else if (transform instanceof GroupByKey) {
+      return (OutputT) ((PCollection) input).apply(new DirectGroupByKey((GroupByKey) transform));
     } else {
       return super.apply(transform, input);
     }
@@ -388,6 +405,43 @@ public class DirectPipelineRunner
     }
   }
 
+  private static class DirectGroupByKey<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Iterable<V>>>> {
+    // Multi-step expansion that DirectPipelineRunner.apply() uses in place of a GroupByKey.
+    private final GroupByKey<K, V> originalTransform;  // consulted for the output windowing
+
+    public DirectGroupByKey(GroupByKey<K, V> originalTransform) {
+      this.originalTransform = originalTransform;
+    }
+
+    @Override
+    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
+      return input
+          // Make each input element's timestamp and assigned windows
+          // explicit, in the value part.
+          .apply(new ReifyTimestampsAndWindows<K, V>())
+
+          // Group by just the key.
+          // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
+          // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
+          // introduced in here.
+          .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
+
+          // Sort each key's values by timestamp. GroupAlsoByWindow requires
+          // its input to be sorted by timestamp.
+          .apply(new DirectPipelineRunner.SortValuesByTimestamp<K, V>())
+
+          // Group each key's values by window, merging windows as needed.
+          .apply(new DirectPipelineRunner.GroupAlsoByWindow<K, V>(windowingStrategy))
+
+          // And update the windowing strategy as appropriate.
+          .setWindowingStrategyInternal(
+              originalTransform.updateWindowingStrategy(windowingStrategy));
+    }
+  }
+
   /**
    * Apply the override for AvroIO.Write.Bound if the user requested sharding controls
    * greater than one.
@@ -1117,6 +1171,128 @@ public class DirectPipelineRunner
 
   /////////////////////////////////////////////////////////////////////////////
 
+  /**
+   * Helper transform that, for each key, copies the grouped values into a new list and
+   * sorts that list by ascending element timestamp before re-emitting the key/values pair.
+   */
+  private static class SortValuesByTimestamp<K, V>
+      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+                         PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
+    @Override
+    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      return input.apply(ParDo.of(
+          new DoFn<KV<K, Iterable<WindowedValue<V>>>,
+                   KV<K, Iterable<WindowedValue<V>>>>() {
+            @Override
+            public void processElement(ProcessContext c) {
+              KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
+              K key = kvs.getKey();
+              Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
+              List<WindowedValue<V>> sortedValues = new ArrayList<>();
+              for (WindowedValue<V> value : unsortedValues) {
+                sortedValues.add(value);
+              }
+              Collections.sort(sortedValues,
+                               new Comparator<WindowedValue<V>>() {
+                  @Override
+                  public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
+                    return e1.getTimestamp().compareTo(e2.getTimestamp());
+                  }
+                });
+              c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
+            }}))
+          .setCoder(input.getCoder());  // element type is unchanged, so reuse the input coder
+    }
+  }
+
+  /**
+   * Helper transform that takes a collection of timestamp-ordered
+   * values associated with each key, groups the values by window,
+   * combines windows as needed, and for each window in each key,
+   * outputs a collection of key/value-list pairs implicitly assigned
+   * to the window and with the timestamp derived from that window.
+   */
+  private static class GroupAlsoByWindow<K, V>
+      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
+                         PCollection<KV<K, Iterable<V>>>> {
+    private final WindowingStrategy<?, ?> windowingStrategy;
+
+    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public PCollection<KV<K, Iterable<V>>> apply(
+        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
+      @SuppressWarnings("unchecked")
+      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
+          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();  // input must use a KvCoder
+
+      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+      Coder<Iterable<WindowedValue<V>>> inputValueCoder =
+          inputKvCoder.getValueCoder();
+
+      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
+          (IterableCoder<WindowedValue<V>>) inputValueCoder;  // ...with an IterableCoder value
+      Coder<WindowedValue<V>> inputIterableElementCoder =
+          inputIterableValueCoder.getElemCoder();
+      WindowedValueCoder<V> inputIterableWindowedValueCoder =
+          (WindowedValueCoder<V>) inputIterableElementCoder;  // ...of WindowedValueCoder elements
+
+      Coder<V> inputIterableElementValueCoder =
+          inputIterableWindowedValueCoder.getValueCoder();
+      Coder<Iterable<V>> outputValueCoder =
+          IterableCoder.of(inputIterableElementValueCoder);
+      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
+
+      return input
+          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
+          .setCoder(outputKvCoder);  // KV<K, Iterable<V>>: values no longer carry WindowedValue
+    }
+
+    private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>
+        groupAlsoByWindowsFn(
+            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
+      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
+          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
+    }
+  }
+
+  /**
+   * The key by which GBK groups inputs - elements are grouped by the encoded form of the key,
+   * but the original key may be accessed as well.
+   */
+  private static class GroupingKey<K> {
+    private final K key;  // original key, returned by getKey(); not used for equality
+    private final byte[] encodedKey;  // sole basis of equals() and hashCode()
+
+    public GroupingKey(K key, byte[] encodedKey) {
+      this.key = key;
+      this.encodedKey = encodedKey;
+    }
+
+    public K getKey() {
+      return key;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o instanceof GroupingKey) {
+        GroupingKey<?> that = (GroupingKey<?>) o;
+        return Arrays.equals(this.encodedKey, that.encodedKey);
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(encodedKey);
+    }
+  }
+
   private final DirectPipelineOptions options;
   private boolean testSerializability;
   private boolean testEncodability;
@@ -1153,4 +1329,76 @@ public class DirectPipelineRunner
   public String toString() {
     return "DirectPipelineRunner#" + hashCode();
   }
+
+  public static <K, V> void evaluateGroupByKeyOnly(
+      GroupByKeyOnly<K, V> transform,
+      EvaluationContext context) {
+    PCollection<KV<K, V>> input = context.getInput(transform);
+
+    List<ValueWithMetadata<KV<K, V>>> inputElems =
+        context.getPCollectionValuesWithMetadata(input);
+
+    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
+    // Keys are compared by their encoded bytes (GroupingKey.equals), not by K.equals().
+    Map<DirectPipelineRunner.GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
+
+    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
+      K key = elem.getValue().getKey();
+      V value = elem.getValue().getValue();
+      byte[] encodedKey;
+      try {
+        encodedKey = encodeToByteArray(keyCoder, key);
+      } catch (CoderException exn) {
+        // TODO: Put in better element printing:
+        // truncate if too long.
+        throw new IllegalArgumentException(
+            "unable to encode key " + key + " of input to " + transform +
+            " using " + keyCoder,
+            exn);
+      }
+      DirectPipelineRunner.GroupingKey<K> groupingKey =
+          new GroupingKey<>(key, encodedKey);
+      List<V> values = groupingMap.get(groupingKey);
+      if (values == null) {
+        values = new ArrayList<V>();
+        groupingMap.put(groupingKey, values);
+      }
+      values.add(value);
+    }
+    // Emit one KV<K, Iterable<V>> per distinct encoded key, tagged with the original key.
+    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
+        new ArrayList<>();
+    for (Map.Entry<DirectPipelineRunner.GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
+      DirectPipelineRunner.GroupingKey<K> groupingKey = entry.getKey();
+      K key = groupingKey.getKey();
+      List<V> values = entry.getValue();
+      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
+      outputElems.add(ValueWithMetadata
+                      .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
+                      .withKey(key));
+    }
+
+    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
+                                             outputElems);
+  }
+
+  /** Registers a default evaluator for GroupByKeyOnly that calls evaluateGroupByKeyOnly. */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public static void registerGroupByKeyOnly() {
+    registerDefaultTransformEvaluator(
+        GroupByKeyOnly.class,
+        new TransformEvaluator<GroupByKeyOnly>() {
+          @Override
+          public void evaluate(
+              GroupByKeyOnly transform,
+              EvaluationContext context) {
+            evaluateGroupByKeyOnly(transform, context);
+          }
+        });
+  }
+
+  static {
+    DirectPipelineRunner.registerGroupByKeyOnly();  // runs once, when this class initializes
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
index 3ec4af1..b59ec56 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactory.java
@@ -27,7 +27,6 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.StepTransformResult.Build
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
 import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
@@ -35,6 +34,7 @@ import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowViaWindowSetDoFn;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItemCoder;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
+import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
index 8fde3e0..490269b 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
@@ -16,40 +16,20 @@
 
 package com.google.cloud.dataflow.sdk.transforms;
 
-import static com.google.cloud.dataflow.sdk.util.CoderUtils.encodeToByteArray;
-
 import com.google.cloud.dataflow.sdk.coders.Coder;
 import com.google.cloud.dataflow.sdk.coders.Coder.NonDeterministicException;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
 import com.google.cloud.dataflow.sdk.coders.IterableCoder;
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
-import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.DefaultTrigger;
 import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
-import com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn;
-import com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn;
-import com.google.cloud.dataflow.sdk.util.SystemReduceFn;
-import com.google.cloud.dataflow.sdk.util.WindowedValue;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
-import com.google.cloud.dataflow.sdk.util.WindowedValue.WindowedValueCoder;
 import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 /**
  * {@code GroupByKey<K, V>} takes a {@code PCollection<KV<K, V>>},
  * groups the values by key and windows, and returns a
@@ -234,34 +214,12 @@ public class GroupByKey<K, V>
 
   @Override
   public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    // This operation groups by the combination of key and window,
+    // This primitive operation groups by the combination of key and window,
     // merging windows as needed, using the windows assigned to the
     // key/value input elements and the window merge operation of the
     // window function associated with the input PCollection.
-    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-
-    // By default, implement GroupByKey[AndWindow] via a series of lower-level
-    // operations.
-    return input
-        // Make each input element's timestamp and assigned windows
-        // explicit, in the value part.
-        .apply(new ReifyTimestampsAndWindows<K, V>())
-
-        // Group by just the key.
-        // Combiner lifting will not happen regardless of the disallowCombinerLifting value.
-        // There will be no combiners right after the GroupByKeyOnly because of the two ParDos
-        // introduced in here.
-        .apply(new GroupByKeyOnly<K, WindowedValue<V>>())
-
-        // Sort each key's values by timestamp. GroupAlsoByWindow requires
-        // its input to be sorted by timestamp.
-        .apply(new SortValuesByTimestamp<K, V>())
-
-        // Group each key's values by window, merging windows as needed.
-        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
-
-        // And update the windowing strategy as appropriate.
-        .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
+    return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+        updateWindowingStrategy(input.getWindowingStrategy()), input.isBounded());
   }
 
   @Override
@@ -289,7 +247,7 @@ public class GroupByKey<K, V>
    * transform, which is also used as the {@code Coder} of the keys of
    * the output of this transform.
    */
-  static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
+  public static <K, V> Coder<K> getKeyCoder(Coder<KV<K, V>> inputCoder) {
     return getInputKvCoder(inputCoder).getKeyCoder();
   }
 
@@ -311,265 +269,7 @@ public class GroupByKey<K, V>
   /**
    * Returns the {@code Coder} of the output of this transform.
    */
-  static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
+  public static <K, V> KvCoder<K, Iterable<V>> getOutputKvCoder(Coder<KV<K, V>> inputCoder) {
     return KvCoder.of(getKeyCoder(inputCoder), getOutputValueCoder(inputCoder));
   }
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that makes timestamps and window assignments
-   * explicit in the value part of each key/value pair.
-   */
-  public static class ReifyTimestampsAndWindows<K, V>
-      extends PTransform<PCollection<KV<K, V>>,
-                         PCollection<KV<K, WindowedValue<V>>>> {
-    @Override
-    public PCollection<KV<K, WindowedValue<V>>> apply(
-        PCollection<KV<K, V>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
-      Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
-          inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
-      Coder<KV<K, WindowedValue<V>>> outputKvCoder =
-          KvCoder.of(keyCoder, outputValueCoder);
-      return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
-          .setCoder(outputKvCoder);
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that sorts the values associated with each key
-   * by timestamp.
-   */
-  public static class SortValuesByTimestamp<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<WindowedValue<V>>>>> {
-    @Override
-    public PCollection<KV<K, Iterable<WindowedValue<V>>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      return input.apply(ParDo.of(
-          new DoFn<KV<K, Iterable<WindowedValue<V>>>,
-                   KV<K, Iterable<WindowedValue<V>>>>() {
-            @Override
-            public void processElement(ProcessContext c) {
-              KV<K, Iterable<WindowedValue<V>>> kvs = c.element();
-              K key = kvs.getKey();
-              Iterable<WindowedValue<V>> unsortedValues = kvs.getValue();
-              List<WindowedValue<V>> sortedValues = new ArrayList<>();
-              for (WindowedValue<V> value : unsortedValues) {
-                sortedValues.add(value);
-              }
-              Collections.sort(sortedValues,
-                               new Comparator<WindowedValue<V>>() {
-                  @Override
-                  public int compare(WindowedValue<V> e1, WindowedValue<V> e2) {
-                    return e1.getTimestamp().compareTo(e2.getTimestamp());
-                  }
-                });
-              c.output(KV.<K, Iterable<WindowedValue<V>>>of(key, sortedValues));
-            }}))
-          .setCoder(input.getCoder());
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Helper transform that takes a collection of timestamp-ordered
-   * values associated with each key, groups the values by window,
-   * combines windows as needed, and for each window in each key,
-   * outputs a collection of key/value-list pairs implicitly assigned
-   * to the window and with the timestamp derived from that window.
-   */
-  public static class GroupAlsoByWindow<K, V>
-      extends PTransform<PCollection<KV<K, Iterable<WindowedValue<V>>>>,
-                         PCollection<KV<K, Iterable<V>>>> {
-    private final WindowingStrategy<?, ?> windowingStrategy;
-
-    public GroupAlsoByWindow(WindowingStrategy<?, ?> windowingStrategy) {
-      this.windowingStrategy = windowingStrategy;
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public PCollection<KV<K, Iterable<V>>> apply(
-        PCollection<KV<K, Iterable<WindowedValue<V>>>> input) {
-      @SuppressWarnings("unchecked")
-      KvCoder<K, Iterable<WindowedValue<V>>> inputKvCoder =
-          (KvCoder<K, Iterable<WindowedValue<V>>>) input.getCoder();
-
-      Coder<K> keyCoder = inputKvCoder.getKeyCoder();
-      Coder<Iterable<WindowedValue<V>>> inputValueCoder =
-          inputKvCoder.getValueCoder();
-
-      IterableCoder<WindowedValue<V>> inputIterableValueCoder =
-          (IterableCoder<WindowedValue<V>>) inputValueCoder;
-      Coder<WindowedValue<V>> inputIterableElementCoder =
-          inputIterableValueCoder.getElemCoder();
-      WindowedValueCoder<V> inputIterableWindowedValueCoder =
-          (WindowedValueCoder<V>) inputIterableElementCoder;
-
-      Coder<V> inputIterableElementValueCoder =
-          inputIterableWindowedValueCoder.getValueCoder();
-      Coder<Iterable<V>> outputValueCoder =
-          IterableCoder.of(inputIterableElementValueCoder);
-      Coder<KV<K, Iterable<V>>> outputKvCoder = KvCoder.of(keyCoder, outputValueCoder);
-
-      return input
-          .apply(ParDo.of(groupAlsoByWindowsFn(windowingStrategy, inputIterableElementValueCoder)))
-          .setCoder(outputKvCoder);
-    }
-
-    private <W extends BoundedWindow> GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>
-        groupAlsoByWindowsFn(
-            WindowingStrategy<?, W> strategy, Coder<V> inputIterableElementValueCoder) {
-      return new GroupAlsoByWindowsViaOutputBufferDoFn<K, V, Iterable<V>, W>(
-          strategy, SystemReduceFn.<K, V, W>buffering(inputIterableElementValueCoder));
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Primitive helper transform that groups by key only, ignoring any
-   * window assignments.
-   */
-  public static class GroupByKeyOnly<K, V>
-      extends PTransform<PCollection<KV<K, V>>,
-                         PCollection<KV<K, Iterable<V>>>> {
-
-    @SuppressWarnings({"rawtypes", "unchecked"})
-    @Override
-    public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-    }
-
-    /**
-     * Returns the {@code Coder} of the input to this transform, which
-     * should be a {@code KvCoder}.
-     */
-    @SuppressWarnings("unchecked")
-    KvCoder<K, V> getInputKvCoder(Coder<KV<K, V>> inputCoder) {
-      if (!(inputCoder instanceof KvCoder)) {
-        throw new IllegalStateException(
-            "GroupByKey requires its input to use KvCoder");
-      }
-      return (KvCoder<K, V>) inputCoder;
-    }
-
-    @Override
-    protected Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
-      return GroupByKey.getOutputKvCoder(input.getCoder());
-    }
-  }
-
-
-  /////////////////////////////////////////////////////////////////////////////
-
-  static {
-    registerWithDirectPipelineRunner();
-  }
-
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  private static <K, V> void registerWithDirectPipelineRunner() {
-    DirectPipelineRunner.registerDefaultTransformEvaluator(
-        GroupByKeyOnly.class,
-        new DirectPipelineRunner.TransformEvaluator<GroupByKeyOnly>() {
-          @Override
-          public void evaluate(
-              GroupByKeyOnly transform,
-              DirectPipelineRunner.EvaluationContext context) {
-            evaluateHelper(transform, context);
-          }
-        });
-  }
-
-  private static <K, V> void evaluateHelper(
-      GroupByKeyOnly<K, V> transform,
-      DirectPipelineRunner.EvaluationContext context) {
-    PCollection<KV<K, V>> input = context.getInput(transform);
-
-    List<ValueWithMetadata<KV<K, V>>> inputElems =
-        context.getPCollectionValuesWithMetadata(input);
-
-    Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
-    Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
-    for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
-      K key = elem.getValue().getKey();
-      V value = elem.getValue().getValue();
-      byte[] encodedKey;
-      try {
-        encodedKey = encodeToByteArray(keyCoder, key);
-      } catch (CoderException exn) {
-        // TODO: Put in better element printing:
-        // truncate if too long.
-        throw new IllegalArgumentException(
-            "unable to encode key " + key + " of input to " + transform +
-            " using " + keyCoder,
-            exn);
-      }
-      GroupingKey<K> groupingKey = new GroupingKey<>(key, encodedKey);
-      List<V> values = groupingMap.get(groupingKey);
-      if (values == null) {
-        values = new ArrayList<V>();
-        groupingMap.put(groupingKey, values);
-      }
-      values.add(value);
-    }
-
-    List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
-        new ArrayList<>();
-    for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
-      GroupingKey<K> groupingKey = entry.getKey();
-      K key = groupingKey.getKey();
-      List<V> values = entry.getValue();
-      values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
-      outputElems.add(ValueWithMetadata
-                      .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
-                      .withKey(key));
-    }
-
-    context.setPCollectionValuesWithMetadata(context.getOutput(transform),
-                                             outputElems);
-  }
-
-  private static class GroupingKey<K> {
-    private K key;
-    private byte[] encodedKey;
-
-    public GroupingKey(K key, byte[] encodedKey) {
-      this.key = key;
-      this.encodedKey = encodedKey;
-    }
-
-    public K getKey() {
-      return key;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o instanceof GroupingKey) {
-        GroupingKey<?> that = (GroupingKey<?>) o;
-        return Arrays.equals(this.encodedKey, that.encodedKey);
-      } else {
-        return false;
-      }
-    }
-
-    @Override
-    public int hashCode() {
-      return Arrays.hashCode(encodedKey);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
new file mode 100644
index 0000000..8db87d2
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/GroupByKeyOnly.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Runner-specific primitive that groups by key only, ignoring any window assignments.
+ */
+public class GroupByKeyOnly<K, V>
+    extends PTransform<PCollection<KV<K, V>>,
+                       PCollection<KV<K, Iterable<V>>>> {
+
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
+        input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+  }
+
+  @Override
+  public Coder<KV<K, Iterable<V>>> getDefaultOutputCoder(PCollection<KV<K, V>> input) {
+    return GroupByKey.getOutputKvCoder(input.getCoder());
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
index 2e2d1f6..8a4d7ac 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java
@@ -18,7 +18,6 @@ package com.google.cloud.dataflow.sdk.util;
 import com.google.cloud.dataflow.sdk.options.PipelineOptions;
 import com.google.cloud.dataflow.sdk.transforms.Aggregator;
 import com.google.cloud.dataflow.sdk.transforms.DoFn;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey.GroupByKeyOnly;
 import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
 import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
 import com.google.cloud.dataflow.sdk.transforms.windowing.OutputTimeFn;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
new file mode 100644
index 0000000..1a6cf9a
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReifyTimestampsAndWindows.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (C) 2015 Google Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package com.google.cloud.dataflow.sdk.util;
+
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.coders.KvCoder;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue.FullWindowedValueCoder;
+import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+/**
+ * Helper transform that makes timestamps and window assignments
+ * explicit in the value part of each key/value pair.
+ */
+public class ReifyTimestampsAndWindows<K, V>
+    extends PTransform<PCollection<KV<K, V>>,
+                       PCollection<KV<K, WindowedValue<V>>>> {
+  @Override
+  public PCollection<KV<K, WindowedValue<V>>> apply(
+      PCollection<KV<K, V>> input) {
+    @SuppressWarnings("unchecked")
+    KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) input.getCoder();
+    Coder<K> keyCoder = inputKvCoder.getKeyCoder();
+    Coder<V> inputValueCoder = inputKvCoder.getValueCoder();
+    Coder<WindowedValue<V>> outputValueCoder = FullWindowedValueCoder.of(
+        inputValueCoder, input.getWindowingStrategy().getWindowFn().windowCoder());
+    Coder<KV<K, WindowedValue<V>>> outputKvCoder =
+        KvCoder.of(keyCoder, outputValueCoder);
+    return input.apply(ParDo.of(new ReifyTimestampAndWindowsDoFn<K, V>()))
+        .setCoder(outputKvCoder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/589ef8a0/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
index 4ced82f..a683b31 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/GroupByKeyEvaluatorFactoryTest.java
@@ -26,9 +26,9 @@ import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.C
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
 import com.google.cloud.dataflow.sdk.transforms.Create;
-import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItem;
 import com.google.cloud.dataflow.sdk.util.KeyedWorkItems;
+import com.google.cloud.dataflow.sdk.util.ReifyTimestampsAndWindows;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
 import com.google.cloud.dataflow.sdk.values.KV;
 import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -60,7 +60,7 @@ public class GroupByKeyEvaluatorFactoryTest {
     PCollection<KV<String, Integer>> values =
         p.apply(Create.of(firstFoo, firstBar, secondFoo, firstBaz, secondBar, thirdFoo));
     PCollection<KV<String, WindowedValue<Integer>>> kvs =
-        values.apply(new GroupByKey.ReifyTimestampsAndWindows<String, Integer>());
+        values.apply(new ReifyTimestampsAndWindows<String, Integer>());
     PCollection<KeyedWorkItem<String, Integer>> groupedKvs =
         kvs.apply(new GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly<String, Integer>());
 



Mime
View raw message