beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] beam git commit: [BEAM-659] WindowFn#isCompatible should provide a meaningful reason
Date Thu, 18 May 2017 20:41:12 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8ef812def -> 8c572ef0b


[BEAM-659] WindowFn#isCompatible should provide a meaningful reason


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a8d2125e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a8d2125e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a8d2125e

Branch: refs/heads/master
Commit: a8d2125e5783a556056e88dad8fe3c0a397920d5
Parents: 23731fe
Author: huafengw <fvunicorn@gmail.com>
Authored: Tue May 9 11:21:44 2017 +0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Thu May 18 13:27:45 2017 -0700

----------------------------------------------------------------------
 .../core/construction/PCollectionsTest.java     | 12 +++++++
 .../direct/WindowEvaluatorFactoryTest.java      | 10 ++++++
 .../apache/beam/sdk/testing/StaticWindows.java  | 12 +++++++
 .../transforms/windowing/CalendarWindows.java   | 36 +++++++++++++++++++
 .../sdk/transforms/windowing/FixedWindows.java  | 11 ++++++
 .../sdk/transforms/windowing/GlobalWindows.java | 11 ++++++
 .../windowing/IncompatibleWindowException.java  | 38 ++++++++++++++++++++
 .../transforms/windowing/InvalidWindows.java    | 11 ++++++
 .../beam/sdk/transforms/windowing/Sessions.java | 11 ++++++
 .../transforms/windowing/SlidingWindows.java    | 11 ++++++
 .../beam/sdk/transforms/windowing/WindowFn.java | 21 +++++++++++
 .../apache/beam/sdk/util/IdentityWindowFn.java  | 11 ++++++
 .../beam/sdk/testing/StaticWindowsTest.java     | 12 +++++++
 .../windowing/CalendarWindowsTest.java          | 24 +++++++++++++
 .../transforms/windowing/FixedWindowsTest.java  |  7 ++++
 .../sdk/transforms/windowing/SessionsTest.java  | 14 ++++++++
 .../windowing/SlidingWindowsTest.java           | 11 ++++++
 .../sdk/transforms/windowing/WindowTest.java    |  8 +++++
 .../sdk/util/IdentitySideInputWindowFn.java     |  4 +++
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 13 +++++++
 20 files changed, 288 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
