beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [33/51] [abbrv] incubator-beam git commit: Require TimeDomain to delete a timer
Date Wed, 21 Dec 2016 22:50:07 GMT
Require TimeDomain to delete a timer


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35a02740
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35a02740
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35a02740

Branch: refs/heads/python-sdk
Commit: 35a02740748182ee52729d8bfb621a3c342b8312
Parents: 0d0a5e2
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Dec 20 20:09:25 2016 -0800
Committer: Kenneth Knowles <klk@google.com>
Committed: Wed Dec 21 08:20:28 2016 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java          |  8 ++++++++
 .../beam/runners/core/InMemoryTimerInternals.java  |  8 ++++++++
 .../beam/runners/direct/DirectTimerInternals.java  |  8 ++++++++
 .../wrappers/streaming/WindowDoFnOperator.java     |  9 +++++++++
 .../org/apache/beam/sdk/util/TimerInternals.java   | 17 +++++++++++++++--
 5 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 48ac177..49ec1c8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -425,12 +425,19 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
    */
   public class ApexTimerInternals implements TimerInternals {
 
+    @Deprecated
     @Override
     public void setTimer(TimerData timerData) {
       registerActiveTimer(context.element().key(), timerData);
     }
 
     @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+    }
+
+    @Deprecated
+    @Override
     public void deleteTimer(TimerData timerKey) {
       unregisterActiveTimer(context.element().key(), timerKey);
     }
@@ -463,6 +470,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
       throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
     }
 
+    @Deprecated
     @Override
     public void deleteTimer(StateNamespace namespace, String timerId) {
       throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..5ddd5a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -107,6 +107,7 @@ public class InMemoryTimerInternals implements TimerInternals {
     throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
     WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
@@ -116,10 +117,17 @@ public class InMemoryTimerInternals implements TimerInternals {
   }
 
   @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+  }
+
+  @Deprecated
+  @Override
   public void deleteTimer(StateNamespace namespace, String timerId) {
     throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void deleteTimer(TimerData timer) {
     WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 8970b4b..5ca276d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -52,16 +52,24 @@ class DirectTimerInternals implements TimerInternals {
     throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void setTimer(TimerData timerData) {
     timerUpdateBuilder.setTimer(timerData);
   }
 
   @Override
+  public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+  }
+
+  @Deprecated
+  @Override
   public void deleteTimer(StateNamespace namespace, String timerId) {
     throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
   }
 
+  @Deprecated
   @Override
   public void deleteTimer(TimerData timerKey) {
     timerUpdateBuilder.deletedTimer(timerKey);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9cea529..5398d7b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -455,6 +455,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
           throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
         }
 
+        @Deprecated
         @Override
         public void setTimer(TimerData timerKey) {
           if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
@@ -468,11 +469,19 @@ public class WindowDoFnOperator<K, InputT, OutputT>
         }
 
         @Override
+        public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+          throw new UnsupportedOperationException(
+              "Canceling of a timer by ID is not yet supported.");
+        }
+
+        @Deprecated
+        @Override
         public void deleteTimer(StateNamespace namespace, String timerId) {
           throw new UnsupportedOperationException(
               "Canceling of a timer by ID is not yet supported.");
         }
 
+        @Deprecated
         @Override
         public void deleteTimer(TimerData timerKey) {
           if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index c3e498e..0bfcddc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StandardCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -61,18 +62,30 @@ public interface TimerInternals {
   void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
 
   /**
-   * Sets the timer described by {@code timerData}.
+   * @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}.
    */
+  @Deprecated
   void setTimer(TimerData timerData);
 
   /**
    * Deletes the given timer.
+   *
+   * <p>A timer's ID is enforced to be unique in validation of a {@link DoFn}, but runners
+   * often manage timers for different time domains in very different ways, thus the
+   * {@link TimeDomain} is a required parameter.
+   */
+  void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain);
+
+  /**
+   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
    */
+  @Deprecated
   void deleteTimer(StateNamespace namespace, String timerId);
 
   /**
-   * Deletes the timer with the ID contained in the provided {@link TimerData}.
+   * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
    */
+  @Deprecated
   void deleteTimer(TimerData timerKey);
 
   /**


Mime
View raw message