beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-1395] Remove chunking.
Date Mon, 06 Feb 2017 20:04:35 GMT
Repository: beam
Updated Branches:
  refs/heads/master 1e49ee8f2 -> e0189f352


[BEAM-1395] Remove chunking.

fixup! formatting.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/71197ae6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/71197ae6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/71197ae6

Branch: refs/heads/master
Commit: 71197ae61637d9f6317bd5abd7194d67044fee9d
Parents: 1e49ee8
Author: Sela <ansela@paypal.com>
Authored: Mon Feb 6 10:09:35 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Mon Feb 6 21:38:51 2017 +0200

----------------------------------------------------------------------
 .../translation/SparkGroupAlsoByWindowFn.java   | 53 +++++++-------------
 1 file changed, 18 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/71197ae6/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
index 34eea65..9d84481 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java
@@ -18,7 +18,6 @@
 
 package org.apache.beam.runners.spark.translation;
 
-import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -86,7 +85,7 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
   public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
       WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>> windowedValue) throws Exception {
     K key = windowedValue.getValue().getKey();
-    Iterable<WindowedValue<InputT>> inputs = windowedValue.getValue().getValue();
+    Iterable<WindowedValue<InputT>> values = windowedValue.getValue().getValue();
 
     //------ based on GroupAlsoByWindowsViaOutputBufferDoFn ------//
 
@@ -131,24 +130,8 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
             reduceFn,
             runtimeContext.getPipelineOptions());
 
-    Iterable<List<WindowedValue<InputT>>> chunks = Iterables.partition(inputs, 1000);
-    for (Iterable<WindowedValue<InputT>> chunk : chunks) {
-      // Process the chunk of elements.
-      reduceFnRunner.processElements(chunk);
-
-      // Then, since elements are sorted by their timestamp, advance the input watermark
-      // to the first element.
-      timerInternals.advanceInputWatermark(chunk.iterator().next().getTimestamp());
-      // Advance the processing times.
-      timerInternals.advanceProcessingTime(Instant.now());
-      timerInternals.advanceSynchronizedProcessingTime(Instant.now());
-
-      // Fire all the eligible timers.
-      fireEligibleTimers(timerInternals, reduceFnRunner);
-
-      // Leave the output watermark undefined. Since there's no late data in batch mode
-      // there's really no need to track it as we do for streaming.
-    }
+    // Process the grouped values.
+    reduceFnRunner.processElements(values);
 
     // Finish any pending windows by advancing the input watermark to infinity.
     timerInternals.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -168,21 +151,21 @@ public class SparkGroupAlsoByWindowFn<K, InputT, W extends BoundedWindow>
       ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner) throws Exception {
     List<TimerInternals.TimerData> timers = new ArrayList<>();
     while (true) {
-        TimerInternals.TimerData timer;
-        while ((timer = timerInternals.removeNextEventTimer()) != null) {
-          timers.add(timer);
-        }
-        while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
-          timers.add(timer);
-        }
-        while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
-          timers.add(timer);
-        }
-        if (timers.isEmpty()) {
-          break;
-        }
-        reduceFnRunner.onTimers(timers);
-        timers.clear();
+      TimerInternals.TimerData timer;
+      while ((timer = timerInternals.removeNextEventTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      while ((timer = timerInternals.removeNextSynchronizedProcessingTimer()) != null) {
+        timers.add(timer);
+      }
+      if (timers.isEmpty()) {
+        break;
+      }
+      reduceFnRunner.onTimers(timers);
+      timers.clear();
     }
   }
 


Mime
View raw message