beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [02/23] beam git commit: Better name for batch implementation of GroupAlsoByWindow.
Date Tue, 28 Feb 2017 22:35:09 GMT
Better name for batch implementation of GroupAlsoByWindow.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf0c119b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf0c119b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf0c119b

Branch: refs/heads/master
Commit: bf0c119b50c2dc45f21a8c740a6f98136771d7af
Parents: a41afdc
Author: Sela <ansela@paypal.com>
Authored: Mon Feb 13 16:30:16 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Wed Mar 1 00:17:58 2017 +0200

----------------------------------------------------------------------
 .../translation/GroupCombineFunctions.java      |   2 +-
 .../translation/SparkGroupAlsoByWindowFn.java   | 198 -------------------
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java | 198 +++++++++++++++++++
 .../spark/translation/TranslationUtils.java     |   2 +-
 4 files changed, 200 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bf0c119b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index bb95065..8a41b4e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -84,7 +84,7 @@ public class GroupCombineFunctions {
     //--- now group also by window.
     // GroupAlsoByWindow currently uses a dummy in-memory StateInternals
     return groupedByKey.flatMap(
-        new SparkGroupAlsoByWindowFn<>(
+        new SparkGroupAlsoByWindowViaOutputBufferFn<>(
             windowingStrategy,
             new TranslationUtils.InMemoryStateInternalsFactory<K>(),
             SystemReduceFn.<K, V, W>buffering(valueCoder),

http://git-wip-us.apache.org/repos/asf/beam/blob/bf0c119b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
deleted file mode 100644
index bd37fdb..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
-import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
-import org.apache.beam.runners.core.InMemoryTimerInternals;
-import org.apache.beam.runners.core.OutputWindowedValue;
-import org.apache.beam.runners.core.ReduceFnRunner;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateInternalsFactory;
-import org.apache.beam.runners.core.SystemReduceFn;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
-import org.apache.beam.runners.core.triggers.TriggerStateMachines;
-import org.apache.beam.runners.spark.aggregators.NamedAggregators;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.function.FlatMapFunction;
-import org.joda.time.Instant;
-
-/**
- * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
- * for the Spark runner.
- */
-public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
-    implements FlatMapFunction<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>,
-        WindowedValue<KV<K, Iterable<InputT>>>> {
-
-  private final WindowingStrategy<?, W> windowingStrategy;
-  private final StateInternalsFactory<K> stateInternalsFactory;
-  private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
-  private final SparkRuntimeContext runtimeContext;
-  private final Aggregator<Long, Long> droppedDueToClosedWindow;
-
-
-  public SparkGroupAlsoByWindowFn(
-      WindowingStrategy<?, W> windowingStrategy,
-      StateInternalsFactory<K> stateInternalsFactory,
-      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
-      SparkRuntimeContext runtimeContext,
-      Accumulator<NamedAggregators> accumulator) {
-    this.windowingStrategy = windowingStrategy;
-    this.stateInternalsFactory = stateInternalsFactory;
-    this.reduceFn = reduceFn;
-    this.runtimeContext = runtimeContext;
-
-    droppedDueToClosedWindow = runtimeContext.createAggregator(
-        accumulator,
-        GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
-        Sum.ofLongs());
-  }
-
-  @Override
-  public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
-      WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>> windowedValue) throws Exception {
-    K key = windowedValue.getValue().getKey();
-    Iterable<WindowedValue<InputT>> values = windowedValue.getValue().getValue();
-
-    //------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
-
-    // Used with Batch, we know that all the data is available for this key. We can't use the
-    // timer manager from the context because it doesn't exist. So we create one and emulate the
-    // watermark, knowing that we have all data and it is in timestamp order.
-    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-    timerInternals.advanceProcessingTime(Instant.now());
-    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
-    GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
-
-    ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
-        new ReduceFnRunner<>(
-            key,
-            windowingStrategy,
-            ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(
-                    Triggers.toProto(windowingStrategy.getTrigger()))),
-            stateInternals,
-            timerInternals,
-            outputter,
-            new SideInputReader() {
-              @Override
-              public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
-                throw new UnsupportedOperationException(
-                    "GroupAlsoByWindow must not have side inputs");
-              }
-
-              @Override
-              public <T> boolean contains(PCollectionView<T> view) {
-                throw new UnsupportedOperationException(
-                    "GroupAlsoByWindow must not have side inputs");
-              }
-
-              @Override
-              public boolean isEmpty() {
-                throw new UnsupportedOperationException(
-                    "GroupAlsoByWindow must not have side inputs");
-              }
-            },
-            droppedDueToClosedWindow,
-            reduceFn,
-            runtimeContext.getPipelineOptions());
-
-    // Process the grouped values.
-    reduceFnRunner.processElements(values);
-
-    // Finish any pending windows by advancing the input watermark to infinity.
-    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    // Finally, advance the processing time to infinity to fire any timers.
-    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
-
-    fireEligibleTimers(timerInternals, reduceFnRunner);
-
-    reduceFnRunner.persist();
-
-    return outputter.getOutputs();
-  }
-
-  private void fireEligibleTimers(InMemoryTimerInternals timerInternals,
-      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) throws Exception {
-    List<TimerInternals.TimerData> timers = new ArrayList<>();
-    while (true) {
-      TimerInternals.TimerData timer;
-      while ((timer = timerInternals.removeNextEventTimer()) != null) {
-        timers.add(timer);
-      }
-      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
-        timers.add(timer);
-      }
-      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
-        timers.add(timer);
-      }
-      if (timers.isEmpty()) {
-        break;
-      }
-      reduceFnRunner.onTimers(timers);
-      timers.clear();
-    }
-  }
-
-  private static class GABWOutputWindowedValue<K, V>
-      implements OutputWindowedValue<KV<K, Iterable<V>>> {
-    private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
-
-    @Override
-    public void outputWindowedValue(
-        KV<K, Iterable<V>> output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo pane) {
-      outputs.add(WindowedValue.of(output, timestamp, windows, pane));
-    }
-
-    @Override
-    public <SideOutputT> void sideOutputWindowedValue(
-        TupleTag<SideOutputT> tag,
-        SideOutputT output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-      throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs.");
-    }
-
-    Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
-      return outputs;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/bf0c119b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
new file mode 100644
index 0000000..449e3b6
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn;
+import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.InMemoryTimerInternals;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateInternalsFactory;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.runners.spark.aggregators.NamedAggregators;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.joda.time.Instant;
+
+/**
+ * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn}
+ * for the Spark runner.
+ */
+public class SparkGroupAlsoByWindowViaOutputBufferFn<K, InputT, W extends BoundedWindow>
+    implements FlatMapFunction<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>,
+        WindowedValue<KV<K, Iterable<InputT>>>> {
+
+  private final WindowingStrategy<?, W> windowingStrategy;
+  private final StateInternalsFactory<K> stateInternalsFactory;
+  private final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn;
+  private final SparkRuntimeContext runtimeContext;
+  private final Aggregator<Long, Long> droppedDueToClosedWindow;
+
+
+  public SparkGroupAlsoByWindowViaOutputBufferFn(
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternalsFactory<K> stateInternalsFactory,
+      SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn,
+      SparkRuntimeContext runtimeContext,
+      Accumulator<NamedAggregators> accumulator) {
+    this.windowingStrategy = windowingStrategy;
+    this.stateInternalsFactory = stateInternalsFactory;
+    this.reduceFn = reduceFn;
+    this.runtimeContext = runtimeContext;
+
+    droppedDueToClosedWindow = runtimeContext.createAggregator(
+        accumulator,
+        GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+        Sum.ofLongs());
+  }
+
+  @Override
+  public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
+      WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>> windowedValue) throws Exception {
+    K key = windowedValue.getValue().getKey();
+    Iterable<WindowedValue<InputT>> values = windowedValue.getValue().getValue();
+
+    //------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
+
+    // Used with Batch, we know that all the data is available for this key. We can't use the
+    // timer manager from the context because it doesn't exist. So we create one and emulate the
+    // watermark, knowing that we have all data and it is in timestamp order.
+    InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
+    timerInternals.advanceProcessingTime(Instant.now());
+    timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+    StateInternals<K> stateInternals = stateInternalsFactory.stateInternalsForKey(key);
+    GABWOutputWindowedValue<K, InputT> outputter = new GABWOutputWindowedValue<>();
+
+    ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
+        new ReduceFnRunner<>(
+            key,
+            windowingStrategy,
+            ExecutableTriggerStateMachine.create(
+                TriggerStateMachines.stateMachineForTrigger(
+                    Triggers.toProto(windowingStrategy.getTrigger()))),
+            stateInternals,
+            timerInternals,
+            outputter,
+            new SideInputReader() {
+              @Override
+              public <T> T get(PCollectionView<T> view, BoundedWindow sideInputWindow) {
+                throw new UnsupportedOperationException(
+                    "GroupAlsoByWindow must not have side inputs");
+              }
+
+              @Override
+              public <T> boolean contains(PCollectionView<T> view) {
+                throw new UnsupportedOperationException(
+                    "GroupAlsoByWindow must not have side inputs");
+              }
+
+              @Override
+              public boolean isEmpty() {
+                throw new UnsupportedOperationException(
+                    "GroupAlsoByWindow must not have side inputs");
+              }
+            },
+            droppedDueToClosedWindow,
+            reduceFn,
+            runtimeContext.getPipelineOptions());
+
+    // Process the grouped values.
+    reduceFnRunner.processElements(values);
+
+    // Finish any pending windows by advancing the input watermark to infinity.
+    timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    // Finally, advance the processing time to infinity to fire any timers.
+    timerInternals.advanceProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    timerInternals.advanceSynchronizedProcessingTime(BoundedWindow.TIMESTAMP_MAX_VALUE);
+
+    fireEligibleTimers(timerInternals, reduceFnRunner);
+
+    reduceFnRunner.persist();
+
+    return outputter.getOutputs();
+  }
+
+  private void fireEligibleTimers(InMemoryTimerInternals timerInternals,
+      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) throws Exception {
+    List<TimerInternals.TimerData> timers = new ArrayList<>();
+    while (true) {
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      reduceFnRunner.onTimers(timers);
+      timers.clear();
+    }
+  }
+
+  private static class GABWOutputWindowedValue<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private final List<WindowedValue<KV<K, Iterable<V>>>> outputs = new ArrayList<>();
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      outputs.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    @Override
+    public <SideOutputT> void sideOutputWindowedValue(
+        TupleTag<SideOutputT> tag,
+        SideOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+      throw new UnsupportedOperationException("GroupAlsoByWindow should not use side outputs.");
+    }
+
+    Iterable<WindowedValue<KV<K, Iterable<V>>>> getOutputs() {
+      return outputs;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/bf0c119b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
index 890a91b..7d83230 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TranslationUtils.java
@@ -129,7 +129,7 @@ public final class TranslationUtils {
   }
 
   /** {@link KV} to pair function. */
-  static <K, V> PairFunction<KV<K, V>, K, V> toPairFunction() {
+  public static <K, V> PairFunction<KV<K, V>, K, V> toPairFunction() {
     return new PairFunction<KV<K, V>, K, V>() {
       @Override
       public Tuple2<K, V> call(KV<K, V> kv) {


Mime
View raw message