beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [1/2] incubator-beam git commit: Make IdentityWindowFn and NeverTrigger available
Date Thu, 14 Apr 2016 22:23:04 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6511ba28e -> 5bdea1e2b


Make IdentityWindowFn and NeverTrigger available

This will be used as part of the new PAssert

IdentityWindowFn remains package-private to restrict usage.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9a1efae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9a1efae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9a1efae

Branch: refs/heads/master
Commit: c9a1efae25455af1e47675938e296c716ec0fa0f
Parents: 6511ba2
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Mar 31 14:34:06 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Thu Apr 14 15:09:47 2016 -0700

----------------------------------------------------------------------
 .../transforms/windowing/AfterWatermark.java    |  46 +-------
 .../beam/sdk/transforms/windowing/Never.java    |  76 ++++++++++++
 .../apache/beam/sdk/util/IdentityWindowFn.java  | 116 +++++++++++++++++++
 .../org/apache/beam/sdk/util/Reshuffle.java     |  80 ++-----------
 .../sdk/transforms/windowing/NeverTest.java     |  56 +++++++++
 5 files changed, 259 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index 5aca093..05c6eb8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
 import org.apache.beam.sdk.util.ExecutableTrigger;
 import org.apache.beam.sdk.util.TimeDomain;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
 import org.joda.time.Instant;
@@ -96,43 +95,6 @@ public class AfterWatermark {
     TriggerBuilder withEarlyFirings(OnceTrigger earlyTrigger);
   }
 
-  /**
-   * A trigger which never fires. Used for the "early" trigger when only a late trigger was
-   * specified.
-   */
-  private static class NeverTrigger extends OnceTrigger {
-
-    protected NeverTrigger() {
-      super(null);
-    }
-
-    @Override
-    public void onElement(OnElementContext c) throws Exception { }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception { }
-
-    @Override
-    protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-      return this;
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE;
-    }
-
-    @Override
-    public boolean shouldFire(Trigger.TriggerContext context) throws Exception {
-      return false;
-    }
-
-    @Override
-    protected void onOnlyFiring(Trigger.TriggerContext context) throws Exception {
-      throw new UnsupportedOperationException(
-          String.format("%s should never fire", getClass().getSimpleName()));
-    }
-  }
 
   private static class AfterWatermarkEarlyAndLate
       extends Trigger
