nemo-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] johnyangk closed pull request #123: [NEMO-129] Support Beam's WindowedWordCount example
Date Fri, 19 Oct 2018 02:13:31 GMT
johnyangk closed pull request #123: [NEMO-129] Support Beam's WindowedWordCount example
URL: https://github.com/apache/incubator-nemo/pull/123
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
index 2486a00f6..a4e5d1b11 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java
@@ -16,6 +16,7 @@
 package org.apache.nemo.compiler.frontend.beam;
 
 import com.google.common.collect.Iterables;
+import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.construction.ParDoTranslation;
 import org.apache.beam.runners.core.construction.TransformInputs;
 import org.apache.beam.sdk.Pipeline;
@@ -48,10 +49,7 @@
 import java.lang.annotation.*;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Stack;
+import java.util.*;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
@@ -198,11 +196,40 @@ private static void parDoMultiOutputTranslator(final TranslationContext
ctx,
         pValueWithTupleTag.getKey()));
   }
 
+  /**
+   * Create a group by key transform.
+   * It returns GroupByKeyAndWindowDoFnTransform if window function is not default.
+   * @param ctx translation context
+   * @param transformVertex transform vertex
+   * @return group by key transform
+   */
+  private static Transform createGBKTransform(
+    final TranslationContext ctx,
+    final TransformVertex transformVertex) {
+    final AppliedPTransform pTransform = transformVertex.getNode().toAppliedPTransform(PIPELINE.get());
+    final PCollection<?> mainInput = (PCollection<?>)
+      Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));
+    final TupleTag mainOutputTag = new TupleTag<>();
+
+    if (mainInput.getWindowingStrategy() == WindowingStrategy.globalDefault()) {
+      return new GroupByKeyTransform();
+    } else {
+      return new GroupByKeyAndWindowDoFnTransform(
+        getOutputCoders(pTransform),
+        mainOutputTag,
+        Collections.emptyList(),  /*  GBK does not have additional outputs */
+        mainInput.getWindowingStrategy(),
+        Collections.emptyList(), /*  GBK does not have additional side inputs */
+        ctx.pipelineOptions,
+        SystemReduceFn.buffering(mainInput.getCoder()));
+    }
+  }
+
   @PrimitiveTransformTranslator(GroupByKey.class)
   private static void groupByKeyTranslator(final TranslationContext ctx,
                                            final PrimitiveTransformVertex transformVertex,
                                            final GroupByKey<?, ?> transform) {
-    final IRVertex vertex = new OperatorVertex(new GroupByKeyTransform());
+    final IRVertex vertex = new OperatorVertex(createGBKTransform(ctx, transformVertex));
     ctx.addVertex(vertex);
     transformVertex.getNode().getInputs().values().forEach(input -> ctx.addEdgeTo(vertex,
input));
     transformVertex.getNode().getOutputs().values().forEach(output -> ctx.registerMainOutputFrom(vertex,
output));
@@ -297,7 +324,7 @@ private static void combineTranslator(final TranslationContext ctx,
       // Attempt to translate the CompositeTransform again.
       // Add GroupByKey, which is the first transform in the given CompositeTransform.
       // Make sure it consumes the output from the last vertex in OneToOneEdge-translated
hierarchy.
-      final IRVertex groupByKeyIRVertex = new OperatorVertex(new GroupByKeyTransform());
+      final IRVertex groupByKeyIRVertex = new OperatorVertex(createGBKTransform(ctx, transformVertex));
       ctx.addVertex(groupByKeyIRVertex);
       last.getNode().getOutputs().values().forEach(outputFromCombiner
           -> ctx.addEdgeTo(groupByKeyIRVertex, outputFromCombiner));
@@ -617,7 +644,8 @@ private void registerAdditionalOutputFrom(final IRVertex irVertex, final
PValue
       if (srcTransform instanceof FlattenTransform) {
         return CommunicationPatternProperty.Value.OneToOne;
       }
-      if (dstTransform instanceof GroupByKeyTransform) {
+      if (dstTransform instanceof GroupByKeyAndWindowDoFnTransform
+        || dstTransform instanceof GroupByKeyTransform) {
         return CommunicationPatternProperty.Value.Shuffle;
       }
       if (dstTransform instanceof CreateViewTransform) {
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
new file mode 100644
index 000000000..8679c7348
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/AbstractDoFnTransform.java
@@ -0,0 +1,190 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.nemo.common.ir.OutputCollector;
+import org.apache.nemo.common.ir.vertex.transform.Transform;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a base class for Beam DoFn Transforms.
+ *
+ * @param <InputT> input type.
+ * @param <InterT> intermediate type.
+ * @param <OutputT> output type.
+ */
+public abstract class AbstractDoFnTransform<InputT, InterT, OutputT> implements
+  Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
+
+  private final TupleTag<OutputT> mainOutputTag;
+  private final List<TupleTag<?>> additionalOutputTags;
+  private final Collection<PCollectionView<?>> sideInputs;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final DoFn<InterT, OutputT> doFn;
+  private final SerializablePipelineOptions serializedOptions;
+  private final Coder<InputT> inputCoder;
+  private final Map<TupleTag<?>, Coder<?>> outputCoders;
+
+  private transient OutputCollector<WindowedValue<OutputT>> outputCollector;
+  private transient DoFnRunner<InterT, OutputT> doFnRunner;
+  private transient SideInputReader sideInputReader;
+  private transient DoFnInvoker<InterT, OutputT> doFnInvoker;
+  private transient DoFnRunners.OutputManager outputManager;
+
+  /**
+   * AbstractDoFnTransform constructor.
+   * @param doFn doFn
+   * @param inputCoder input coder
+   * @param outputCoders output coders
+   * @param mainOutputTag main output tag
+   * @param additionalOutputTags additional output tags
+   * @param windowingStrategy windowing strategy
+   * @param sideInputs side inputs
+   * @param options pipeline options
+   */
+  public AbstractDoFnTransform(final DoFn<InterT, OutputT> doFn,
+                               final Coder<InputT> inputCoder,
+                               final Map<TupleTag<?>, Coder<?>> outputCoders,
+                               final TupleTag<OutputT> mainOutputTag,
+                               final List<TupleTag<?>> additionalOutputTags,
+                               final WindowingStrategy<?, ?> windowingStrategy,
+                               final Collection<PCollectionView<?>> sideInputs,
+                               final PipelineOptions options) {
+    this.doFn = doFn;
+    this.inputCoder = inputCoder;
+    this.outputCoders = outputCoders;
+    this.mainOutputTag = mainOutputTag;
+    this.additionalOutputTags = additionalOutputTags;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializablePipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
+  }
+
+  protected final DoFnRunners.OutputManager getOutputManager() {
+    return outputManager;
+  }
+
+  protected final WindowingStrategy getWindowingStrategy() {
+    return windowingStrategy;
+  }
+
+  protected final SideInputReader getSideInputReader() {
+    return sideInputReader;
+  }
+
+  protected final TupleTag<OutputT> getMainOutputTag() {
+    return mainOutputTag;
+  }
+
+  protected final DoFnRunner<InterT, OutputT> getDoFnRunner() {
+    return doFnRunner;
+  }
+
+  public final DoFn getDoFn() {
+    return doFn;
+  }
+
+  @Override
+  public final void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>>
oc) {
+    // deserialize pipeline option
+    final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
+    this.outputCollector = oc;
+
+    // create output manager
+    outputManager = new DefaultOutputManager<>(
+      outputCollector, context, mainOutputTag);
+
+    // create side input reader
+    if (!sideInputs.isEmpty()) {
+      sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs);
+    } else {
+      sideInputReader = NullSideInputReader.of(sideInputs);
+    }
+
+    // create step context
+    // this transform does not support state and timer.
+    final StepContext stepContext = new StepContext() {
+      @Override
+      public StateInternals stateInternals() {
+        throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
+      }
+    };
+
+    final DoFn wrappedDoFn = wrapDoFn(doFn);
+
+    // invoker
+    doFnInvoker = DoFnInvokers.invokerFor(wrappedDoFn);
+    doFnInvoker.invokeSetup();
+
+    // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn
+    // and that this approach is the standard used by most of the Beam runners
+    doFnRunner = DoFnRunners.simpleRunner(
+      options,
+      wrappedDoFn,
+      sideInputReader,
+      outputManager,
+      mainOutputTag,
+      additionalOutputTags,
+      stepContext,
+      inputCoder,
+      outputCoders,
+      windowingStrategy);
+
+    doFnRunner.startBundle();
+  }
+
+  @Override
+  public final void close() {
+    beforeClose();
+    doFnRunner.finishBundle();
+    doFnInvoker.invokeTeardown();
+  }
+
+  /**
+   * An abstract function that wraps the original doFn.
+   * @param originalDoFn the original doFn.
+   * @return wrapped doFn.
+   */
+  abstract DoFn wrapDoFn(final DoFn originalDoFn);
+
+  @Override
+  public abstract void onData(final WindowedValue<InputT> data);
+
+  /**
+   * An abstract function that is called before close.
+   */
+  abstract void beforeClose();
+}
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
index 8dbf0518e..76cd84be1 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransform.java
@@ -15,20 +15,13 @@
  */
 package org.apache.nemo.compiler.frontend.beam.transform;
 
-import org.apache.beam.runners.core.*;
-import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
-import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.nemo.common.ir.OutputCollector;
-import org.apache.nemo.common.ir.vertex.transform.Transform;
-import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
 
 import java.util.Collection;
 import java.util.List;
@@ -40,21 +33,7 @@
  * @param <InputT> input type.
  * @param <OutputT> output type.
  */
-public final class DoFnTransform<InputT, OutputT> implements
-  Transform<WindowedValue<InputT>, WindowedValue<OutputT>> {
-
-  private OutputCollector<WindowedValue<OutputT>> outputCollector;
-  private final TupleTag<OutputT> mainOutputTag;
-  private final List<TupleTag<?>> additionalOutputTags;
-  private final Collection<PCollectionView<?>> sideInputs;
-  private final WindowingStrategy<?, ?> windowingStrategy;
-  private final DoFn<InputT, OutputT> doFn;
-  private final SerializablePipelineOptions serializedOptions;
-  private transient DoFnRunner<InputT, OutputT> doFnRunner;
-  private transient SideInputReader sideInputReader;
-  private transient DoFnInvoker<InputT, OutputT> doFnInvoker;
-  private final Coder<InputT> inputCoder;
-  private final Map<TupleTag<?>, Coder<?>> outputCoders;
+public final class DoFnTransform<InputT, OutputT> extends AbstractDoFnTransform<InputT,
InputT, OutputT> {
 
   /**
    * DoFnTransform Constructor.
@@ -70,88 +49,29 @@ public DoFnTransform(final DoFn<InputT, OutputT> doFn,
                        final WindowingStrategy<?, ?> windowingStrategy,
                        final Collection<PCollectionView<?>> sideInputs,
                        final PipelineOptions options) {
-    this.doFn = doFn;
-    this.inputCoder = inputCoder;
-    this.outputCoders = outputCoders;
-    this.mainOutputTag = mainOutputTag;
-    this.additionalOutputTags = additionalOutputTags;
-    this.sideInputs = sideInputs;
-    this.serializedOptions = new SerializablePipelineOptions(options);
-    this.windowingStrategy = windowingStrategy;
+    super(doFn, inputCoder, outputCoders, mainOutputTag,
+      additionalOutputTags, windowingStrategy, sideInputs, options);
   }
 
   @Override
-  public void prepare(final Context context, final OutputCollector<WindowedValue<OutputT>>
oc) {
-    // deserialize pipeline option
-    final NemoPipelineOptions options = serializedOptions.get().as(NemoPipelineOptions.class);
-
-    this.outputCollector = oc;
-
-    // create output manager
-    final DoFnRunners.OutputManager outputManager = new DefaultOutputManager<>(
-      outputCollector, context, mainOutputTag);
-
-    // create side input reader
-    if (!sideInputs.isEmpty()) {
-      sideInputReader = new BroadcastVariableSideInputReader(context, sideInputs);
-    } else {
-      sideInputReader = NullSideInputReader.of(sideInputs);
-    }
-
-    // create step context
-    // this transform does not support state and timer.
-    final StepContext stepContext = new StepContext() {
-      @Override
-      public StateInternals stateInternals() {
-        throw new UnsupportedOperationException("Not support stateInternals in DoFnTransform");
-      }
-
-      @Override
-      public TimerInternals timerInternals() {
-        throw new UnsupportedOperationException("Not support timerInternals in DoFnTransform");
-      }
-    };
-
-    // invoker
-    doFnInvoker = DoFnInvokers.invokerFor(doFn);
-    doFnInvoker.invokeSetup();
-
-    // DoFnRunners.simpleRunner takes care of all the hard stuff of running the DoFn
-    // and that this approach is the standard used by most of the Beam runners
-    doFnRunner = DoFnRunners.simpleRunner(
-      options,
-      doFn,
-      sideInputReader,
-      outputManager,
-      mainOutputTag,
-      additionalOutputTags,
-      stepContext,
-      inputCoder,
-      outputCoders,
-      windowingStrategy);
-
-    doFnRunner.startBundle();
+  protected DoFn wrapDoFn(final DoFn initDoFn) {
+    return initDoFn;
   }
 
   @Override
   public void onData(final WindowedValue<InputT> data) {
-    doFnRunner.processElement(data);
-  }
-
-  public DoFn getDoFn() {
-    return doFn;
+    getDoFnRunner().processElement(data);
   }
 
   @Override
-  public void close() {
-    doFnRunner.finishBundle();
-    doFnInvoker.invokeTeardown();
+  protected void beforeClose() {
+    // nothing
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder();
-    sb.append("DoTransform:" + doFn);
+    sb.append("DoTransform:" + getDoFn());
     return sb.toString();
   }
 }
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
new file mode 100644
index 000000000..4827e69b1
--- /dev/null
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyAndWindowDoFnTransform.java
@@ -0,0 +1,196 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.compiler.frontend.beam.transform;
+
+import org.apache.beam.runners.core.*;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Groups elements according to key and window.
+ * @param <K> key type.
+ * @param <InputT> input type.
+ */
+public final class GroupByKeyAndWindowDoFnTransform<K, InputT>
+    extends AbstractDoFnTransform<KV<K, InputT>, KeyedWorkItem<K, InputT>,
KV<K, Iterable<InputT>>> {
+  private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransform.class.getName());
+
+  private final SystemReduceFn reduceFn;
+  private final Map<K, List<WindowedValue<InputT>>> keyToValues;
+  private transient InMemoryTimerInternalsFactory timerInternalsFactory;
+
+  /**
+   * GroupByKey constructor.
+   */
+  public GroupByKeyAndWindowDoFnTransform(final Map<TupleTag<?>, Coder<?>>
outputCoders,
+                                          final TupleTag<KV<K, Iterable<InputT>>>
mainOutputTag,
+                                          final List<TupleTag<?>> additionalOutputTags,
+                                          final WindowingStrategy<?, ?> windowingStrategy,
+                                          final Collection<PCollectionView<?>>
sideInputs,
+                                          final PipelineOptions options,
+                                          final SystemReduceFn reduceFn) {
+    super(null, /* doFn */
+      null, /* inputCoder */
+      outputCoders,
+      mainOutputTag,
+      additionalOutputTags,
+      windowingStrategy,
+      sideInputs,
+      options);
+    this.keyToValues = new HashMap<>();
+    this.reduceFn = reduceFn;
+  }
+
+  /**
+   * This creates a new DoFn that groups elements by key and window.
+   * @param doFn original doFn.
+   * @return GroupAlsoByWindowViaWindowSetNewDoFn
+   */
+  @Override
+  protected DoFn wrapDoFn(final DoFn doFn) {
+    timerInternalsFactory = new InMemoryTimerInternalsFactory();
+    // This function performs group by key and window operation
+    return
+      GroupAlsoByWindowViaWindowSetNewDoFn.create(
+        getWindowingStrategy(),
+        new InMemoryStateInternalsFactory(),
+        timerInternalsFactory,
+        getSideInputReader(),
+        reduceFn,
+        getOutputManager(),
+        getMainOutputTag());
+  }
+
+  @Override
+  public void onData(final WindowedValue<KV<K, InputT>> element) {
+    final KV<K, InputT> kv = element.getValue();
+    keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
+    keyToValues.get(kv.getKey()).add(element.withValue(kv.getValue()));
+  }
+
+  /**
+   * This advances the input watermark and processing time to the timestamp max value
+   * in order to emit all data.
+   */
+  @Override
+  protected void beforeClose() {
+    final InMemoryTimerInternals timerInternals = timerInternalsFactory.timerInternals;
+    try {
+      // Finish any pending windows by advancing the input watermark to infinity.
+      timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+      // Finally, advance the processing time to infinity to fire any timers.
+      timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+      timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    } catch (final Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    if (keyToValues.isEmpty()) {
+      LOG.warn("Beam GroupByKeyAndWindowDoFnTransform received no data!");
+    } else {
+      // timer
+      final Iterable<TimerInternals.TimerData> timerData = getTimers(timerInternals);
+
+      keyToValues.entrySet().stream().forEach(entry -> {
+        // The GroupAlsoByWindowViaWindowSetNewDoFn requires KeyedWorkItem,
+        // so we convert the KV to KeyedWorkItem
+        final KeyedWorkItem<K, InputT> keyedWorkItem =
+          KeyedWorkItems.workItem(entry.getKey(), timerData, entry.getValue());
+        getDoFnRunner().processElement(WindowedValue.valueInGlobalWindow(keyedWorkItem));
+      });
+      keyToValues.clear();
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("GroupByKeyAndWindowDoFnTransform:");
+    return sb.toString();
+  }
+
+  private Iterable<TimerInternals.TimerData> getTimers(final InMemoryTimerInternals
timerInternals) {
+    final List<TimerInternals.TimerData> timerData = new LinkedList<>();
+
+    while (true) {
+      TimerInternals.TimerData timer;
+      boolean hasFired = false;
+
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        hasFired = true;
+        timerData.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        hasFired = true;
+        timerData.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        hasFired = true;
+        timerData.add(timer);
+      }
+      if (!hasFired) {
+        break;
+      }
+    }
+
+    return timerData;
+  }
+
+  /**
+   * InMemoryStateInternalsFactory.
+   */
+  final class InMemoryStateInternalsFactory implements StateInternalsFactory<K> {
+    private final InMemoryStateInternals inMemoryStateInternals;
+
+    InMemoryStateInternalsFactory() {
+      this.inMemoryStateInternals = InMemoryStateInternals.forKey(null);
+    }
+
+    @Override
+    public StateInternals stateInternalsForKey(final K key) {
+      return inMemoryStateInternals;
+    }
+  }
+
+  /**
+   * InMemoryTimerInternalsFactory.
+   */
+  final class InMemoryTimerInternalsFactory implements TimerInternalsFactory<K> {
+    private final InMemoryTimerInternals timerInternals;
+
+    InMemoryTimerInternalsFactory() {
+      this.timerInternals = new InMemoryTimerInternals();
+    }
+
+    @Override
+    public TimerInternals timerInternalsForKey(final K key) {
+      return timerInternals;
+    }
+  }
+}
+
diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
index 38b26411d..fc122f91f 100644
--- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
+++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GroupByKeyTransform.java
@@ -47,7 +47,6 @@ public void prepare(final Context context, final OutputCollector<WindowedValue<K
 
   @Override
   public void onData(final I element) {
-    // TODO #129: support window in group by key for windowed groupByKey
     final WindowedValue<KV> windowedValue = (WindowedValue<KV>) element;
     final KV kv = windowedValue.getValue();
     keyToValues.putIfAbsent(kv.getKey(), new ArrayList());
@@ -56,13 +55,12 @@ public void onData(final I element) {
 
   @Override
   public void close() {
-    // TODO #129: support window in group by key for windowed groupByKey
     if (keyToValues.isEmpty()) {
       LOG.warn("Beam GroupByKeyTransform received no data!");
     } else {
       keyToValues.entrySet().stream().map(entry ->
         WindowedValue.valueInGlobalWindow(KV.of(entry.getKey(), entry.getValue())))
-          .forEach(outputCollector::emit);
+        .forEach(outputCollector::emit);
       keyToValues.clear();
     }
   }
@@ -75,4 +73,3 @@ public String toString() {
     return sb.toString();
   }
 }
-
diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
index a30ee4638..9a65e7a56 100644
--- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
+++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/DoFnTransformTest.java
@@ -19,16 +19,11 @@
 import com.google.common.collect.ImmutableMap;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.state.StateSpec;
-import org.apache.beam.sdk.state.StateSpecs;
-import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.WindowingStrategy;
@@ -43,7 +38,6 @@
 
 import static java.util.Collections.emptyList;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -153,7 +147,6 @@ public void testMultiOutputOutput() {
     doFnTransform.close();
   }
 
-
   // TODO #216: implement side input and windowing
   @Test
   public void testSideInputs() {
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
index 51fd3bd9d..6a09f3bf3 100644
--- a/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/GenericSourceSink.java
@@ -94,12 +94,7 @@ public static PDone write(final PCollection<String> dataToWrite,
       dataToWrite.apply(ParDo.of(new HDFSWrite(path)));
       return PDone.in(dataToWrite.getPipeline());
     } else {
-      // (Only relevant to local file writes) withWindowedWrites() is required for local
file writes.
-      // Without it, the FileResultCoder#encode, which assumes WindowedValue, will not be
able
-      // to properly handle the FileResult (Beam's file metadata information), and hang the
job.
-      // The root cause is that the Nemo runtime currently only supports batch applications,
and
-      // does not use the Beam's WindowedValue by default.
-      return dataToWrite.apply(TextIO.write().to(path).withWindowedWrites());
+      return dataToWrite.apply(TextIO.write().to(path));
     }
   }
 
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
new file mode 100644
index 000000000..913156a1e
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WindowedWordCount.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.*;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
+import org.apache.nemo.compiler.frontend.beam.NemoPipelineRunner;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * A Windowed WordCount application.
+ *
+ * Reads lines of the form {@code "<payload>!<timestampMillis>"} (see
+ * examples/resources/test_input_windowed_wordcount, e.g. {@code "wonook m 50!1536907180000"}),
+ * stamps each element with the embedded event timestamp, windows the stream, sums the
+ * per-key counts within each window, and writes one output file per window.
+ */
+public final class WindowedWordCount {
+  /**
+   * Private Constructor.
+   */
+  private WindowedWordCount() {
+  }
+
+  /**
+   * Main function for the windowed WordCount BEAM program.
+   * @param args arguments: [0] input file path, [1] output file path,
+   *             [2] window type ({@code "fixed"} selects fixed windows;
+   *             any other value selects sliding windows).
+   */
+  public static void main(final String[] args) {
+    final String inputFilePath = args[0];
+    final String outputFilePath = args[1];
+    final String windowType = args[2];
+    final Window<String> windowFn;
+    // "fixed": non-overlapping 5-second windows.
+    // Otherwise: 10-second windows sliding every 5 seconds, so each element
+    // falls into two windows (which is why the sliding expected output has more rows).
+    if (windowType.equals("fixed")) {
+      windowFn = Window.<String>into(FixedWindows.of(Duration.standardSeconds(5)));
+    } else {
+      windowFn = Window.<String>into(SlidingWindows.of(Duration.standardSeconds(10))
+        .every(Duration.standardSeconds(5)));
+    }
+
+    final PipelineOptions options = PipelineOptionsFactory.create().as(NemoPipelineOptions.class);
+    options.setRunner(NemoPipelineRunner.class);
+    options.setJobName("WindowedWordCount");
+
+    final Pipeline p = Pipeline.create(options);
+    GenericSourceSink.read(p, inputFilePath)
+        // Split each line at '!' into payload and event-time millis, and re-emit the
+        // payload with that timestamp so the windowing below uses event time.
+        .apply(ParDo.of(new DoFn<String, String>() {
+            @ProcessElement
+            public void processElement(@Element final String elem,
+                                       final OutputReceiver<String> out) {
+              final String[] splitt = elem.split("!");
+              out.outputWithTimestamp(splitt[0], new Instant(Long.valueOf(splitt[1])));
+            }
+        }))
+        .apply(windowFn)
+        // Payload is "<name> <gender> <count>": key on "name#gender", value is the count.
+        .apply(MapElements.<String, KV<String, Long>>via(new SimpleFunction<String,
KV<String, Long>>() {
+          @Override
+          public KV<String, Long> apply(final String line) {
+            final String[] words = line.split(" +");
+            final String documentId = words[0] + "#" + words[1];
+            final Long count = Long.parseLong(words[2]);
+            return KV.of(documentId, count);
+          }
+        }))
+        // Per-window, per-key sum of the counts.
+        .apply(Sum.longsPerKey())
+        // Format each result as "key: sum" for the text sink.
+        .apply(MapElements.<KV<String, Long>, String>via(new SimpleFunction<KV<String,
Long>, String>() {
+          @Override
+          public String apply(final KV<String, Long> kv) {
+            return kv.getKey() + ": " + kv.getValue();
+          }
+        }))
+        // null numShards lets the sink choose the shard count (see WriteOneFilePerWindow#expand).
+        .apply(new WriteOneFilePerWindow(outputFilePath, null));
+    p.run();
+  }
+}
diff --git a/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
new file mode 100644
index 000000000..fad134d8b
--- /dev/null
+++ b/examples/beam/src/main/java/org/apache/nemo/examples/beam/WriteOneFilePerWindow.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+import org.apache.beam.sdk.io.FileBasedSink;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.io.fs.ResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.joda.time.format.DateTimeFormatter;
+import org.joda.time.format.ISODateTimeFormat;
+import javax.annotation.Nullable;
+import static org.apache.beam.repackaged.beam_runners_core_java.com.google.common.base.MoreObjects.firstNonNull;
+
+ /**
+  * Writes each window of input strings to a separate text file whose name encodes the
+  * window's start/end times and the shard number.
+  *
+  * This class is brought from beam/examples/common/WriteOneFilePerWindow.java.
+  */
+public final class WriteOneFilePerWindow extends PTransform<PCollection<String>,
PDone> {
+  // change from hourMinute to hourMinuteSecond
+  // (second-level resolution, since the example uses 5-second windows)
+  private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinuteSecond();
+  private String filenamePrefix;
+  // Null means "let the runner choose the number of shards" (see expand()).
+  @Nullable
+  private Integer numShards;
+   /**
+    * @param filenamePrefix base path/prefix for the output files.
+    * @param numShards fixed shard count per window, or null for runner-chosen.
+    */
+   public WriteOneFilePerWindow(final String filenamePrefix, final Integer numShards) {
+    this.filenamePrefix = filenamePrefix;
+    this.numShards = numShards;
+  }
+   /**
+    * Builds a windowed TextIO write that names files via {@link PerWindowFiles}.
+    */
+   @Override
+  public PDone expand(final PCollection<String> input) {
+    final ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
+    TextIO.Write write =
+        TextIO.write()
+            .to(new PerWindowFiles(resource))
+            .withTempDirectory(resource.getCurrentDirectory())
+            .withWindowedWrites();
+    if (numShards != null) {
+      write = write.withNumShards(numShards);
+    }
+    return input.apply(write);
+  }
+   /**
+   * A {@link FileBasedSink.FilenamePolicy} produces a base file name for a write based on metadata about
the data
+   * being written. This always includes the shard number and the total number of shards.
For
+   * windowed writes, it also includes the window and pane index (a sequence number assigned
to each
+   * trigger firing).
+   */
+  public static final class PerWindowFiles extends FileBasedSink.FilenamePolicy {
+     private final ResourceId baseFilename;
+     PerWindowFiles(final ResourceId baseFilename) {
+      this.baseFilename = baseFilename;
+    }
+     // "<prefix>-<windowStart>-<windowEnd>", with times rendered by FORMATTER.
+     String filenamePrefixForWindow(final IntervalWindow window) {
+      final String prefix =
+          baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
+      return String.format(
+          "%s-%s-%s", prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
+    }
+     @Override
+    public ResourceId windowedFilename(
+        final int shardNumber,
+        final int numShards,
+        final BoundedWindow window,
+        final PaneInfo paneInfo,
+        final FileBasedSink.OutputFileHints outputFileHints) {
+      // NOTE(review): leftover debug print (typo "Windowd"); consider removing or
+      // replacing with a logger before merging.
+      System.out.println("Windowd file name: " + window);
+      // Cast is safe for this example: FixedWindows/SlidingWindows produce IntervalWindows.
+      final IntervalWindow intervalWindow = (IntervalWindow) window;
+      final String filename =
+          String.format(
+              "%s-%s-of-%s%s",
+              filenamePrefixForWindow(intervalWindow),
+              shardNumber,
+              numShards,
+              outputFileHints.getSuggestedFilenameSuffix());
+      return baseFilename
+          .getCurrentDirectory()
+          .resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
+    }
+     // This policy is only used with withWindowedWrites(), so unwindowed naming is unreachable.
+     @Override
+    public ResourceId unwindowedFilename(
+        final int shardNumber, final int numShards, final FileBasedSink.OutputFileHints outputFileHints)
{
+      throw new UnsupportedOperationException("Unsupported.");
+    }
+  }
+}
diff --git a/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
new file mode 100644
index 000000000..27ee4d8eb
--- /dev/null
+++ b/examples/beam/src/test/java/org/apache/nemo/examples/beam/WindowedWordCountITCase.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2018 Seoul National University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nemo.examples.beam;
+
+import org.apache.nemo.client.JobLauncher;
+import org.apache.nemo.common.test.ArgBuilder;
+import org.apache.nemo.common.test.ExampleTestUtil;
+import org.apache.nemo.compiler.optimizer.policy.ConditionalLargeShufflePolicy;
+import org.apache.nemo.compiler.optimizer.policy.DefaultPolicy;
+import org.apache.nemo.examples.beam.policy.*;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+/**
+ * Test Windowed word count program with JobLauncher.
+ *
+ * Runs WindowedWordCount end-to-end against the bundled test input and compares the
+ * produced file against the expected fixed-window / sliding-window outputs.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(JobLauncher.class)
+public final class WindowedWordCountITCase {
+  private static final int TIMEOUT = 120000;
+  // Rebuilt per test; both tests write to the same outputFilePath, so each test
+  // deletes its output in a finally block to avoid cross-test interference.
+  private static ArgBuilder builder;
+  private static final String fileBasePath = System.getProperty("user.dir") + "/../resources/";
+
+  private static final String inputFileName = "test_input_windowed_wordcount";
+  private static final String outputFileName = "test_output_windowed_wordcount";
+  private static final String expectedOutputFileName = "expected_output_windowed_wordcount";
+  private static final String expectedSlidingWindowOutputFileName = "expected_output_sliding_windowed_wordcount";
+  private static final String executorResourceFileName = fileBasePath + "beam_test_executor_resources.json";
+  private static final String inputFilePath =  fileBasePath + inputFileName;
+  private static final String outputFilePath =  fileBasePath + outputFileName;
+
+  // Fixed 5-second windows ("fixed" arg); output checked against the fixed-window fixture.
+  @Test (timeout = TIMEOUT)
+  public void testFixedWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath, "fixed");
+
+    JobLauncher.main(builder
+        .addResourceJson(executorResourceFileName)
+        .addJobId(WindowedWordCountITCase.class.getSimpleName())
+        .addOptimizationPolicy(DefaultPolicyParallelismFive.class.getCanonicalName())
+        .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+
+
+  // Sliding 10s/5s windows ("sliding" arg); output checked against the sliding-window fixture.
+  @Test (timeout = TIMEOUT)
+  public void testSlidingWindow() throws Exception {
+    builder = new ArgBuilder()
+      .addUserMain(WindowedWordCount.class.getCanonicalName())
+      .addUserArgs(inputFilePath, outputFilePath, "sliding");
+
+    JobLauncher.main(builder
+      .addResourceJson(executorResourceFileName)
+      .addJobId(WindowedWordCountITCase.class.getSimpleName())
+      .addOptimizationPolicy(DefaultPolicy.class.getCanonicalName())
+      .build());
+
+    try {
+      ExampleTestUtil.ensureOutputValidity(fileBasePath, outputFileName, expectedSlidingWindowOutputFileName);
+    } finally {
+      ExampleTestUtil.deleteOutputFile(fileBasePath, outputFileName);
+    }
+  }
+}
diff --git a/examples/resources/expected_output_sliding_windowed_wordcount b/examples/resources/expected_output_sliding_windowed_wordcount
new file mode 100644
index 000000000..4437daa0c
--- /dev/null
+++ b/examples/resources/expected_output_sliding_windowed_wordcount
@@ -0,0 +1,18 @@
+gw#m: 10
+gw#m: 10
+gw#m: 90
+gw#m: 90
+john#m: 40
+john#m: 50
+john#m: 80
+john#m: 90
+jykim#f: 100
+jykim#f: 40
+jykim#f: 60
+mh#m: 30
+mh#m: 30
+mh#m: 70
+mh#m: 70
+wonook#m: 100
+wonook#m: 50
+wonook#m: 50
diff --git a/examples/resources/expected_output_windowed_wordcount b/examples/resources/expected_output_windowed_wordcount
new file mode 100644
index 000000000..636bb9e30
--- /dev/null
+++ b/examples/resources/expected_output_windowed_wordcount
@@ -0,0 +1,11 @@
+gw#m: 10
+gw#m: 90
+john#m: 40
+john#m: 40
+john#m: 50
+jykim#f: 40
+jykim#f: 60
+mh#m: 30
+mh#m: 70
+wonook#m: 50
+wonook#m: 50
diff --git a/examples/resources/test_input_windowed_wordcount b/examples/resources/test_input_windowed_wordcount
new file mode 100644
index 000000000..e26179f65
--- /dev/null
+++ b/examples/resources/test_input_windowed_wordcount
@@ -0,0 +1,15 @@
+wonook m 50!1536907180000
+john m 20!1536907181000
+gw m 90!1536907182000
+john m 20!1536907183000
+mh m 30!1536907184000
+john m 20!1536907185000
+wonook m 50!1536907186000
+jykim f 40!1536907187000
+john m 20!1536907188000
+jykim f 20!1536907189000
+mh m 70!1536907190000
+jykim f 40!1536907191000
+gw m 10!1536907192000
+john m 20!1536907193000
+john m 30!1536907194000
diff --git a/examples/resources/test_input_wordcount b/examples/resources/test_input_wordcount
index b59abc4a8..8813dbdd8 100644
--- a/examples/resources/test_input_wordcount
+++ b/examples/resources/test_input_wordcount
@@ -11,4 +11,4 @@ jykim f 20
 mh m 70
 jykim f 40
 gw m 10
-john m 20
\ No newline at end of file
+john m 20


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message