beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Do not Shift Timestamps in Reshuffle
Date Thu, 16 Mar 2017 23:40:02 GMT
Do not Shift Timestamps in Reshuffle

Explicitly reify input timestamps and restore them after the output of
Reshuffle.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/58cc3597
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/58cc3597
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/58cc3597

Branch: refs/heads/master
Commit: 58cc35970665af99a9ba95d3f28e0974149d8f72
Parents: 960f3e6
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Mar 14 14:05:44 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Thu Mar 16 16:39:47 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/util/ReifyTimestamps.java   |  76 +++++++++++++
 .../org/apache/beam/sdk/util/Reshuffle.java     |  19 ++--
 .../beam/sdk/util/ReifyTimestampsTest.java      | 109 +++++++++++++++++++
 .../org/apache/beam/sdk/util/ReshuffleTest.java |  70 +++++++++++-
 4 files changed, 265 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
new file mode 100644
index 0000000..3b291af
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReifyTimestamps.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+
+/**
+ * {@link PTransform PTransforms} for reifying the timestamp of values and reemitting the original
+ * value with the original timestamp.
+ */
+public class ReifyTimestamps {
+  private ReifyTimestamps() {}
+
+  /**
+   * Create a {@link PTransform} that will output all input {@link KV KVs} with the timestamp inside
+   * the value.
+   */
+  public static <K, V>
+      PTransform<PCollection<? extends KV<K, V>>, PCollection<KV<K, TimestampedValue<V>>>>
+          inValues() {
+    return ParDo.of(new ReifyValueTimestampDoFn<K, V>());
+  }
+
+  /**
+   * Create a {@link PTransform} that consumes {@link KV KVs} with a {@link TimestampedValue} as the
+   * value, and outputs the {@link KV} of the input key and value at the timestamp specified by the
+   * {@link TimestampedValue}.
+   */
+  public static <K, V>
+      PTransform<PCollection<? extends KV<K, TimestampedValue<V>>>, PCollection<KV<K, V>>>
+          extractFromValues() {
+    return ParDo.of(new ExtractTimestampedValueDoFn<K, V>());
+  }
+
+  private static class ReifyValueTimestampDoFn<K, V>
+      extends DoFn<KV<K, V>, KV<K, TimestampedValue<V>>> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      context.output(
+          KV.of(
+              context.element().getKey(),
+              TimestampedValue.of(context.element().getValue(), context.timestamp())));
+    }
+  }
+
+  private static class ExtractTimestampedValueDoFn<K, V>
+      extends DoFn<KV<K, TimestampedValue<V>>, KV<K, V>> {
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      KV<K, TimestampedValue<V>> kv = context.element();
+      context.outputWithTimestamp(
+          KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 4d86c74..e80bc17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -22,9 +22,11 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
 
 /**
@@ -55,28 +57,31 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti
     // If the input has already had its windows merged, then the GBK that performed the merge
     // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
     // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
+    // The OutputTimeFn is set to ensure the GroupByKey does not shift elements forwards in time.
+    // Because this outputs as fast as possible, this should not hold the watermark.
     Window.Bound<KV<K, V>> rewindow =
-        Window.<KV<K, V>>into(
-                new IdentityWindowFn<>(
-                    originalStrategy.getWindowFn().windowCoder()))
+        Window.<KV<K, V>>into(new IdentityWindowFn<>(originalStrategy.getWindowFn().windowCoder()))
             .triggering(new ReshuffleTrigger<>())
             .discardingFiredPanes()
+            .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
             .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
     return input.apply(rewindow)
-        .apply(GroupByKey.<K, V>create())
+        .apply("ReifyOriginalTimestamps", ReifyTimestamps.<K, V>inValues())
+        .apply(GroupByKey.<K, TimestampedValue<V>>create())
         // Set the windowing strategy directly, so that it doesn't get counted as the user having
         // set allowed lateness.
         .setWindowingStrategyInternal(originalStrategy)
         .apply("ExpandIterable", ParDo.of(
-            new DoFn<KV<K, Iterable<V>>, KV<K, V>>() {
+            new DoFn<KV<K, Iterable<TimestampedValue<V>>>, KV<K, TimestampedValue<V>>>() {
               @ProcessElement
               public void processElement(ProcessContext c) {
                 K key = c.element().getKey();
-                for (V value : c.element().getValue()) {
+                for (TimestampedValue<V> value : c.element().getValue()) {
                   c.output(KV.of(key, value));
                 }
               }
-            }));
+            }))
+        .apply("RestoreOriginalTimestamps", ReifyTimestamps.<K, V>extractFromValues());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
new file mode 100644
index 0000000..b78de8e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReifyTimestampsTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReifyTimestamps}.
+ */
+@RunWith(JUnit4.class)
+public class ReifyTimestampsTest implements Serializable {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void inValuesSucceeds() {
+    PCollection<KV<String, Integer>> timestamped =
+        pipeline
+            .apply(Create.of(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3)))
+            .apply(
+                WithTimestamps.of(
+                    new SerializableFunction<KV<String, Integer>, Instant>() {
+                      @Override
+                      public Instant apply(KV<String, Integer> input) {
+                        return new Instant(input.getValue().longValue());
+                      }
+                    }));
+
+    PCollection<KV<String, TimestampedValue<Integer>>> reified =
+        timestamped.apply(ReifyTimestamps.<String, Integer>inValues());
+
+    PAssert.that(reified)
+        .containsInAnyOrder(
+            KV.of("foo", TimestampedValue.of(0, new Instant(0))),
+            KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+            KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+            KV.of("baz", TimestampedValue.of(3, new Instant(3))));
+
+    pipeline.run();
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void extractFromValuesSucceeds() {
+    PCollection<KV<String, TimestampedValue<Integer>>> preified =
+        pipeline.apply(
+            Create.of(
+                KV.of("foo", TimestampedValue.of(0, new Instant((0)))),
+                KV.of("foo", TimestampedValue.of(1, new Instant(1))),
+                KV.of("bar", TimestampedValue.of(2, new Instant(2))),
+                KV.of("baz", TimestampedValue.of(3, new Instant(3)))));
+
+    PCollection<KV<String, Integer>> timestamped =
+        preified.apply(ReifyTimestamps.<String, Integer>extractFromValues());
+
+    PAssert.that(timestamped)
+        .containsInAnyOrder(KV.of("foo", 0), KV.of("foo", 1), KV.of("bar", 2), KV.of("baz", 3));
+
+    timestamped.apply(
+        "AssertElementTimestamps",
+        ParDo.of(
+            new DoFn<KV<String, Integer>, Void>() {
+              @ProcessElement
+              public void verifyTimestampsEqualValue(ProcessContext context) {
+                assertThat(
+                    new Instant(context.element().getValue().longValue()),
+                    equalTo(context.timestamp()));
+              }
+            }));
+
+    pipeline.run();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/58cc3597/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
index d47cddc..81a6d82 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTest.java
@@ -17,9 +17,12 @@
  */
 package org.apache.beam.sdk.util;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 
 import com.google.common.collect.ImmutableList;
+import java.io.Serializable;
 import java.util.List;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -29,12 +32,19 @@ import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -45,7 +55,7 @@ import org.junit.runners.JUnit4;
  * Tests for {@link Reshuffle}.
  */
 @RunWith(JUnit4.class)
-public class ReshuffleTest {
+public class ReshuffleTest implements Serializable {
 
   private static final List<KV<String, Integer>> ARBITRARY_KVS = ImmutableList.of(
         KV.of("k1", 3),
@@ -66,7 +76,7 @@ public class ReshuffleTest {
         KV.of("k2", (Iterable<Integer>) ImmutableList.of(4)));
 
   @Rule
-  public final TestPipeline pipeline = TestPipeline.create();
+  public final transient TestPipeline pipeline = TestPipeline.create();
 
   @Test
   @Category(RunnableOnService.class)
@@ -88,6 +98,62 @@ public class ReshuffleTest {
     pipeline.run();
   }
 
+  /**
+   * Tests that timestamps are preserved after applying a {@link Reshuffle} with the default
+   * {@link WindowingStrategy}.
+   */
+  @Test
+  @Category(RunnableOnService.class)
+  public void testReshufflePreservesTimestamps() {
+    PCollection<KV<String, TimestampedValue<String>>> input =
+        pipeline
+            .apply(
+                Create.timestamped(
+                        TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
+                        TimestampedValue.of("foo", new Instant(0)),
+                        TimestampedValue.of("bar", new Instant(33)),
+                        TimestampedValue.of("bar", GlobalWindow.INSTANCE.maxTimestamp()))
+                    .withCoder(StringUtf8Coder.of()))
+            .apply(
+                WithKeys.of(
+                    new SerializableFunction<String, String>() {
+                      @Override
+                      public String apply(String input) {
+                        return input;
+                      }
+                    }))
+            .apply("ReifyOriginalTimestamps", ReifyTimestamps.<String, String>inValues());
+
+    // The outer TimestampedValue is the reified timestamp post-reshuffle. The inner
+    // TimestampedValue is the pre-reshuffle timestamp.
+    PCollection<TimestampedValue<TimestampedValue<String>>> output =
+        input
+            .apply(Reshuffle.<String, TimestampedValue<String>>of())
+            .apply(
+                "ReifyReshuffledTimestamps",
+                ReifyTimestamps.<String, TimestampedValue<String>>inValues())
+            .apply(Values.<TimestampedValue<TimestampedValue<String>>>create());
+
+    PAssert.that(output)
+        .satisfies(
+            new SerializableFunction<Iterable<TimestampedValue<TimestampedValue<String>>>, Void>() {
+              @Override
+              public Void apply(Iterable<TimestampedValue<TimestampedValue<String>>> input) {
+                for (TimestampedValue<TimestampedValue<String>> elem : input) {
+                  Instant originalTimestamp = elem.getValue().getTimestamp();
+                  Instant afterReshuffleTimestamp = elem.getTimestamp();
+                  assertThat(
+                      "Reshuffle must preserve element timestamps",
+                      afterReshuffleTimestamp,
+                      equalTo(originalTimestamp));
+                }
+                return null;
+              }
+            });
+
+    pipeline.run();
+  }
+
   @Test
   @Category(RunnableOnService.class)
   public void testReshuffleAfterSessionsAndGroupByKey() {


Mime
View raw message