nemo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From johnya...@apache.org
Subject [incubator-nemo] branch master updated: [NEMO-129] Support Beam's WindowedWordCount example (#123)
Date Fri, 19 Oct 2018 02:13:33 GMT
This is an automated email from the ASF dual-hosted git repository.

johnyangk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git


The following commit(s) were added to refs/heads/master by this push:
     new c4170bf  [NEMO-129] Support Beam's WindowedWordCount example (#123)
c4170bf is described below

commit c4170bf59217e29ee64478350ba42fc95b1ee2d1
Author: Taegeon Um <taegeonum@gmail.com>
AuthorDate: Fri Oct 19 11:13:29 2018 +0900

    [NEMO-129] Support Beam's WindowedWordCount example (#123)
    
    JIRA: [NEMO-129: Support Beam's WindowedWordCount example](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-129)
    
    **Major changes:**
    - Create `GroupByKeyAndWindowDoFnTransform` to group elements according to the key and
window. Use `GroupAlsoByWindowViaWindowSetNewDoFn`, which is Beam's wrapper class for grouping
by key and window. If there is no window, use the existing `GroupByKeyTransform` for performance.
    
    **Minor changes to note:**
    - Refactor `DoFnTransform` and create `AbstractDoFnTransform` to reuse code in both `DoFnTransform`
and `GroupByKeyAndWindowDoFnTransform`
    
    **Tests for the changes:**
    - test windowed word count (fixed window and sliding window)
    
    **Other comments:**
    -
    
    Closes #123
---
 .../compiler/frontend/beam/PipelineTranslator.java |  42 ++++-
 ...FnTransform.java => AbstractDoFnTransform.java} | 111 ++++++++----
 .../frontend/beam/transform/DoFnTransform.java     |  98 +----------
 .../GroupByKeyAndWindowDoFnTransform.java          | 196 +++++++++++++++++++++
 .../beam/transform/GroupByKeyTransform.java        |   5 +-
 .../frontend/beam/transform/DoFnTransformTest.java |   7 -
 .../nemo/examples/beam/GenericSourceSink.java      |   7 +-
 .../nemo/examples/beam/WindowedWordCount.java      |  91 ++++++++++
 .../nemo/examples/beam/WriteOneFilePerWindow.java  | 102 +++++++++++
 .../examples/beam/WindowedWordCountITCase.java     |  87 +++++++++
 .../expected_output_sliding_windowed_wordcount     |  18 ++
 .../resources/expected_output_windowed_wordcount   |  11 ++
 examples/resources/test_input_windowed_wordcount   |  15 ++
 examples/resources/test_input_wordcount            |   2 +-
 14 files changed, 639 insertions(+), 153 deletions(-)

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 2486a00..a4e5d1b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -16,6 +16,7 @@
 package org.apache.nemo.compiler.frontend.beam;
 
 import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
@@ -48,10 +49,7 @@ import java.io.IOException;
 import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -198,11 +196,40 @@ public final class PipelineTranslator
         pValueWithTupleTag.getKey()));
   }
 
+  /**
+   * Create a group by key transform.
+   * It returns GroupByKeyAndWindowDoFnTransform if window function is not default.
+   * @param ctx translation context
+   * @param transformVertex transform vertex
+   * @return group by key transform
+   */
+  private static Transform createGBKTransform(
+    final TranslationContext ctx,
+    final TransformVertex transformVertex) {
+    final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+    final PCollection<?> mainInput = (PCollection<?>)
+      Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+    final TupleTag mainOutputTag = new TupleTag<>();
+
+    if (mainInput.getWindowingStrategy() == WindowingStrategy.globalDefault()) {
+      return new GroupByKeyTransform();
+    } else {
+      return new GroupByKeyAndWindowDoFnTransform(
+        getOutputCoders(pTransform),
+        mainOutputTag,
+        Collections.emptyList(),  /*  GBK does not have additional outputs */
+        mainInput.getWindowingStrategy(),
+        Collections.emptyList(), /*  GBK does not have additional side inputs */
+        ctx.pipelineOptions,
+        SystemReduceFn.buffering(mainInput.getCoder()));
+    }
+  }
+
   @PrimitiveTransformTranslator(GroupByKey.class)
   private static void groupByKeyTranslator(final TranslationContext ctx,
                                            final PrimitiveTransformVertex transformVertex,
                                            final GroupByKey<?, ?> transform) {
-    final IRVertex vertex = new OperatorVertex(new GroupByKeyTransform());
+    final IRVertex vertex = new OperatorVertex(createGBKTransform(ctx, transformVertex));
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex,
input));
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex,
output));
@@ -297,7 +324,7 @@ public final class PipelineTranslator
       // Attempt to translate the CompositeTransform again.
       // Add GroupByKey, which is the first transform in the given CompositeTransform.
       // Make sure it consumes the output from the last vertex in OneToOneEdge-translated
