flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [09/11] flink git commit: [FLINK-4877] Rename Triggerable to ProcessingTimeCallback
Date Fri, 21 Oct 2016 17:14:24 GMT
[FLINK-4877] Rename Triggerable to ProcessingTimeCallback

This more accurately describes what the interface is for.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/94a3f251
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/94a3f251
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/94a3f251

Branch: refs/heads/master
Commit: 94a3f251cd3eed54c7d8220db119eecbfb11c3b9
Parents: 81b19e5
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Oct 18 11:08:58 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Oct 21 19:03:05 2016 +0200

----------------------------------------------------------------------
 .../connectors/fs/bucketing/BucketingSink.java  |  4 +-
 .../kafka/internals/AbstractFetcher.java        |  4 +-
 .../api/operators/HeapInternalTimerService.java |  4 +-
 .../api/operators/StreamSourceContexts.java     |  4 +-
 .../operators/ExtractTimestampsOperator.java    |  3 +-
 ...TimestampsAndPeriodicWatermarksOperator.java |  3 +-
 .../runtime/operators/Triggerable.java          | 40 --------------------
 ...ractAlignedProcessingTimeWindowOperator.java |  4 +-
 .../runtime/tasks/ProcessingTimeCallback.java   | 40 ++++++++++++++++++++
 .../runtime/tasks/ProcessingTimeService.java    | 12 +++---
 .../tasks/SystemProcessingTimeService.java      |  7 ++--
 .../tasks/TestProcessingTimeService.java        | 16 ++++----
 .../runtime/operators/StreamTaskTimerTest.java  | 19 +++++-----
 .../TestProcessingTimeServiceTest.java          |  5 ++-
 .../tasks/SystemProcessingTimeServiceTest.java  | 19 +++++-----
 .../runtime/StreamTaskTimerITCase.java          |  6 +--
 16 files changed, 94 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
index 6f8a739..66e704c 100644
--- a/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
+++ b/flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.connectors.fs.Writer;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -138,7 +138,7 @@ import java.util.Iterator;
  */
 public class BucketingSink<T>
 		extends RichSinkFunction<T>