index 2c45cbd..66700d0 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PCollectionsTest.java
@@ -44,6 +44,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -158,6 +159,17 @@ public class PCollectionsTest {
     }
 
     @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+      if (!this.isCompatible(other)) {
+        throw new IncompatibleWindowException(
+            other,
+            String.format(
+                "%s is only compatible with %s.",
+                CustomWindows.class.getSimpleName(), CustomWindows.class.getSimpleName()));
+      }
+    }
+
+    @Override
     public Coder<BoundedWindow> windowCoder() {
       return new AtomicCoder<BoundedWindow>() {
         @Override public void verifyDeterministic() {}

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index a91bab5..96fdfab 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -35,6 +35,7 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -304,6 +305,15 @@ public class WindowEvaluatorFactoryTest {
     }
 
     @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "%s is not compatible with any other %s.",
+              EvaluatorTestWindowFn.class.getSimpleName(), WindowFn.class.getSimpleName()));
+    }
+
+    @Override
     public Coder<BoundedWindow> windowCoder() {
       @SuppressWarnings({"unchecked", "rawtypes"}) Coder coder =
           (Coder) GlobalWindow.Coder.INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
index fde1669..c11057a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.Objects;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -97,6 +98,17 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> {
   }
 
   @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    if (!this.isCompatible(other)) {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "Only %s objects with the same window supplier are compatible.",
+              StaticWindows.class.getSimpleName()));
+    }
+  }
+
+  @Override
   public Coder<BoundedWindow> windowCoder() {
     return coder;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
index fada50a..989c431 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java
@@ -145,6 +145,18 @@ public class CalendarWindows {
     }
 
     @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+      if (!this.isCompatible(other)) {
+        throw new IncompatibleWindowException(
+            other,
+            String.format(
+                "Only %s objects with the same number of days, start date "
+                    + "and time zone are compatible.",
+                DaysWindows.class.getSimpleName()));
+      }
+    }
+
+    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
@@ -245,6 +257,18 @@ public class CalendarWindows {
     }
 
     @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+      if (!this.isCompatible(other)) {
+        throw new IncompatibleWindowException(
+            other,
+            String.format(
+                "Only %s objects with the same number of months, "
+                    + "day of month, start date and time zone are compatible.",
+                MonthsWindows.class.getSimpleName()));
+      }
+    }
+
+    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 
@@ -354,6 +378,18 @@ public class CalendarWindows {
     }
 
     @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+      if (!this.isCompatible(other)) {
+        throw new IncompatibleWindowException(
+            other,
+            String.format(
+                "Only %s objects with the same number of years, month of year, "
+                    + "day of month, start date and time zone are compatible.",
+                YearsWindows.class.getSimpleName()));
+      }
+    }
+
+    @Override
     public void populateDisplayData(DisplayData.Builder builder) {
       super.populateDisplayData(builder);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
index 8683a60..8b16916 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java
@@ -101,6 +101,17 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> {
     return this.equals(other);
   }
 
+  @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    if (!this.isCompatible(other)) {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "Only %s objects with the same size and offset are compatible.",
+              FixedWindows.class.getSimpleName()));
+    }
+  }
+
   public Duration getSize() {
     return size;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index 400be1f..b49328b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -41,6 +41,17 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
   }
 
   @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    if (!this.isCompatible(other)) {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "%s is only compatible with %s.",
+              GlobalWindows.class.getSimpleName(), GlobalWindows.class.getSimpleName()));
+    }
+  }
+
+  @Override
   public Coder<GlobalWindow> windowCoder() {
     return GlobalWindow.Coder.INSTANCE;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
new file mode 100644
index 0000000..b7b96ad
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IncompatibleWindowException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+/**
+ * Exception thrown by {@link WindowFn#verifyCompatibility(WindowFn)} if two compared
+ * WindowFns are not compatible, including the explanation of incompatibility.
+ */
+public class IncompatibleWindowException extends Exception {
+  private WindowFn<?, ?> givenWindowFn;
+  private String reason;
+
+  public IncompatibleWindowException(WindowFn<?, ?> windowFn, String reason) {
+    this.givenWindowFn = windowFn;
+    this.reason = reason;
+  }
+
+  @Override
+  public String getMessage() {
+    String windowFn = givenWindowFn == null ? "null" : givenWindowFn.getClass().getSimpleName();
+    return String.format("The given WindowFn is %s. %s", windowFn, reason);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
index 92041fc..a8084f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/InvalidWindows.java
@@ -75,6 +75,17 @@ public class InvalidWindows<W extends BoundedWindow> extends WindowFn<Object, W>
   }
 
   @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    if (!this.isCompatible(other)) {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "Only %s objects with the same originalWindowFn are compatible.",
+              InvalidWindows.class.getSimpleName()));
+    }
+  }
+
+  @Override
   public WindowMappingFn<W> getDefaultWindowMappingFn() {
     throw new UnsupportedOperationException("InvalidWindows is not allowed in side inputs");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
index 5cc7c65..115a964 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java
@@ -80,6 +80,17 @@ public class Sessions extends WindowFn<Object, IntervalWindow> {
   }
 
   @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    if (!this.isCompatible(other)) {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "%s is only compatible with %s.",
+              Sessions.class.getSimpleName(), Sessions.class.getSimpleName()));
+    }
+  }
+
+  @Override
   public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
     throw new UnsupportedOperationException("Sessions is not allowed in side inputs");
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index 650dc37..f657884 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -148,6 +148,17 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
   }
 
   @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    if (!this.isCompatible(other)) {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "Only %s objects with the same size, period and offset are compatible.",
+              SlidingWindows.class.getSimpleName()));
+    }
+  }
+
+  @Override
   public void populateDisplayData(DisplayData.Builder builder) {
     super.populateDisplayData(builder);
     builder

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index 5ebbb41..e329c1d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -114,10 +114,31 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   /**
    * Returns whether this performs the same merging as the given
    * {@code WindowFn}.
+   *
+   * @deprecated please override verifyCompatibility to throw a useful error message;
+   *     we will remove isCompatible at version 3.0.0
    */
+  @Deprecated
   public abstract boolean isCompatible(WindowFn<?, ?> other);
 
   /**
+   * Throw {@link IncompatibleWindowException} if this WindowFn does not perform the same merging as
+   * the given {@code WindowFn}.
+   *
+   * @throws IncompatibleWindowException if compared WindowFns are not compatible.
+   */
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    if (!this.isCompatible(other)) {
+      throw new IncompatibleWindowException(
+          other,
+          String.format(
+              "%s is not compatible with %s",
+              this.getClass().getSimpleName(),
+              other.getClass().getSimpleName()));
+    }
+  }
+
+  /**
    * Returns the {@link Coder} used for serializing the windows used
    * by this windowFn.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index a61e3a6..a4bfdda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -84,6 +85,16 @@ public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
   }
 
   @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+    throw new UnsupportedOperationException(
+        String.format(
+            "%s.verifyCompatibility() should never be called."
+                + " It is a private implementation detail of sdk utilities."
+                + " This message indicates a bug in the Beam SDK.",
+            getClass().getCanonicalName()));
+  }
+
+  @Override
   public Coder<BoundedWindow> windowCoder() {
     // Safe because the prior WindowFn provides both the windows and the coder.
     // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
index 7ee48c8..2969ca6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
 import com.google.common.collect.ImmutableList;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.hamcrest.Matchers;
@@ -93,4 +94,15 @@ public class StaticWindowsTest {
     thrown.expectMessage("may not be empty");
     StaticWindows.of(GlobalWindow.Coder.INSTANCE, ImmutableList.<GlobalWindow>of());
   }
+
+  @Test
+  public void testCompatibility() throws IncompatibleWindowException {
+    StaticWindows staticWindows =
+        StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second));
+    staticWindows.verifyCompatibility(
+        StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second)));
+    thrown.expect(IncompatibleWindowException.class);
+    staticWindows.verifyCompatibility(
+        StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
index cd562e9..c8c01f5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java
@@ -91,6 +91,14 @@ public class CalendarWindowsTest {
   }
 
   @Test
+  public void testDaysCompatibility() throws IncompatibleWindowException {
+    CalendarWindows.DaysWindows daysWindows = CalendarWindows.days(10);
+    daysWindows.verifyCompatibility(CalendarWindows.days(10));
+    thrown.expect(IncompatibleWindowException.class);
+    daysWindows.verifyCompatibility(CalendarWindows.days(9));
+  }
+
+  @Test
   public void testWeeks() throws Exception {
     Map<IntervalWindow, Set<String>> expected = new HashMap<>();
 
@@ -165,6 +173,14 @@ public class CalendarWindowsTest {
   }
 
   @Test
+  public void testMonthsCompatibility() throws IncompatibleWindowException {
+    CalendarWindows.MonthsWindows monthsWindows = CalendarWindows.months(10).beginningOnDay(15);
+    monthsWindows.verifyCompatibility(CalendarWindows.months(10).beginningOnDay(15));
+    thrown.expect(IncompatibleWindowException.class);
+    monthsWindows.verifyCompatibility(CalendarWindows.months(10).beginningOnDay(30));
+  }
+
+  @Test
   public void testMultiMonths() throws Exception {
     Map<IntervalWindow, Set<String>> expected = new HashMap<>();
 
@@ -239,6 +255,14 @@ public class CalendarWindowsTest {
   }
 
   @Test
+  public void testYearsCompatibility() throws IncompatibleWindowException {
+    CalendarWindows.YearsWindows yearsWindows = CalendarWindows.years(2017).beginningOnDay(1, 1);
+    yearsWindows.verifyCompatibility(CalendarWindows.years(2017).beginningOnDay(1, 1));
+    thrown.expect(IncompatibleWindowException.class);
+    yearsWindows.verifyCompatibility(CalendarWindows.years(2017).beginningOnDay(1, 2));
+  }
+
+  @Test
   public void testTimeZone() throws Exception {
     Map<IntervalWindow, Set<String>> expected = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
index 47c273a..80a534c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java
@@ -149,6 +149,13 @@ public class FixedWindowsTest {
   }
 
   @Test
+  public void testVerifyCompatibility() throws IncompatibleWindowException {
+    FixedWindows.of(new Duration(10)).verifyCompatibility(FixedWindows.of(new Duration(10)));
+    thrown.expect(IncompatibleWindowException.class);
+    FixedWindows.of(new Duration(10)).verifyCompatibility(FixedWindows.of(new Duration(20)));
+  }
+
+  @Test
   public void testValidOutputTimes() throws Exception {
     for (long timestamp : Arrays.asList(200, 800, 700)) {
       WindowFnTestUtils.validateGetOutputTimestamp(

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
index 9d94928..42c15b5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java
@@ -36,7 +36,9 @@ import org.apache.beam.sdk.testing.WindowFnTestUtils;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -45,6 +47,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class SessionsTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testSimple() throws Exception {
@@ -106,6 +110,16 @@ public class SessionsTest {
             Sessions.withGapDuration(new Duration(20))));
   }
 
+  @Test
+  public void testVerifyCompatibility() throws IncompatibleWindowException {
+    Sessions.withGapDuration(new Duration(10))
+        .verifyCompatibility(Sessions.withGapDuration(new Duration(10)));
+
+    thrown.expect(IncompatibleWindowException.class);
+    Sessions.withGapDuration(new Duration(10))
+        .verifyCompatibility(FixedWindows.of(new Duration(10)));
+  }
+
   /**
    * Validates that the output timestamp for aggregate data falls within the acceptable range.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
index dd673d3..b14e221 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java
@@ -34,7 +34,9 @@ import org.apache.beam.sdk.testing.WindowFnTestUtils;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -43,6 +45,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class SlidingWindowsTest {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
 
   @Test
   public void testSimple() throws Exception {
@@ -153,6 +157,13 @@ public class SlidingWindowsTest {
   }
 
   @Test
+  public void testVerifyCompatibility() throws IncompatibleWindowException {
+    SlidingWindows.of(new Duration(10)).verifyCompatibility(SlidingWindows.of(new Duration(10)));
+    thrown.expect(IncompatibleWindowException.class);
+    SlidingWindows.of(new Duration(10)).verifyCompatibility(SlidingWindows.of(new Duration(20)));
+  }
+
+  @Test
   public void testDefaultWindowMappingFn() {
     // [40, 1040), [340, 1340), [640, 1640) ...
     SlidingWindows slidingWindows = SlidingWindows.of(new Duration(1000))

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 92f6a9c..f536a9a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -301,6 +301,14 @@ public class WindowTest implements Serializable {
     }
 
     @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+      if (!this.isCompatible(other)) {
+        throw new IncompatibleWindowException(
+            other, "WindowOddEvenBuckets is only compatible with WindowOddEvenBuckets.");
+      }
+    }
+
+    @Override
     public Coder<IntervalWindow> windowCoder() {
       return new IntervalWindow.IntervalWindowCoder();
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
index 2171466..32e23da 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -43,6 +44,9 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, Bound
   }
 
   @Override
+  public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {}
+
+  @Override
   public Coder<BoundedWindow> windowCoder() {
     // not used
     return (Coder) GlobalWindow.Coder.INSTANCE;

http://git-wip-us.apache.org/repos/asf/beam/blob/a8d2125e/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index d60c721..ba0cea8 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -115,6 +115,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -698,6 +699,18 @@ public class BigQueryIOTest implements Serializable {
     }
 
     @Override
+    public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
+      if (!this.isCompatible(other)) {
+        throw new IncompatibleWindowException(
+            other,
+            String.format(
+                "%s is only compatible with %s.",
+                PartitionedGlobalWindows.class.getSimpleName(),
+                PartitionedGlobalWindows.class.getSimpleName()));
+      }
+    }
+
+    @Override
     public Coder<PartitionedGlobalWindow> windowCoder() {
       return new PartitionedGlobalWindowCoder();
     }


Mime
View raw message