hierarchy.
-      final IRVertex groupByKeyIRVertex = new OperatorVertex(new GroupByKeyTransform());
+      final IRVertex groupByKeyIRVertex = new OperatorVertex(createGBKTransform(ctx, transformVertex));
       ctx.addVertex(groupByKeyIRVertex);
       last.getNode().getOutputs().values().forEach(outputFromCombiner
           -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
@@ -617,7 +644,8 @@ public final class PipelineTranslator
       if (srcTransform instanceof FlattenTransform) {
         return CommunicationPatternProperty.Value.OneToOne;
       }
-      if (dstTransform instanceof GroupByKeyTransform) {
+      if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
+        || dstTransform instanceof GroupByKeyTransform) {
         return CommunicationPatternProperty.Value.Shuffle;
       }
       if (dstTransform instanceof CreateViewTransform) {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
similarity index 60%
copy from compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
copy to compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
index 8dbf051..8679c73 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -35,41 +35,49 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * DoFn transform implementation.
+ * This is a base class for Beam DoFn Transforms.
  *
  * @param <InputT> input type.
+ * @param <InterT> intermediate type.
  * @param <OutputT> output type.
  */
-public final class DoFnTransform<InputT, OutputT> implements
+public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
   Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
 
-  private OutputCollector<WindowedValue<OutputT>> outputCollector;
   private final TupleTag<OutputT> mainOutputTag;
   private final List<TupleTag<?>> additionalOutputTags;
   private final Collection<PCollectionView<?>> sideInputs;
   private final WindowingStrategy<?, ?> windowingStrategy;
-  private final DoFn<InputT, OutputT> doFn;
+  private final DoFn<InterT, OutputT> doFn;
   private final SerializablePipelineOptions serializedOptions;
-  private transient DoFnRunner<InputT, OutputT> doFnRunner;
-  private transient SideInputReader sideInputReader;
-  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
   private final Coder<InputT> inputCoder;
   private final Map<TupleTag<?>, Coder<?>> outputCoders;
 
+  private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
+  private transient DoFnRunner<InterT, OutputT> doFnRunner;
+  private transient SideInputReader sideInputReader;
+  private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
+  private transient DoFnRunners.OutputManager outputManager;
+
   /**
-   * DoFnTransform Constructor.
-   *
-   * @param doFn    doFn.
-   * @param options Pipeline options.
+   * AbstractDoFnTransform constructor.
+   * @param doFn doFn
+   * @param inputCoder input coder
+   * @param outputCoders output coders
+   * @param mainOutputTag main output tag
+   * @param additionalOutputTags additional output tags
+   * @param windowingStrategy windowing strategy
+   * @param sideInputs side inputs
+   * @param options pipeline options
    */
-  public DoFnTransform(final DoFn<InputT, OutputT> doFn,
-                       final Coder<InputT> inputCoder,
-                       final Map<TupleTag<?>, Coder<?>> outputCoders,
-                       final TupleTag<OutputT> mainOutputTag,
-                       final List<TupleTag<?>> additionalOutputTags,
-                       final WindowingStrategy<?, ?> windowingStrategy,
-                       final Collection<PCollectionView<?>> sideInputs,
-                       final PipelineOptions options) {
+  public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
+                               final Coder<InputT> inputCoder,
+                               final Map<TupleTag<?>, Coder<?>> outputCoders,
+                               final TupleTag<OutputT> mainOutputTag,
+                               final List<TupleTag<?>> additionalOutputTags,
+                               final WindowingStrategy<?, ?> windowingStrategy,
+                               final Collection<PCollectionView<?>> sideInputs,
+                               final PipelineOptions options) {
     this.doFn = doFn;
     this.inputCoder = inputCoder;
     this.outputCoders = outputCoders;
@@ -80,15 +88,38 @@ public final class DoFnTransform<InputT, OutputT> implements
     this.windowingStrategy = windowingStrategy;
   }
 
+  protected final DoFnRunners.OutputManager getOutputManager() {
+    return outputManager;
+  }
+
+  protected final WindowingStrategy getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  protected final SideInputReader getSideInputReader() {
+    return sideInputReader;
+  }
+
+  protected final TupleTag<OutputT> getMainOutputTag() {
+    return mainOutputTag;
+  }
+
+  protected final DoFnRunner<InterT, OutputT> getDoFnRunner() {
+    return doFnRunner;
+  }
+
+  public final DoFn getDoFn() {
+    return doFn;
+  }
+
   @Override
-  public void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>>
oc) {
+  public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>>
oc) {
     // deserialize pipeline option
     final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
-
     this.outputCollector = oc;
 
     // create output manager
-    final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(
+    outputManager = new DefaultOutputManager<>(
       outputCollector, context, mainOutputTag);
 
     // create side input reader
@@ -112,15 +143,17 @@ public final class DoFnTransform<InputT, OutputT> implements
       }
     };
 
+    final DoFn wrappedDoFn = wrapDoFn(doFn);
+
     // invoker
-    doFnInvoker = DoFnInvokers.invokerFor(doFn);
+    doFnInvoker = DoFnInvokers.invokerFor(wrappedDoFn);
     doFnInvoker.invokeSetup();
 
     // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn
     // and that this approach is the standard used by most of the Beam runners
     doFnRunner = DoFnRunners.simpleRunner(
       options,
-      doFn,
+      wrappedDoFn,
       sideInputReader,
       outputManager,
       mainOutputTag,
@@ -134,24 +167,24 @@ public final class DoFnTransform<InputT, OutputT> implements
   }
 
   @Override
-  public void onData(final WindowedValue<InputT> data) {
-    doFnRunner.processElement(data);
-  }
-
-  public DoFn getDoFn() {
-    return doFn;
-  }
-
-  @Override
-  public void close() {
+  public final void close() {
+    beforeClose();
     doFnRunner.finishBundle();
     doFnInvoker.invokeTeardown();
   }
 
+  /**
+   * An abstract function that wraps the original doFn.
+   * @param originalDoFn the original doFn.
+   * @return wrapped doFn.
+   */
+  abstract DoFn wrapDoFn(final DoFn originalDoFn);
+
   @Override
-  public String toString() {
-    final StringBuilder sb = new StringBuilder();
-    sb.append("DoTransform:" + doFn);
-    return sb.toString();
-  }
+  public abstract void onData(final WindowedValue<InputT> data);
+
+  /**
+   * An abstract function that is called before close.
+   */
+  abstract void beforeClose();
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 8dbf051..76cd84b 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -15,20 +15,13 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
-import org.apache.beam.runners.core.*;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 
 import java.util.Collection;
 import java.util.List;
@@ -40,21 +33,7 @@ import java.util.Map;
  * @param <InputT> input type.
  * @param <OutputT> output type.
  */
-public final class DoFnTransform<InputT, OutputT> implements
-  Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
-
-  private OutputCollector<WindowedValue<OutputT>> outputCollector;
-  private final TupleTag<OutputT> mainOutputTag;
-  private final List<TupleTag<?>> additionalOutputTags;
-  private final Collection<PCollectionView<?>> sideInputs;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final DoFn<InputT, OutputT> doFn;
-  private final SerializablePipelineOptions serializedOptions;
-  private transient DoFnRunner<InputT, OutputT> doFnRunner;
-  private transient SideInputReader sideInputReader;
-  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
-  private final Coder<InputT> inputCoder;
-  private final Map<TupleTag<?>, Coder<?>> outputCoders;
+public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT,
InputT, OutputT> {
 
   /**
    * DoFnTransform Constructor.
@@ -70,88 +49,29 @@ public final class DoFnTransform<InputT, OutputT> implements
                        final WindowingStrategy<?, ?> windowingStrategy,
                        final Collection<PCollectionView<?>> sideInputs,
                        final PipelineOptions options) {
-    this.doFn = doFn;
-    this.inputCoder = inputCoder;
-    this.outputCoders = outputCoders;
-    this.mainOutputTag = mainOutputTag;
-    this.additionalOutputTags = additionalOutputTags;
-    this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializablePipelineOptions(options);
-    this.windowingStrategy = windowingStrategy;
+    super(doFn, inputCoder, outputCoders, mainOutputTag,
+      additionalOutputTags, windowingStrategy, sideInputs, options);
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>>
oc) {
-    // deserialize pipeline option
-    final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
-
-    this.outputCollector = oc;
-
-    // create output manager
-    final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(
-      outputCollector, context, mainOutputTag);
-
-    // create side input reader
-    if (!sideInputs.isEmpty()) {
-      sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs);
-    } else {
-      sideInputReader = NullSideInputReader.of(sideInputs);
-    }
-
-    // create step context
-    // this transform does not support state and timer.
-    final StepContext stepContext = new StepContext() {
-      @Override
-      public StateInternals stateInternals() {
-        throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
-      }
-    };
-
-    // invoker
-    doFnInvoker = DoFnInvokers.invokerFor(doFn);
-    doFnInvoker.invokeSetup();
-
-    // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn
-    // and that this approach is the standard used by most of the Beam runners
-    doFnRunner = DoFnRunners.simpleRunner(
-      options,
-      doFn,
-      sideInputReader,
-      outputManager,
-      mainOutputTag,
-      additionalOutputTags,
-      stepContext,
-      inputCoder,
-      outputCoders,
-      windowingStrategy);
-
-    doFnRunner.startBundle();
+  protected DoFn wrapDoFn(final DoFn initDoFn) {
+    return initDoFn;
   }
 
   @Override
   public void onData(final WindowedValue<InputT> data) {
-    doFnRunner.processElement(data);
-  }
-
-  public DoFn getDoFn() {
-    return doFn;
+    getDoFnRunner().processElement(data);
   }
 
   @Override
-  public void close() {
-    doFnRunner.finishBundle();
-    doFnInvoker.invokeTeardown();
+  protected void beforeClose() {
+    // nothing
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("DoTransform:" + doFn);
+    sb.append("DoTransform:" + getDoFn());
     return sb.toString();
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
new file mode 100644
index 0000000..4827e69
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Groups elements according to key and window.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ */
+public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
+    extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>,
KV<K, Iterable<InputT>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
+
+  private final SystemReduceFn reduceFn;
+  private final Map<K, List<WindowedValue<InputT>>> keyToValues;
+  private transient InMemoryTimerInternalsFactory timerInternalsFactory;
+
+  /**
+   * GroupByKey constructor.
+   */
+  public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>>
outputCoders,
+                                          final TupleTag<KV<K, Iterable<InputT>>>
mainOutputTag,
+                                          final List<TupleTag<?>> additionalOutputTags,
+                                          final WindowingStrategy<?, ?> windowingStrategy,
+                                          final Collection<PCollectionView<?>>
sideInputs,
+                                          final PipelineOptions options,
+                                          final SystemReduceFn reduceFn) {
+    super(null, /* doFn */
+      null, /* inputCoder */
+      outputCoders,
+      mainOutputTag,
+      additionalOutputTags,
+      windowingStrategy,
+      sideInputs,
+      options);
+    this.keyToValues = new HashMap<>();
+    this.reduceFn = reduceFn;
+  }
+
+  /**
+   * This creates a new DoFn that groups elements by key and window.
+   * @param doFn original doFn.
+   * @return GroupAlsoByWindowViaWindowSetNewDoFn
+   */
+  @Override
+  protected DoFn wrapDoFn(final DoFn doFn) {
+    timerInternalsFactory = new InMemoryTimerInternalsFactory();
+    // This function performs group by key and window operation
+    return
+      GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        getWindowingStrategy(),
+        new InMemoryStateInternalsFactory(),
+        timerInternalsFactory,
+        getSideInputReader(),
+        reduceFn,
+        getOutputManager(),
+        getMainOutputTag());
+  }
+
+  @Override
+  public void onData(final WindowedValue<KV<K, InputT>> element) {
+    final KV<K, InputT> kv = element.getValue();
+    keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
+    keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
+  }
+
+  /**
+   * This advances the input watermark and processing time to the timestamp max value
+   * in order to emit all data.
+   */
+  @Override
+  protected void beforeClose() {
+    final InMemoryTimerInternals timerInternals = timerInternalsFactory.timerInternals;
+    try {
+      // Finish any pending windows by advancing the input watermark to infinity.
+      timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+      // Finally, advance the processing time to infinity to fire any timers.
+      timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+      timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    if (keyToValues.isEmpty()) {
+      LOG.warn("Beam GroupByKeyAndWindowDoFnTransform received no data!");
+    } else {
+      // timer
+      final Iterable<TimerInternals.TimerData> timerData = getTimers(timerInternals);
+
+      keyToValues.entrySet().stream().forEach(entry -> {
+        // The GroupAlsoByWindowViaWindowSetNewDoFn requires KeyedWorkItem,
+        // so we convert the KV to KeyedWorkItem
+        final KeyedWorkItem<K, InputT> keyedWorkItem =
+          KeyedWorkItems.workItem(entry.getKey(), timerData, entry.getValue());
+        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      });
+      keyToValues.clear();
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("GroupByKeyAndWindowDoFnTransform:");
+    return sb.toString();
+  }
+
+  private Iterable<TimerInternals.TimerData> getTimers(final InMemoryTimerInternals
timerInternals) {
+    final List<TimerInternals.TimerData> timerData = new LinkedList<>();
+
+    while (true) {
+      TimerInternals.TimerData timer;
+      boolean hasFired = false;
+
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        hasFired = true;
+        timerData.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        hasFired = true;
+        timerData.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        hasFired = true;
+        timerData.add(timer);
+      }
+      if (!hasFired) {
+        break;
+      }
+    }
+
+    return timerData;
+  }
+
+  /**
+   * InMemoryStateInternalsFactory.
+   */
+  final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
+    private final InMemoryStateInternals inMemoryStateInternals;
+
+    InMemoryStateInternalsFactory() {
+      this.inMemoryStateInternals = InMemoryStateInternals.forKey(null);
+    }
+
+    @Override
+    public StateInternals stateInternalsForKey(final K key) {
+      return inMemoryStateInternals;
+    }
+  }
+
+  /**
+   * InMemoryTimerInternalsFactory.
+   */
+  final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
+    private final InMemoryTimerInternals timerInternals;
+
+    InMemoryTimerInternalsFactory() {
+      this.timerInternals = new InMemoryTimerInternals();
+    }
+
+    @Override
+    public TimerInternals timerInternalsForKey(final K key) {
+      return timerInternals;
+    }
+  }
+}
+
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 38b2641..fc122f9 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -47,7 +47,6 @@ public final class GroupByKeyTransform<I> implements Transform<I,
WindowedValue<
 
   @Override
   public void onData(final I element) {
-    // TODO #129: support window in group by key for windowed groupByKey
     final WindowedValue<KV> windowedValue = (WindowedValue<KV>) element;
     final KV kv = windowedValue.getValue();
     keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
@@ -56,13 +55,12 @@ public final class GroupByKeyTransform<I> implements Transform<I,
WindowedValue<
 
   @Override
   public void close() {
-    // TODO #129: support window in group by key for windowed groupByKey
     if (keyToValues.isEmpty()) {
       LOG.warn("Beam GroupByKeyTransform received no data!");
     } else {
       keyToValues.entrySet().stream().map(entry ->
         WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(), entry.getValue())))
-          .forEach(outputCollector::emit);
+        .forEach(outputCollector::emit);
       keyToValues.clear();
     }
   }
