beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: Do not ever shrink allowed lateness
Date Wed, 24 May 2017 18:25:28 GMT
Repository: beam
Updated Branches:
  refs/heads/master 924dd6335 -> 6dd5585b3


Do not ever shrink allowed lateness

This does not seem to have any use except to cause data loss.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7fb2938e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7fb2938e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7fb2938e

Branch: refs/heads/master
Commit: 7fb2938e66e48160e9df878ccf9d0c2a67790151
Parents: 924dd63
Author: Borisa Zivkovic <borisa.zivkovic@huawei.com>
Authored: Tue May 16 14:16:07 2017 +0100
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed May 24 11:15:39 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/Window.java   |  6 +++-
 .../sdk/transforms/windowing/WindowTest.java    | 36 ++++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7fb2938e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index dc4863b..105ebfb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Ordering;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
@@ -141,6 +142,7 @@ import org.joda.time.Duration;
  */
 @AutoValue
 public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T>>
 {
+
   /**
    * Specifies the conditions under which a final pane will be created when a window is permanently
    * closed.
@@ -313,7 +315,8 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
       result = result.withMode(getAccumulationMode());
     }
     if (getAllowedLateness() != null) {
-      result = result.withAllowedLateness(getAllowedLateness());
+      result = result.withAllowedLateness(Ordering.natural().max(getAllowedLateness(),
+          inputStrategy.getAllowedLateness()));
     }
     if (getClosingBehavior() != null) {
       result = result.withClosingBehavior(getClosingBehavior());
@@ -366,6 +369,7 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
 
     WindowingStrategy<?, ?> outputStrategy =
         getOutputStrategyInternal(input.getWindowingStrategy());
+
     if (getWindowFn() == null) {
       // A new PCollection must be created in case input is reused in a different location as the
       // two PCollections will, in general, have a different windowing strategy.

http://git-wip-us.apache.org/repos/asf/beam/blob/7fb2938e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index f536a9a..65af7a1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -165,6 +165,42 @@ public class WindowTest implements Serializable {
     assertEquals(fixed25, strategy.getWindowFn());
   }
 
+  @Test
+  public void testWindowIntoAssignesLongerAllowedLateness() {
+
+    FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
+    FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
+
+    PCollection<String> notChanged = pipeline
+        .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
+        .apply("WindowInto25", Window.<String>into(fixed25)
+            .withAllowedLateness(Duration.standardDays(1))
+            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
+            .accumulatingFiredPanes())
+        .apply("WindowInto10", Window.<String>into(fixed10)
+            .withAllowedLateness(Duration.standardDays(2)));
+
+    assertEquals(Duration.standardDays(2), notChanged.getWindowingStrategy()
+        .getAllowedLateness());
+
+    PCollection<String> data = pipeline
+        .apply("createChanged", Create.of("hello", "world").withCoder(StringUtf8Coder.of()));
+
+    PCollection<String> longWindow = data.apply("WindowInto25c", Window.<String>into(fixed25)
+            .withAllowedLateness(Duration.standardDays(1))
+            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
+            .accumulatingFiredPanes());
+
+    assertEquals(Duration.standardDays(1), longWindow.getWindowingStrategy()
+        .getAllowedLateness());
+
+    PCollection<String> autoCorrectedWindow = longWindow.apply("WindowInto10c",
+        Window.<String>into(fixed10).withAllowedLateness(Duration.standardHours(1)));
+
+    assertEquals(Duration.standardDays(1), autoCorrectedWindow.getWindowingStrategy()
+        .getAllowedLateness());
+  }
+
   /**
    * With {@link #testWindowIntoNullWindowFnNoAssign()}, demonstrates that the expansions of the
    * {@link Window} transform depends on if it actually assigns elements to windows.


Mime
View raw message