@@ -314,8 +276,7 @@ public class AfterWatermark {
      * the given {@code Trigger} fires before the watermark has passed the end of the window.
      */
     public AfterWatermarkEarly withEarlyFirings(OnceTrigger earlyFirings) {
-      Preconditions.checkNotNull(earlyFirings,
-          "Must specify the trigger to use for early firings");
+      checkNotNull(earlyFirings, "Must specify the trigger to use for early firings");
       return new AfterWatermarkEarlyAndLate(earlyFirings, null);
     }
 
@@ -324,9 +285,8 @@ public class AfterWatermark {
      * the given {@code Trigger} fires after the watermark has passed the end of the window.
      */
     public AfterWatermarkLate withLateFirings(OnceTrigger lateFirings) {
-      Preconditions.checkNotNull(lateFirings,
-          "Must specify the trigger to use for late firings");
-      return new AfterWatermarkEarlyAndLate(new NeverTrigger(), lateFirings);
+      checkNotNull(lateFirings, "Must specify the trigger to use for late firings");
+      return new AfterWatermarkEarlyAndLate(Never.ever(), lateFirings);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
new file mode 100644
index 0000000..809e841
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.joda.time.Instant;
+
+import java.util.List;
+
+/**
+ * A trigger which never fires.
+ *
+ * <p>
+ * Using this trigger will only produce output when the watermark passes the end of the
+ * {@link BoundedWindow window} plus the {@link Window#withAllowedLateness() allowed
+ * lateness}.
+ */
+public final class Never {
+  /**
+   * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
+   * when the {@link BoundedWindow} closes.
+   */
+  public static OnceTrigger ever() {
+    // NeverTrigger ignores all inputs and is Window-type independent.
+    return new NeverTrigger();
+  }
+
+  private static class NeverTrigger extends OnceTrigger {
+    protected NeverTrigger() {
+      super(null);
+    }
+
+    @Override
+    public void onElement(OnElementContext c) {}
+
+    @Override
+    public void onMerge(OnMergeContext c) {}
+
+    @Override
+    protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+      return this;
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return BoundedWindow.TIMESTAMP_MAX_VALUE;
+    }
+
+    @Override
+    public boolean shouldFire(Trigger.TriggerContext context) {
+      return false;
+    }
+
+    @Override
+    protected void onOnlyFiring(Trigger.TriggerContext context) {
+      throw new UnsupportedOperationException(
+          String.format("%s should never fire", getClass().getSimpleName()));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
new file mode 100644
index 0000000..91e5609
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * A {@link WindowFn} that leaves all associations between elements and windows unchanged.
+ *
+ * <p>This {@link WindowFn} is applied when elements must be passed through a {@link
GroupByKey},
+ * but should maintain their existing {@link Window} assignments. Because windows may have
been
+ * merged, the earlier {@link WindowFn} may not appropriately maintain the existing window
+ * assignments. For example, if the earlier {@link WindowFn} merges windows, after a
+ * {@link GroupByKey} the {@link WindowingStrategy} uses {@link InvalidWindows}, and no further
+ * {@link GroupByKey} can be applied without applying a new {@link WindowFn}. This {@link
WindowFn}
+ * allows existing window assignments to be maintained across a single group by key, at which
point
+ * the earlier {@link WindowingStrategy} should be restored.
+ *
+ * <p>This {@link WindowFn} is an internal implementation detail of sdk-provided utilities,
and
+ * should not be used by {@link Pipeline} writers.
+ */
+class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
+
+  /**
+   * The coder of the type of windows of the input {@link PCollection}. This is not an arbitrary
+   * {@link BoundedWindow} {@link Coder}, but is safe to use for all windows assigned by
this
+   * transform, as it should be the same coder used by the {@link WindowFn} that initially
assigned
+   * these windows.
+   */
+  private final Coder<BoundedWindow> coder;
+  private final boolean assignsToSingleWindow;
+
+  public IdentityWindowFn(Coder<? extends BoundedWindow> coder, boolean assignsToSingleWindow)
{
+    // Safe because it is only used privately here.
+    // At every point where a window is returned or accepted, it has been provided
+    // by priorWindowFn, so it is of the expected type.
+    @SuppressWarnings("unchecked")
+    Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) coder;
+    this.coder = windowCoder;
+    this.assignsToSingleWindow = assignsToSingleWindow;
+  }
+
+  @Override
+  public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext
c)
+      throws Exception {
+    // The windows are provided by priorWindowFn, which also provides the coder for them
+    @SuppressWarnings("unchecked")
+    Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows();
+    return priorWindows;
+  }
+
+  @Override
+  public boolean isCompatible(WindowFn<?, ?> other) {
+    throw new UnsupportedOperationException(
+        String.format(
+            "%s.isCompatible() should never be called."
+                + " It is a private implementation detail of sdk utilities."
+                + " This message indicates a bug in the Beam SDK.",
+            getClass().getCanonicalName()));
+  }
+
+  @Override
+  public Coder<BoundedWindow> windowCoder() {
+    // Safe because the previous WindowFn provides both the windows and the coder.
+    // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
+    return coder;
+  }
+
+  @Override
+  public boolean assignsToSingleWindow() {
+    return assignsToSingleWindow;
+  }
+
+  @Override
+  public BoundedWindow getSideInputWindow(BoundedWindow window) {
+    throw new UnsupportedOperationException(
+        String.format(
+            "%s.getSideInputWindow() should never be called."
+                + " It is a private implementation detail of sdk utilities."
+                + " This message indicates a bug in the Beam SDK.",
+            getClass().getCanonicalName()));
+  }
+
+  @Deprecated
+  @Override
+  public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) {
+    return inputTimestamp;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
index 09b2222..5c91326 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java
@@ -17,22 +17,15 @@
  */
 package org.apache.beam.sdk.util;
 
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
-
 import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.Collection;
 
 /**
  * A {@link PTransform} that returns a {@link PCollection} equivalent to its input but operationally
@@ -62,11 +55,14 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K,
V>>, PCollecti
     // If the input has already had its windows merged, then the GBK that performed the merge
     // will have set originalStrategy.getWindowFn() to InvalidWindows, causing the GBK contained
     // here to fail. Instead, we install a valid WindowFn that leaves all windows unchanged.
-    Window.Bound<KV<K, V>> rewindow = Window
-        .<KV<K, V>>into(new PassThroughWindowFn<>(originalStrategy.getWindowFn()))
-        .triggering(new ReshuffleTrigger<>())
-        .discardingFiredPanes()
-        .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
+    Window.Bound<KV<K, V>> rewindow =
+        Window.<KV<K, V>>into(
+                new IdentityWindowFn<>(
+                    originalStrategy.getWindowFn().windowCoder(),
+                    originalStrategy.getWindowFn().assignsToSingleWindow()))
+            .triggering(new ReshuffleTrigger<>())
+            .discardingFiredPanes()
+            .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()));
 
     return input.apply(rewindow)
         .apply(GroupByKey.<K, V>create())
@@ -84,64 +80,4 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K,
V>>, PCollecti
               }
             }));
   }
-
-  /**
-   * A {@link WindowFn} that leaves all associations between elements and windows unchanged.
-   *
-   * <p>In order to implement all the abstract methods of {@link WindowFn}, this requires
the
-   * prior {@link WindowFn}, to which all auxiliary functionality is delegated.
-   */
-  private static class PassThroughWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow>
{
-
-    /** The WindowFn prior to this. Used for its windowCoder, etc. */
-    private final WindowFn<?, BoundedWindow> priorWindowFn;
-
-    public PassThroughWindowFn(WindowFn<?, ?> priorWindowFn) {
-      // Safe because it is only used privately here.
-      // At every point where a window is returned or accepted, it has been provided
-      // by priorWindowFn, so it is of the type expected.
-      @SuppressWarnings("unchecked")
-      WindowFn<?, BoundedWindow> internalWindowFn = (WindowFn<?, BoundedWindow>)
priorWindowFn;
-      this.priorWindowFn = internalWindowFn;
-    }
-
-    @Override
-    public Collection<BoundedWindow> assignWindows(WindowFn<T, BoundedWindow>.AssignContext
c)
-        throws Exception {
-      // The windows are provided by priorWindowFn, which also provides the coder for them
-      @SuppressWarnings("unchecked")
-      Collection<BoundedWindow> priorWindows = (Collection<BoundedWindow>) c.windows();
-      return priorWindows;
-    }
-
-    @Override
-    public boolean isCompatible(WindowFn<?, ?> other) {
-      throw new UnsupportedOperationException(
-          String.format("%s.isCompatible() should never be called."
-              + " It is a private implementation detail of Reshuffle."
-              + " This message indicates a bug in the Dataflow SDK.",
-              getClass().getCanonicalName()));
-    }
-
-    @Override
-    public Coder<BoundedWindow> windowCoder() {
-      // Safe because priorWindowFn provides the windows also.
-      // The Coder is _not_ actually a coder for an arbitrary BoundedWindow.
-      return priorWindowFn.windowCoder();
-    }
-
-    @Override
-    public BoundedWindow getSideInputWindow(BoundedWindow window) {
-      throw new UnsupportedOperationException(
-          String.format("%s.getSideInputWindow() should never be called."
-              + " It is a private implementation detail of Reshuffle."
-              + " This message indicates a bug in the Dataflow SDK.",
-              getClass().getCanonicalName()));
-    }
-
-    @Override
-    public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) {
-      return inputTimestamp;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9a1efae/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
new file mode 100644
index 0000000..222fe4e
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/NeverTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.windowing;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.sdk.util.TriggerTester;
+import org.apache.beam.sdk.util.TriggerTester.SimpleTriggerTester;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link Never}.
+ */
+@RunWith(JUnit4.class)
+public class NeverTest {
+  private SimpleTriggerTester<IntervalWindow> triggerTester;
+
+  @Before
+  public void setup() throws Exception {
+    triggerTester =
+        TriggerTester.forTrigger(
+            Never.<IntervalWindow>ever(), FixedWindows.of(Duration.standardMinutes(5)));
+  }
+
+  @Test
+  public void falseAfterEndOfWindow() throws Exception {
+    triggerTester.injectElements(TimestampedValue.of(1, new Instant(1)));
+    IntervalWindow window =
+        new IntervalWindow(new Instant(0), new Instant(0).plus(Duration.standardMinutes(5)));
+    assertThat(triggerTester.shouldFire(window), is(false));
+    triggerTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE);
+    assertThat(triggerTester.shouldFire(window), is(false));
+  }
+}


Mime
View raw message