@@ -75,4 +73,3 @@ public final class GroupByKeyTransform<I> implements Transform<I,
WindowedValue<
     return sb.toString();
   }
 }
-
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index a30ee46..9a65e7a 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -19,16 +19,11 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -43,7 +38,6 @@ import java.util.*;
 
 import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -153,7 +147,6 @@ public final class DoFnTransformTest {
     doFnTransform.close();
   }
 
-
   // TODO #216: implement side input and windowing
   @Test
   public void testSideInputs() {
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 51fd3bd..6a09f3b 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -94,12 +94,7 @@ final class GenericSourceSink {
       dataToWrite.apply(ParDo.of(new HDFSWrite(path)));
       return PDone.in(dataToWrite.getPipeline());
     } else {
-      // (Only relevant to local file writes) withWindowedWrites() is required for local
file writes.
-      // Without it, the FileResultCoder#encode, which assumes WindowedValue, will not be
able
-      // to properly handle the FileResult (Beam's file metadata information), and hang the
job.
-      // The root cause is that the Nemo runtime currently only supports batch applications,
and
-      // does not use the Beam's WindowedValue by default.
-      return dataToWrite.apply(TextIO.write().to(path).withWindowedWrites());
+      return dataToWrite.apply(TextIO.write().to(path));
     }
   }
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
new file mode 100644
index 0000000..913156a
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A Windowed WordCount application.
+ */
+public final class WindowedWordCount {
+  /**
+   * Private Constructor.
+   */
+  private WindowedWordCount() {
+  }
+
+  /**
+   * Main function for the MR BEAM program.
+   * @param args arguments.
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final String windowType = args[2];
+    final Window<String> windowFn;
+    if (windowType.equals("fixed")) {
+      windowFn = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)));
+    } else {
+      windowFn = Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10))
+        .every(Duration.standardSeconds(5)));
+    }
+
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("WindowedWordCount");
+
+    final Pipeline p = Pipeline.create(options);
+    GenericSourceSink.read(p, inputFilePath)
+        .apply(ParDo.of(new DoFn<String, String>() {
+            @ProcessElement
+            public void processElement(@Element final String elem,
+                                       final OutputReceiver<String> out) {
+              final String[] splitt = elem.split("!");
+              out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+            }
+        }))
+        .apply(windowFn)
+        .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String,
KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final String line) {
+            final String[] words = line.split(" +");
+            final String documentId = words[0] + "#" + words[1];
+            final Long count = Long.parseLong(words[2]);
+            return KV.of(documentId, count);
+          }
+        }))
+        .apply(Sum.longsPerKey())
+        .apply(MapElements.<KV<String, Long>, String>via(new SimpleFunction<KV<String,
Long>, String>() {
+          @Override
+          public String apply(final KV<String, Long> kv) {
+            return kv.getKey() + ": " + kv.getValue();
+          }
+        }))
+        .apply(new WriteOneFilePerWindow(outputFilePath, null));
+    p.run();
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
new file mode 100644
index 0000000..fad134d
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import javax.annotation.Nullable;
+import static org.apache.beam.repackaged.beam_runners_core_java.com.google.common.base.MoreObjects.firstNonNull;
+
+ /**
+  * This class is brought from beam/examples/common/WriteOneFilePerWindow.java.
+  *
+  */
+public final class WriteOneFilePerWindow extends PTransform<PCollection<String>,
PDone> {
+  // change from hourMinute to hourMinuteSecond
+  private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecond();
+  private String filenamePrefix;
+  @Nullable
+  private Integer numShards;
+   public WriteOneFilePerWindow(final String filenamePrefix, final Integer numShards) {
+    this.filenamePrefix = filenamePrefix;
+    this.numShards = numShards;
+  }
+   @Override
+  public PDone expand(final PCollection<String> input) {
+    final ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+    TextIO.Write write =
+        TextIO.write()
+            .to(new PerWindowFiles(resource))
+            .withTempDirectory(resource.getCurrentDirectory())
+            .withWindowedWrites();
+    if (numShards != null) {
+      write = write.withNumShards(numShards);
+    }
+    return input.apply(write);
+  }
+   /**
+   * A {@link FilenamePolicy} produces a base file name for a write based on metadata about
the data
+   * being written. This always includes the shard number and the total number of shards.
For
+   * windowed writes, it also includes the window and pane index (a sequence number assigned
to each
+   * trigger firing).
+   */
+  public static final class PerWindowFiles extends FileBasedSink.FilenamePolicy {
+     private final ResourceId baseFilename;
+     PerWindowFiles(final ResourceId baseFilename) {
+      this.baseFilename = baseFilename;
+    }
+     String filenamePrefixForWindow(final IntervalWindow window) {
+      final String prefix =
+          baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
+      return String.format(
+          "%s-%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+    }
+     @Override
+    public ResourceId windowedFilename(
+        final int shardNumber,
+        final int numShards,
+        final BoundedWindow window,
+        final PaneInfo paneInfo,
+        final FileBasedSink.OutputFileHints outputFileHints) {
+      System.out.println("Windowd file name: " + window);
+      final IntervalWindow intervalWindow = (IntervalWindow) window;
+      final String filename =
+          String.format(
+              "%s-%s-of-%s%s",
+              filenamePrefixForWindow(intervalWindow),
+              shardNumber,
+              numShards,
+              outputFileHints.getSuggestedFilenameSuffix());
+      return baseFilename
+          .getCurrentDirectory()
+          .resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+    }
+     @Override
+    public ResourceId unwindowedFilename(
+        final int shardNumber, final int numShards, final FileBasedSink.OutputFileHints outputFileHints)
{
+      throw new UnsupportedOperationException("Unsupported.");
+    }
+  }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
new file mode 100644
index 0000000..27ee4d8
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.compiler.optimizer.policy.ConditionalLargeShufflePolicy;
+import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test Windowed word count program with JobLauncher.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class WindowedWordCountITCase {
+  private static final int TIMEOUT = 120000;
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String inputFileName = "test_input_windowed_wordcount";
+  private static final String outputFileName = "test_output_windowed_wordcount";
+  private static final String expectedOutputFileName = "expected_output_windowed_wordcount";
+  private static final String expectedSlidingWindowOutputFileName = "expected_output_sliding_windowed_wordcount";
+  private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+  private static final String inputFilePath =  fileBasePath + inputFileName;
+  private static final String outputFilePath =  fileBasePath + outputFileName;
+
+  @Test (timeout = TIMEOUT)
+  public void testFixedWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath, "fixed");
+
+    JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
+        .addJobId(WindowedWordCountITCase.class.getSimpleName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+
+
+  @Test (timeout = TIMEOUT)
+  public void testSlidingWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath, "sliding");
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+}
diff --git a/examples/resources/expected_output_sliding_windowed_wordcount b/examples/resources/expected_output_sliding_windowed_wordcount
new file mode 100644
index 0000000..4437daa
--- /dev/null
+++ b/examples/resources/expected_output_sliding_windowed_wordcount
@@ -0,0 +1,18 @@
+gw#m: 10
+gw#m: 10
+gw#m: 90
+gw#m: 90
+john#m: 40
+john#m: 50
+john#m: 80
+john#m: 90
+jykim#f: 100
+jykim#f: 40
+jykim#f: 60
+mh#m: 30
+mh#m: 30
+mh#m: 70
+mh#m: 70
+wonook#m: 100
+wonook#m: 50
+wonook#m: 50
diff --git a/examples/resources/expected_output_windowed_wordcount b/examples/resources/expected_output_windowed_wordcount
new file mode 100644
index 0000000..636bb9e
--- /dev/null
+++ b/examples/resources/expected_output_windowed_wordcount
@@ -0,0 +1,11 @@
+gw#m: 10
+gw#m: 90
+john#m: 40
+john#m: 40
+john#m: 50
+jykim#f: 40
+jykim#f: 60
+mh#m: 30
+mh#m: 70
+wonook#m: 50
+wonook#m: 50
diff --git a/examples/resources/test_input_windowed_wordcount b/examples/resources/test_input_windowed_wordcount
new file mode 100644
index 0000000..e26179f
--- /dev/null
+++ b/examples/resources/test_input_windowed_wordcount
@@ -0,0 +1,15 @@
+wonook m 50!1536907180000
+john m 20!1536907181000
+gw m 90!1536907182000
+john m 20!1536907183000
+mh m 30!1536907184000
+john m 20!1536907185000
+wonook m 50!1536907186000
+jykim f 40!1536907187000
+john m 20!1536907188000
+jykim f 20!1536907189000
+mh m 70!1536907190000
+jykim f 40!1536907191000
+gw m 10!1536907192000
+john m 20!1536907193000
+john m 30!1536907194000
diff --git a/examples/resources/test_input_wordcount b/examples/resources/test_input_wordcount
index b59abc4..8813dbd 100644
--- a/examples/resources/test_input_wordcount
+++ b/examples/resources/test_input_wordcount
@@ -11,4 +11,4 @@ jykim f 20
 mh m 70
 jykim f 40
 gw m 10
-john m 20
\ No newline at end of file
+john m 20


Mime
View raw message