-		implements InputTypeConfigurable, Checkpointed<BucketingSink.State<T>>, CheckpointListener,
Triggerable {
+		implements InputTypeConfigurable, Checkpointed<BucketingSink.State<T>>, CheckpointListener,
ProcessingTimeCallback {
 	private static final long serialVersionUID = 1L;
 
 	private static Logger LOG = LoggerFactory.getLogger(BucketingSink.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 321991a..58bca52 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
@@ -461,7 +461,7 @@ public abstract class AbstractFetcher<T, KPH> {
 	 * The periodic watermark emitter. In its given interval, it checks all partitions for
 	 * the current event time watermark, and possibly emits the next watermark.
 	 */
-	private static class PeriodicWatermarkEmitter implements Triggerable {
+	private static class PeriodicWatermarkEmitter implements ProcessingTimeCallback {
 
 		private final KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[] allPartitions;
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
index c77b634..15258cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.InstantiationUtil;
 
@@ -37,7 +37,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * {@link InternalTimerService} that stores timers on the Java heap.
  */
-public class HeapInternalTimerService<K, N> implements InternalTimerService<N>,
Triggerable {
+public class HeapInternalTimerService<K, N> implements InternalTimerService<N>,
ProcessingTimeCallback {
 
 	private final TypeSerializer<K> keySerializer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index 01ae55c..66d2ac2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.Preconditions;
@@ -199,7 +199,7 @@ public class StreamSourceContexts {
 			}
 		}
 
-		private class WatermarkEmittingTask implements Triggerable {
+		private class WatermarkEmittingTask implements ProcessingTimeCallback {
 
 			private final ProcessingTimeService timeService;
 			private final Object lock;

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
index 0798ed4..5f5028a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/ExtractTimestampsOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 
 /**
  * A {@link org.apache.flink.streaming.api.operators.StreamOperator} for extracting timestamps
@@ -36,7 +37,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 @Deprecated
 public class ExtractTimestampsOperator<T>
 		extends AbstractUdfStreamOperator<T, TimestampExtractor<T>>
-		implements OneInputStreamOperator<T, T>, Triggerable {
+		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
 
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
index b1402ed..ba72659 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 
 /**
  * A stream operator that extracts timestamps from stream elements and
@@ -32,7 +33,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
  */
 public class TimestampsAndPeriodicWatermarksOperator<T>
 		extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
-		implements OneInputStreamOperator<T, T>, Triggerable {
+		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
 
 	private static final long serialVersionUID = 1L;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
deleted file mode 100644
index 9ca3f33..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.operators;
-
-import org.apache.flink.annotation.Internal;
-
-/**
- * This interface must be implemented by objects that are triggered by the timer service
available
- * to stream operators in {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}.
- */
-@Internal
-public interface Triggerable {
-
-	/**
-	 * This method is invoked with the timestamp for which the trigger was scheduled.
-	 * <p>
-	 * If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled
due
-	 * to a garbage collection), the timestamp supplied to this function will still be the original
-	 * timestamp for which the trigger was scheduled.
-	 * 
-	 * @param timestamp The timestamp for which the trigger event was scheduled.
-	 */
-	void trigger(long timestamp) throws Exception ;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 2a77c0a..80a317e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import static java.util.Objects.requireNonNull;
@@ -41,7 +41,7 @@ import static java.util.Objects.requireNonNull;
 @Internal
 public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, STATE,
F extends Function> 
 		extends AbstractUdfStreamOperator<OUT, F> 
-		implements OneInputStreamOperator<IN, OUT>, Triggerable {
+		implements OneInputStreamOperator<IN, OUT>, ProcessingTimeCallback {
 	
 	private static final long serialVersionUID = 3245500864882459867L;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
new file mode 100644
index 0000000..aca1718
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Interface for processing-time callbacks that can be registered at a
+ * {@link ProcessingTimeService}.
+ */
+@Internal
+public interface ProcessingTimeCallback {
+
+	/**
+	 * This method is invoked with the timestamp for which the trigger was scheduled.
+	 * <p>
+	 * If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled
due
+	 * to a garbage collection), the timestamp supplied to this function will still be the original
+	 * timestamp for which the trigger was scheduled.
+	 * 
+	 * @param timestamp The timestamp for which the trigger event was scheduled.
+	 */
+	void trigger(long timestamp) throws Exception ;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
index 15c3ebb..f64bead 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeService.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -32,10 +30,10 @@ import java.util.concurrent.ScheduledFuture;
  * <ol>
  *     <li>In the initial state, it accepts timer registrations and triggers when the
time is reached.</li>
  *     <li>After calling {@link #quiesceAndAwaitPending()}, further calls to
- *         {@link #registerTimer(long, Triggerable)} will not register any further timers,
and will
+ *         {@link #registerTimer(long, ProcessingTimeCallback)} will not register any further
timers, and will
  *         return a "dummy" future as a result. This is used for clean shutdown, where currently
firing
  *         timers are waited for and no future timers can be scheduled, without causing hard
exceptions.</li>
- *     <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long,
Triggerable)}
+ *     <li>After a call to {@link #shutdownService()}, all calls to {@link #registerTimer(long,
ProcessingTimeCallback)}
  *         will result in a hard exception.</li>
  * </ol>
  */
@@ -55,7 +53,7 @@ public abstract class ProcessingTimeService {
 	 * @return The future that represents the scheduled task. This always returns some future,
 	 *         even if the timer was shut down
 	 */
-	public abstract ScheduledFuture<?> registerTimer(long timestamp, Triggerable target);
+	public abstract ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback
target);
 
 	/**
 	 * Returns <tt>true</tt> if the service has been shut down, <tt>false</tt>
otherwise.
@@ -64,7 +62,7 @@ public abstract class ProcessingTimeService {
 
 	/**
 	 * This method puts the service into a state where it does not register new timers, but
-	 * returns for each call to {@link #registerTimer(long, Triggerable)} only a "mock" future.
+	 * returns for each call to {@link #registerTimer(long, ProcessingTimeCallback)} only a
"mock" future.
 	 * Furthermore, the method clears all not yet started timers, and awaits the completion
 	 * of currently executing timers.
 	 * 
@@ -76,7 +74,7 @@ public abstract class ProcessingTimeService {
 
 	/**
 	 * Shuts down and clean up the timer service provider hard and immediately. This does not
wait
-	 * for any timer to complete. Any further call to {@link #registerTimer(long, Triggerable)}
+	 * for any timer to complete. Any further call to {@link #registerTimer(long, ProcessingTimeCallback)}
 	 * will result in a hard exception.
 	 */
 	public abstract void shutdownService();

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
index 3fd4202..b433f8d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import javax.annotation.Nonnull;
 import java.util.concurrent.BlockingQueue;
@@ -92,7 +91,7 @@ public class SystemProcessingTimeService extends ProcessingTimeService {
 	}
 
 	@Override
-	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target)
{
 		long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
 
 		// we directly try to register the timer and only react to the status on exception
@@ -165,11 +164,11 @@ public class SystemProcessingTimeService extends ProcessingTimeService
{
 	private static final class TriggerTask implements Runnable {
 
 		private final Object lock;
-		private final Triggerable target;
+		private final ProcessingTimeCallback target;
 		private final long timestamp;
 		private final AsyncExceptionHandler exceptionHandler;
 
-		TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, Triggerable target,
long timestamp) {
+		TriggerTask(AsyncExceptionHandler exceptionHandler, final Object lock, ProcessingTimeCallback
target, long timestamp) {
 			this.exceptionHandler = exceptionHandler;
 			this.lock = lock;
 			this.target = target;

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
index d0a2ea9..3e6c273 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestProcessingTimeService.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -69,7 +67,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			for (Map.Entry<Long, List<ScheduledTimerFuture>> tasks: toRun) {
 				long now = tasks.getKey();
 				for (ScheduledTimerFuture task: tasks.getValue()) {
-					task.getTriggerable().trigger(now);
+					task.getProcessingTimeCallback().trigger(now);
 				}
 			}
 		}
@@ -81,7 +79,7 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 	}
 
 	@Override
-	public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+	public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target)
{
 		if (isTerminated) {
 			throw new IllegalStateException("terminated");
 		}
@@ -149,12 +147,12 @@ public class TestProcessingTimeService extends ProcessingTimeService
{
 
 	private class ScheduledTimerFuture implements ScheduledFuture<Object> {
 
-		private final Triggerable triggerable;
+		private final ProcessingTimeCallback processingTimeCallback;
 
 		private final long timestamp;
 
-		public ScheduledTimerFuture(Triggerable triggerable, long timestamp) {
-			this.triggerable = triggerable;
+		public ScheduledTimerFuture(ProcessingTimeCallback processingTimeCallback, long timestamp)
{
+			this.processingTimeCallback = processingTimeCallback;
 			this.timestamp = timestamp;
 		}
 
@@ -197,8 +195,8 @@ public class TestProcessingTimeService extends ProcessingTimeService {
 			throw new UnsupportedOperationException();
 		}
 
-		public Triggerable getTriggerable() {
-			return triggerable;
+		public ProcessingTimeCallback getProcessingTimeCallback() {
+			return processingTimeCallback;
 		}
 
 		public long getTimestamp() {

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index fb1fab5..87241dd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -64,7 +65,7 @@ public class StreamTaskTimerTest {
 		testHarness.waitForTaskRunning();
 
 		// first one spawns thread
-		mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new Triggerable()
{
+		mapTask.getProcessingTimeService().registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback()
{
 			@Override
 			public void trigger(long timestamp) {
 			}
@@ -107,14 +108,14 @@ public class StreamTaskTimerTest {
 			final long t4 = System.currentTimeMillis() + 200;
 
 			ProcessingTimeService timeService = mapTask.getProcessingTimeService();
-			timeService.registerTimer(t1, new ValidatingTriggerable(errorRef, t1, 0));
-			timeService.registerTimer(t2, new ValidatingTriggerable(errorRef, t2, 1));
-			timeService.registerTimer(t3, new ValidatingTriggerable(errorRef, t3, 2));
-			timeService.registerTimer(t4, new ValidatingTriggerable(errorRef, t4, 3));
+			timeService.registerTimer(t1, new ValidatingProcessingTimeCallback(errorRef, t1, 0));
+			timeService.registerTimer(t2, new ValidatingProcessingTimeCallback(errorRef, t2, 1));
+			timeService.registerTimer(t3, new ValidatingProcessingTimeCallback(errorRef, t3, 2));
+			timeService.registerTimer(t4, new ValidatingProcessingTimeCallback(errorRef, t4, 3));
 
 			long deadline = System.currentTimeMillis() + 20000;
 			while (errorRef.get() == null &&
-					ValidatingTriggerable.numInSequence < 4 &&
+					ValidatingProcessingTimeCallback.numInSequence < 4 &&
 					System.currentTimeMillis() < deadline)
 			{
 				Thread.sleep(100);
@@ -126,7 +127,7 @@ public class StreamTaskTimerTest {
 				fail(errorRef.get().getMessage());
 			}
 
-			assertEquals(4, ValidatingTriggerable.numInSequence);
+			assertEquals(4, ValidatingProcessingTimeCallback.numInSequence);
 
 			testHarness.endInput();
 			testHarness.waitForTaskCompletion();
@@ -146,7 +147,7 @@ public class StreamTaskTimerTest {
 		}
 	}
 
-	private static class ValidatingTriggerable implements Triggerable {
+	private static class ValidatingProcessingTimeCallback implements ProcessingTimeCallback
{
 		
 		static int numInSequence;
 		
@@ -155,7 +156,7 @@ public class StreamTaskTimerTest {
 		private final long expectedTimestamp;
 		private final int expectedInSequence;
 
-		private ValidatingTriggerable(AtomicReference<Throwable> errorRef, long expectedTimestamp,
int expectedInSequence) {
+		private ValidatingProcessingTimeCallback(AtomicReference<Throwable> errorRef, long
expectedTimestamp, int expectedInSequence) {
 			this.errorRef = errorRef;
 			this.expectedTimestamp = expectedTimestamp;
 			this.expectedInSequence = expectedInSequence;

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
index 9c2cee3..db56717 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TestProcessingTimeServiceTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
 import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
 
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
@@ -70,14 +71,14 @@ public class TestProcessingTimeServiceTest {
 		assertEquals(testHarness.getProcessingTimeService().getCurrentProcessingTime(), 16);
 
 		// register 2 tasks
-		mapTask.getProcessingTimeService().registerTimer(30, new Triggerable() {
+		mapTask.getProcessingTimeService().registerTimer(30, new ProcessingTimeCallback() {
 			@Override
 			public void trigger(long timestamp) {
 
 			}
 		});
 
-		mapTask.getProcessingTimeService().registerTimer(40, new Triggerable() {
+		mapTask.getProcessingTimeService().registerTimer(40, new ProcessingTimeCallback() {
 			@Override
 			public void trigger(long timestamp) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
index e7944df..dc679ab 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import org.junit.Test;
 
@@ -51,7 +50,7 @@ public class SystemProcessingTimeServiceTest {
 			assertEquals(0, timer.getNumTasksScheduled());
 
 			// schedule something
-			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new
Triggerable() {
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis(), new
ProcessingTimeCallback() {
 				@Override
 				public void trigger(long timestamp) {
 					assertTrue(Thread.holdsLock(lock));
@@ -87,7 +86,7 @@ public class SystemProcessingTimeServiceTest {
 			final OneShotLatch latch = new OneShotLatch();
 
 			// the task should trigger immediately and should block until terminated with interruption
-			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+			timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 				@Override
 				public void trigger(long timestamp) throws Exception {
 					latch.trigger();
@@ -105,7 +104,7 @@ public class SystemProcessingTimeServiceTest {
 			}
 
 			try {
-				timer.registerTimer(System.currentTimeMillis() + 1000, new Triggerable() {
+				timer.registerTimer(System.currentTimeMillis() + 1000, new ProcessingTimeCallback() {
 					@Override
 					public void trigger(long timestamp) {}
 				});
@@ -141,7 +140,7 @@ public class SystemProcessingTimeServiceTest {
 
 			final ReentrantLock scopeLock = new ReentrantLock();
 
-			timer.registerTimer(System.currentTimeMillis() + 20, new Triggerable() {
+			timer.registerTimer(System.currentTimeMillis() + 20, new ProcessingTimeCallback() {
 				@Override
 				public void trigger(long timestamp) throws Exception {
 					scopeLock.lock();
@@ -163,7 +162,7 @@ public class SystemProcessingTimeServiceTest {
 			assertTrue(scopeLock.tryLock());
 
 			// should be able to schedule more tasks (that never get executed)
-			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5,
new Triggerable() {
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() - 5,
new ProcessingTimeCallback() {
 				@Override
 				public void trigger(long timestamp) throws Exception {
 					throw new Exception("test");
@@ -198,7 +197,7 @@ public class SystemProcessingTimeServiceTest {
 			assertEquals(0, timer.getNumTasksScheduled());
 
 			// schedule something
-			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000,
new Triggerable() {
+			ScheduledFuture<?> future = timer.registerTimer(System.currentTimeMillis() + 100000000,
new ProcessingTimeCallback() {
 				@Override
 				public void trigger(long timestamp) {}
 			});
@@ -233,7 +232,7 @@ public class SystemProcessingTimeServiceTest {
 					}
 				}, lock);
 		
-		timeServiceProvider.registerTimer(System.currentTimeMillis(), new Triggerable() {
+		timeServiceProvider.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback()
{
 			@Override
 			public void trigger(long timestamp) throws Exception {
 				throw new Exception("Exception in Timer");
@@ -257,7 +256,7 @@ public class SystemProcessingTimeServiceTest {
 
 			// we block the timer execution to make sure we have all the time
 			// to register some additional timers out of order
-			timer.registerTimer(System.currentTimeMillis(), new Triggerable() {
+			timer.registerTimer(System.currentTimeMillis(), new ProcessingTimeCallback() {
 				@Override
 				public void trigger(long timestamp) throws Exception {
 					sync.await();
@@ -272,7 +271,7 @@ public class SystemProcessingTimeServiceTest {
 			final long time4 = now - 2;
 
 			final ArrayBlockingQueue<Long> timestamps = new ArrayBlockingQueue<>(4);
-			Triggerable trigger = new Triggerable() {
+			ProcessingTimeCallback trigger = new ProcessingTimeCallback() {
 				@Override
 				public void trigger(long timestamp) {
 					timestamps.add(timestamp);

http://git-wip-us.apache.org/repos/asf/flink/blob/94a3f251/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index e7f62fd..c0cd0be 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.TimerException;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -171,7 +171,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase
{
 		Assert.assertTrue(testSuccess);
 	}
 
-	public static class TimerOperator extends AbstractStreamOperator<String> implements
OneInputStreamOperator<String, String>, Triggerable {
+	public static class TimerOperator extends AbstractStreamOperator<String> implements
OneInputStreamOperator<String, String>, ProcessingTimeCallback {
 		private static final long serialVersionUID = 1L;
 
 		int numTimers = 0;
@@ -230,7 +230,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase
{
 		}
 	}
 
-	public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements
TwoInputStreamOperator<String, String, String>, Triggerable {
+	public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements
TwoInputStreamOperator<String, String, String>, ProcessingTimeCallback {
 		private static final long serialVersionUID = 1L;
 
 		int numTimers = 0;


Mime
View raw message