beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] incubator-beam git commit: Directly implement ReifyTimestampsAndWindows in SparkRunner
Date Fri, 28 Oct 2016 08:18:14 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 215980ad3 -> 9c3e3e7a3


Directly implement ReifyTimestampsAndWindows in SparkRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/597e3955
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/597e3955
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/597e3955

Branch: refs/heads/master
Commit: 597e3955c219a7c50df124a0689b99b98dfbbbc9
Parents: 215980a
Author: Kenneth Knowles <klk@google.com>
Authored: Thu Oct 27 22:18:19 2016 -0700
Committer: Sela <ansela@paypal.com>
Committed: Fri Oct 28 10:56:44 2016 +0300

----------------------------------------------------------------------
 .../translation/GroupCombineFunctions.java      |  5 +--
 .../ReifyTimestampsAndWindowsFunction.java      | 47 ++++++++++++++++++++
 2 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
index e2a0f87..421b1b0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/GroupCombineFunctions.java
@@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.CombineWithContext;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.ReifyTimestampAndWindowsDoFn;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
@@ -48,7 +47,6 @@ import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.Function2;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
-
 import scala.Tuple2;
 
 
@@ -77,8 +75,7 @@ public class GroupCombineFunctions {
     // Use coders to convert objects in the PCollection to byte arrays, so they
     // can be transferred over the network for the shuffle.
     JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<V>>>>> groupedByKey =
-        rdd.mapPartitions(new DoFnFunction<KV<K, V>, KV<K, WindowedValue<V>>>(null,
-                new ReifyTimestampAndWindowsDoFn<K, V>(), runtimeContext, null, null))
+        rdd.map(new ReifyTimestampsAndWindowsFunction<K, V>())
             .map(WindowingHelpers.<KV<K, WindowedValue<V>>>unwindowFunction())
             .mapToPair(TranslationUtils.<K, WindowedValue<V>>toPairFunction())
             .mapToPair(CoderHelpers.toByteFunction(keyCoder, wvCoder))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/597e3955/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
new file mode 100644
index 0000000..8281c17
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/ReifyTimestampsAndWindowsFunction.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.translation;
+
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.spark.api.java.function.Function;
+
+/**
+ * Simple {@link Function} to bring the windowing information into the value from the implicit
+ * background representation of the {@link PCollection}.
+ */
+public class ReifyTimestampsAndWindowsFunction<K, V>
+    implements Function<WindowedValue<KV<K, V>>, WindowedValue<KV<K, WindowedValue<V>>>> {
+  @Override
+  public WindowedValue<KV<K, WindowedValue<V>>> call(WindowedValue<KV<K, V>> elem)
+      throws Exception {
+    return WindowedValue.of(
+        KV.of(
+            elem.getValue().getKey(),
+            WindowedValue.of(
+                elem.getValue().getValue(),
+                elem.getTimestamp(),
+                elem.getWindows(),
+                elem.getPane())),
+        elem.getTimestamp(),
+        elem.getWindows(),
+        elem.getPane());
+  }
+}


Mime
View raw message