beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/3] incubator-beam git commit: [flink] improve readability of processElement function
Date Thu, 31 Mar 2016 12:01:30 GMT
[flink] improve readability of processElement function


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63a7c3d0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63a7c3d0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63a7c3d0

Branch: refs/heads/master
Commit: 63a7c3d0cb51caf65dc82141671cf28d47c2be39
Parents: 033b924
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Mar 30 12:02:01 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Mar 30 12:05:04 2016 +0200

----------------------------------------------------------------------
 .../streaming/FlinkGroupAlsoByWindowWrapper.java    | 16 ++++++++++++----
 1 file changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63a7c3d0/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index 751d44c..3dc5a79 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -258,10 +258,18 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
 
   @Override
   public void processElement(StreamRecord<WindowedValue<KV<K, VIN>>> element) throws Exception {
-    ArrayList<WindowedValue<VIN>> elements = new ArrayList<>();
-    elements.add(WindowedValue.of(element.getValue().getValue().getValue(), element.getValue().getTimestamp(),
-        element.getValue().getWindows(), element.getValue().getPane()));
-    processKeyedWorkItem(KeyedWorkItems.elementsWorkItem(element.getValue().getValue().getKey(), elements));
+    final WindowedValue<KV<K, VIN>> windowedValue = element.getValue();
+    final KV<K, VIN> kv = windowedValue.getValue();
+
+    final WindowedValue<VIN> updatedWindowedValue = WindowedValue.of(kv.getValue(),
+        windowedValue.getTimestamp(),
+        windowedValue.getWindows(),
+        windowedValue.getPane());
+
+    processKeyedWorkItem(
+        KeyedWorkItems.elementsWorkItem(
+            kv.getKey(),
+            Collections.singletonList(updatedWindowedValue)));
   }
 
   @Override


Mime
View raw message