[FLINK-4496] Refactor the TimeServiceProvider to take a Triggerable instead of a Runnable.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4779c7ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4779c7ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4779c7ec
Branch: refs/heads/master
Commit: 4779c7eca0f7e91dd5ee38122baa3fe99c8b7bea
Parents: 568845a
Author: kl0u <kkloudas@gmail.com>
Authored: Thu Aug 25 17:38:49 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Sep 23 15:01:06 2016 +0200
----------------------------------------------------------------------
.../kafka/testutils/MockRuntimeContext.java | 41 +---
.../runtime/tasks/AsyncExceptionHandler.java | 31 +++
.../tasks/DefaultTimeServiceProvider.java | 76 ++++++-
.../streaming/runtime/tasks/StreamTask.java | 81 +++----
.../runtime/tasks/TestTimeServiceProvider.java | 44 ++--
.../runtime/tasks/TimeServiceProvider.java | 6 +-
.../operators/StreamSourceOperatorTest.java | 14 +-
.../runtime/operators/StreamTaskTimerTest.java | 53 -----
.../runtime/operators/TimeProviderTest.java | 214 +++++++++++++++++++
...AlignedProcessingTimeWindowOperatorTest.java | 12 +-
...AlignedProcessingTimeWindowOperatorTest.java | 2 +
.../runtime/tasks/StreamTaskTestHarness.java | 2 +-
.../util/OneInputStreamOperatorTestHarness.java | 85 ++++----
.../streaming/util/WindowingTestHarness.java | 2 -
14 files changed, 436 insertions(+), 227 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index 2d5e2d8..7a50569 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -58,38 +58,28 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
private final int indexOfThisSubtask;
private final ExecutionConfig execConfig;
- private final Object checkpointLock;
private final TimeServiceProvider timerService;
public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
- this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
+ this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), new Object());
}
public MockRuntimeContext(
- int numberOfParallelSubtasks, int indexOfThisSubtask,
+ int numberOfParallelSubtasks,
+ int indexOfThisSubtask,
ExecutionConfig execConfig,
Object checkpointLock) {
- this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, checkpointLock,
- DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
- }
-
- public MockRuntimeContext(
- int numberOfParallelSubtasks, int indexOfThisSubtask,
- ExecutionConfig execConfig,
- Object checkpointLock,
- TimeServiceProvider timerService) {
-
super(new MockStreamOperator(),
- new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
- Collections.<String, Accumulator<?, ?>>emptyMap());
-
+ new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
+ Collections.<String, Accumulator<?, ?>>emptyMap());
+
this.numberOfParallelSubtasks = numberOfParallelSubtasks;
this.indexOfThisSubtask = indexOfThisSubtask;
this.execConfig = execConfig;
- this.checkpointLock = checkpointLock;
- this.timerService = timerService;
+ this.timerService = DefaultTimeServiceProvider.
+ createForTesting(Executors.newSingleThreadScheduledExecutor(), checkpointLock);
}
@Override
@@ -216,20 +206,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
@Override
public ScheduledFuture<?> registerTimer(final long time, final Triggerable target)
{
Preconditions.checkNotNull(timerService, "The processing time timer has not been initialized.");
-
- return timerService.registerTimer(time, new Runnable() {
- @Override
- public void run() {
- synchronized (checkpointLock) {
- try {
- target.trigger(time);
- } catch (Throwable t) {
- System.err.println("!!! Caught exception while processing timer. !!!");
- t.printStackTrace();
- }
- }
- }
- });
+ return timerService.registerTimer(time, target);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
new file mode 100644
index 0000000..85a4115
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncExceptionHandler.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+/**
+ * An interface marking a task as capable of registering exceptions thrown by
+ * threads other than the one executing the task itself.
+ */
+public interface AsyncExceptionHandler {
+
+ /**
+ * Registers with the main thread an exception that was thrown by a thread other
+ * than the one executing the main task (e.g. by a TriggerTask).
+ */
+ void registerAsyncException(String message, AsynchronousException exception);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
index b803b82..c7339b3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/DefaultTimeServiceProvider.java
@@ -17,6 +17,9 @@
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -28,15 +31,26 @@ import java.util.concurrent.TimeUnit;
*/
public class DefaultTimeServiceProvider extends TimeServiceProvider {
+ /** The containing task that owns this time service provider. */
+ private final AsyncExceptionHandler task;
+
+ private final Object checkpointLock;
+
/** The executor service that schedules and calls the triggers of this task*/
private final ScheduledExecutorService timerService;
- public static DefaultTimeServiceProvider create (ScheduledExecutorService executor) {
- return new DefaultTimeServiceProvider(executor);
+ public static DefaultTimeServiceProvider create(AsyncExceptionHandler task,
+ ScheduledExecutorService executor,
+ Object checkpointLock) {
+ return new DefaultTimeServiceProvider(task, executor, checkpointLock);
}
- private DefaultTimeServiceProvider(ScheduledExecutorService threadPoolExecutor) {
+ private DefaultTimeServiceProvider(AsyncExceptionHandler task,
+ ScheduledExecutorService threadPoolExecutor,
+ Object checkpointLock) {
+ this.task = task;
this.timerService = threadPoolExecutor;
+ this.checkpointLock = checkpointLock;
}
@Override
@@ -45,16 +59,62 @@ public class DefaultTimeServiceProvider extends TimeServiceProvider {
}
@Override
- public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
+ public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
long delay = Math.max(timestamp - getCurrentProcessingTime(), 0);
- return timerService.schedule(target, delay, TimeUnit.MILLISECONDS);
+ return timerService.schedule(new TriggerTask(task, checkpointLock, target, timestamp),
delay, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return timerService.isTerminated();
}
@Override
public void shutdownService() throws Exception {
- if (!timerService.isTerminated()) {
- StreamTask.LOG.info("Timer service is shutting down.");
- }
timerService.shutdownNow();
}
+
+ /**
+ * Internal task that is invoked by the timer service and triggers the target.
+ */
+ private static final class TriggerTask implements Runnable {
+
+ private final Object lock;
+ private final Triggerable target;
+ private final long timestamp;
+ private final AsyncExceptionHandler task;
+
+ TriggerTask(AsyncExceptionHandler task, final Object lock, Triggerable target, long timestamp)
{
+ this.task = task;
+ this.lock = lock;
+ this.target = target;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public void run() {
+ synchronized (lock) {
+ try {
+ target.trigger(timestamp);
+ } catch (Throwable t) {
+
+ if (task != null) {
+ // registers the exception with the calling task
+ // so that it can be logged and (later) detected
+ TimerException asyncException = new TimerException(t);
+ task.registerAsyncException("Caught exception while processing timer.", asyncException);
+ } else {
+ // this is for when we are in testing mode and we
+ // want to have real processing time.
+ t.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public static DefaultTimeServiceProvider createForTesting(ScheduledExecutorService executor,
Object checkpointLock) {
+ return new DefaultTimeServiceProvider(null, executor, checkpointLock);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 49bbee7..80d51a6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -110,13 +110,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
@Internal
public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
extends AbstractInvokable
- implements StatefulTask {
+ implements StatefulTask, AsyncExceptionHandler {
/** The thread group that holds all trigger timer threads */
public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
/** The logger used by the StreamTask and its subclasses */
- protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
+ private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
// ------------------------------------------------------------------------
@@ -207,7 +207,13 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
timerService = timeProvider;
}
+ /**
+ * Returns the current processing time.
+ */
public long getCurrentProcessingTime() {
+ if (timerService == null) {
+ throw new IllegalStateException("The timer service has not been initialized.");
+ }
return timerService.getCurrentProcessingTime();
}
@@ -237,7 +243,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// that timestamp are removed by user
executor.setRemoveOnCancelPolicy(true);
- timerService = DefaultTimeServiceProvider.create(executor);
+ timerService = DefaultTimeServiceProvider.create(this, executor, getCheckpointLock());
}
headOperator = configuration.getStreamOperator(getUserCodeClassLoader());
@@ -319,7 +325,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
// stop all timers and threads
if (timerService != null) {
try {
- timerService.shutdownService();
+ if (!timerService.isTerminated()) {
+ LOG.info("Timer service is shutting down.");
+ timerService.shutdownService();
+ }
}
catch (Throwable t) {
// catch and log the exception to not replace the original exception
@@ -475,7 +484,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
protected void finalize() throws Throwable {
super.finalize();
if (timerService != null) {
- timerService.shutdownService();
+ if (!timerService.isTerminated()) {
+ LOG.info("Timer service is shutting down.");
+ timerService.shutdownService();
+ }
}
closeAllClosables();
@@ -819,7 +831,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
if (timerService == null) {
throw new IllegalStateException("The timer service has not been initialized.");
}
- return timerService.registerTimer(timestamp, new TriggerTask(this, lock, target, timestamp));
+ return timerService.registerTimer(timestamp, target);
}
/**
@@ -836,6 +848,17 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
+ @Override
+ public void registerAsyncException(String message, AsynchronousException exception) {
+ if (isRunning) {
+ LOG.error(message, exception);
+ }
+
+ if (this.asyncException == null) {
+ this.asyncException = exception;
+ }
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@@ -863,42 +886,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
// ------------------------------------------------------------------------
-
- /**
- * Internal task that is invoked by the timer service and triggers the target.
- */
- private static final class TriggerTask implements Runnable {
-
- private final Object lock;
- private final Triggerable target;
- private final long timestamp;
- private final StreamTask<?, ?> task;
-
- TriggerTask(StreamTask<?, ?> task, final Object lock, Triggerable target, long timestamp)
{
- this.task = task;
- this.lock = lock;
- this.target = target;
- this.timestamp = timestamp;
- }
-
- @Override
- public void run() {
- synchronized (lock) {
- try {
- target.trigger(timestamp);
- } catch (Throwable t) {
- if (task.isRunning) {
- LOG.error("Caught exception while processing timer.", t);
- }
- if (task.asyncException == null) {
- task.asyncException = new TimerException(t);
- }
- }
- }
- }
- }
-
- // ------------------------------------------------------------------------
private static class AsyncCheckpointRunnable implements Runnable, Closeable {
@@ -961,12 +948,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
}
}
catch (Exception e) {
- if (owner.isRunning()) {
- LOG.error("Caught exception while materializing asynchronous checkpoints.", e);
- }
- if (owner.asyncException == null) {
- owner.asyncException = new AsynchronousException(e);
- }
+
+ // registers the exception and tries to fail the whole task
+ AsynchronousException asyncException = new AsynchronousException(e);
+ owner.registerAsyncException("Caught exception while materializing asynchronous checkpoints.",
asyncException);
}
finally {
synchronized (cancelables) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
index 2314deb..a21a2e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TestTimeServiceProvider.java
@@ -17,11 +17,13 @@
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
+
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
/**
@@ -32,30 +34,34 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
private long currentTime = 0;
- private Map<Long, List<Runnable>> registeredTasks = new HashMap<>();
+ private boolean isTerminated = false;
+
+ // sorts the timers by timestamp so that they are processed in the correct order.
+ private Map<Long, List<Triggerable>> registeredTasks = new TreeMap<>();
- public void setCurrentTime(long timestamp) {
+ public void setCurrentTime(long timestamp) throws Exception {
this.currentTime = timestamp;
// decide which timers to fire and put them in a list
// we do not fire them here to be able to accommodate timers
- // that register other timers. The latter would through an exception.
+ // that register other timers.
- Iterator<Map.Entry<Long, List<Runnable>>> it = registeredTasks.entrySet().iterator();
- List<Runnable> toRun = new ArrayList<>();
+ Iterator<Map.Entry<Long, List<Triggerable>>> it = registeredTasks.entrySet().iterator();
+ List<Map.Entry<Long, List<Triggerable>>> toRun = new ArrayList<>();
while (it.hasNext()) {
- Map.Entry<Long, List<Runnable>> t = it.next();
+ Map.Entry<Long, List<Triggerable>> t = it.next();
if (t.getKey() <= this.currentTime) {
- for (Runnable r: t.getValue()) {
- toRun.add(r);
- }
+ toRun.add(t);
it.remove();
}
}
// now do the actual firing.
- for (Runnable r: toRun) {
- r.run();
+ for (Map.Entry<Long, List<Triggerable>> tasks: toRun) {
+ long now = tasks.getKey();
+ for (Triggerable task: tasks.getValue()) {
+ task.trigger(now);
+ }
}
}
@@ -65,8 +71,8 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
}
@Override
- public ScheduledFuture<?> registerTimer(long timestamp, Runnable target) {
- List<Runnable> tasks = registeredTasks.get(timestamp);
+ public ScheduledFuture<?> registerTimer(long timestamp, Triggerable target) {
+ List<Triggerable> tasks = registeredTasks.get(timestamp);
if (tasks == null) {
tasks = new ArrayList<>();
registeredTasks.put(timestamp, tasks);
@@ -75,9 +81,14 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
return null;
}
+ @Override
+ public boolean isTerminated() {
+ return isTerminated;
+ }
+
public int getNoOfRegisteredTimers() {
int count = 0;
- for (List<Runnable> tasks: registeredTasks.values()) {
+ for (List<Triggerable> tasks: registeredTasks.values()) {
count += tasks.size();
}
return count;
@@ -85,7 +96,6 @@ public class TestTimeServiceProvider extends TimeServiceProvider {
@Override
public void shutdownService() throws Exception {
- this.registeredTasks.clear();
- this.registeredTasks = null;
+ this.isTerminated = true;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
index f3e4f78..42a4fa4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TimeServiceProvider.java
@@ -16,6 +16,7 @@
*/
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
import java.util.concurrent.ScheduledFuture;
/**
@@ -34,7 +35,10 @@ public abstract class TimeServiceProvider {
* the task to be executed
* @return the result to be returned.
*/
- public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Runnable
target);
+ public abstract ScheduledFuture<?> registerTimer(final long timestamp, final Triggerable
target);
+
+ /** Returns <tt>true</tt> if the service has been shut down, <tt>false</tt>
otherwise. */
+ public abstract boolean isTerminated();
/** Shuts down and clean up the timer service provider. */
public abstract void shutdownService() throws Exception;
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 9c06b49..d61fec9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -184,6 +184,8 @@ public class StreamSourceOperatorTest {
long watermarkInterval = 10;
TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
+ timeProvider.setCurrentTime(0);
+
setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider);
final List<StreamElement> output = new ArrayList<>();
@@ -249,17 +251,7 @@ public class StreamSourceOperatorTest {
throw new RuntimeException("The time provider is null");
}
- timeProvider.registerTimer(execTime, new Runnable() {
-
- @Override
- public void run() {
- try {
- target.trigger(execTime);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- });
+ timeProvider.registerTimer(execTime, target);
return null;
}
}).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index c9f204d..b9435f5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -27,7 +27,6 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -49,58 +48,6 @@ import static org.junit.Assert.*;
public class StreamTaskTimerTest {
@Test
- public void testCustomTimeServiceProvider() throws Throwable {
- TestTimeServiceProvider tp = new TestTimeServiceProvider();
-
- final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
- mapTask.setTimeService(tp);
-
- final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
- mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
-
- StreamMap<String, String> mapOperator = new StreamMap<>(new DummyMapFunction<String>());
- streamConfig.setStreamOperator(mapOperator);
-
- testHarness.invoke();
-
- assertTrue(testHarness.getCurrentProcessingTime() == 0);
-
- tp.setCurrentTime(11);
- assertTrue(testHarness.getCurrentProcessingTime() == 11);
-
- tp.setCurrentTime(15);
- tp.setCurrentTime(16);
- assertTrue(testHarness.getCurrentProcessingTime() == 16);
-
- // register 2 tasks
- mapTask.registerTimer(30, new Triggerable() {
- @Override
- public void trigger(long timestamp) {
-
- }
- });
-
- mapTask.registerTimer(40, new Triggerable() {
- @Override
- public void trigger(long timestamp) {
-
- }
- });
-
- assertEquals(2, tp.getNoOfRegisteredTimers());
-
- tp.setCurrentTime(35);
- assertEquals(1, tp.getNoOfRegisteredTimers());
-
- tp.setCurrentTime(40);
- assertEquals(0, tp.getNoOfRegisteredTimers());
-
- tp.shutdownService();
- }
-
- @Test
public void testOpenCloseAndTimestamps() throws Exception {
final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
new file mode 100644
index 0000000..4d4f07b
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimeProviderTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.hadoop.shaded.org.apache.http.impl.conn.SystemDefaultDnsResolver;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
+import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ResultPartitionWriter.class)
+@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*"})
+public class TimeProviderTest {
+
+ @Test
+ public void testDefaultTimeProvider() throws InterruptedException {
+ final OneShotLatch latch = new OneShotLatch();
+
+ final Object lock = new Object();
+ TimeServiceProvider timeServiceProvider = DefaultTimeServiceProvider
+ .createForTesting(Executors.newSingleThreadScheduledExecutor(), lock);
+
+ final List<Long> timestamps = new ArrayList<>();
+
+ long start = System.currentTimeMillis();
+ long interval = 50L;
+
+ final long noOfTimers = 20;
+
+ // we add 2 timers per iteration, except for the first iteration (i == 0), which registers only one
+ final long expectedNoOfTimers = 2 * noOfTimers - 1;
+
+ for (int i = 0; i < noOfTimers; i++) {
+ double nextTimer = start + i * interval;
+
+ timeServiceProvider.registerTimer((long) nextTimer, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ timestamps.add(timestamp);
+ if (timestamps.size() == expectedNoOfTimers) {
+ latch.trigger();
+ }
+ }
+ });
+
+ // add also out-of-order tasks to verify that eventually
+ // they will be executed in the correct order.
+
+ if (i > 0) {
+ timeServiceProvider.registerTimer((long) (nextTimer - 10), new Triggerable() {
+ @Override
+ public void trigger(long timestamp) throws Exception {
+ timestamps.add(timestamp);
+ if (timestamps.size() == expectedNoOfTimers) {
+ latch.trigger();
+ }
+ }
+ });
+ }
+ }
+
+ if (!latch.isTriggered()) {
+ latch.await();
+ }
+
+ Assert.assertEquals(timestamps.size(), expectedNoOfTimers);
+
+ // verify that the tasks are executed
+ // in ascending timestamp order
+
+ int counter = 0;
+ long lastTs = Long.MIN_VALUE;
+ for (long timestamp: timestamps) {
+ Assert.assertTrue(timestamp >= lastTs);
+ lastTs = timestamp;
+
+ long expectedTs = start + (counter/2) * interval;
+ Assert.assertEquals(timestamp, (expectedTs + ((counter % 2 == 0) ? 0 : 40)));
+ counter++;
+ }
+ }
+
+ @Test
+ public void testTimerSorting() throws Exception {
+
+ final List<Long> result = new ArrayList<>();
+
+ TestTimeServiceProvider provider = new TestTimeServiceProvider();
+
+ provider.registerTimer(45, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ result.add(timestamp);
+ }
+ });
+
+ provider.registerTimer(50, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ result.add(timestamp);
+ }
+ });
+
+ provider.registerTimer(30, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ result.add(timestamp);
+ }
+ });
+
+ provider.registerTimer(50, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+ result.add(timestamp);
+ }
+ });
+
+ Assert.assertTrue(provider.getNoOfRegisteredTimers() == 4);
+
+ provider.setCurrentTime(100);
+ long seen = 0;
+ for (Long l: result) {
+ Assert.assertTrue(l >= seen);
+ seen = l;
+ }
+ }
+
+ @Test
+ public void testCustomTimeServiceProvider() throws Throwable {
+ TestTimeServiceProvider tp = new TestTimeServiceProvider();
+
+ final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<>();
+ mapTask.setTimeService(tp);
+
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<>(
+ mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+
+ StreamMap<String, String> mapOperator = new StreamMap<>(new StreamTaskTimerTest.DummyMapFunction<String>());
+ streamConfig.setStreamOperator(mapOperator);
+
+ testHarness.invoke();
+
+ assertTrue(testHarness.getCurrentProcessingTime() == 0);
+
+ tp.setCurrentTime(11);
+ assertTrue(testHarness.getCurrentProcessingTime() == 11);
+
+ tp.setCurrentTime(15);
+ tp.setCurrentTime(16);
+ assertTrue(testHarness.getCurrentProcessingTime() == 16);
+
+ // register 2 tasks
+ mapTask.registerTimer(30, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+
+ }
+ });
+
+ mapTask.registerTimer(40, new Triggerable() {
+ @Override
+ public void trigger(long timestamp) {
+
+ }
+ });
+
+ assertEquals(2, tp.getNoOfRegisteredTimers());
+
+ tp.setCurrentTime(35);
+ assertEquals(1, tp.getNoOfRegisteredTimers());
+
+ tp.setCurrentTime(40);
+ assertEquals(0, tp.getNoOfRegisteredTimers());
+
+ tp.shutdownService();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 40a6c79..9849bd7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -493,7 +493,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
for (int i = 0; i < 300; i++) {
testHarness.processElement(new StreamRecord<>(i + numElementsFirst));
}
-
+
op.dispose();
// re-create the operator and restore the state
@@ -502,9 +502,8 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSize);
- testHarness =
- new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(), timerService);
-
+ timerService = new TestTimeServiceProvider();
+ testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(),
timerService);
testHarness.setup();
testHarness.restore(state);
@@ -580,15 +579,16 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
for (int i = numElementsFirst; i < numElements; i++) {
testHarness.processElement(new StreamRecord<>(i));
}
-
+
op.dispose();
-
+
// re-create the operator and restore the state
op = new AccumulatingProcessingTimeWindowOperator<>(
validatingIdentityFunction, identitySelector,
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
windowSize, windowSlide);
+ timerService = new TestTimeServiceProvider();
testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(),
timerService);
testHarness.setup();
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 59bfe6f..3dfa395 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -601,6 +601,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSize);
+ timerService = new TestTimeServiceProvider();
testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(),
timerService);
testHarness.setup();
@@ -692,6 +693,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
IntSerializer.INSTANCE, tupleSerializer,
windowSize, windowSlide);
+ timerService = new TestTimeServiceProvider();
testHarness = new OneInputStreamOperatorTestHarness<>(op, new ExecutionConfig(),
timerService);
testHarness.setup();
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index cb10c5c..ce634f0 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -115,7 +115,7 @@ public class StreamTaskTestHarness<OUT> {
public long getCurrentProcessingTime() {
if (!(task instanceof StreamTask)) {
- System.currentTimeMillis();
+ throw new UnsupportedOperationException("getCurrentProcessingTime() only supported on StreamTasks.");
}
return ((StreamTask) task).getCurrentProcessingTime();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 15074a7..6c637bf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -37,8 +37,10 @@ import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -87,7 +89,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
*/
private boolean setupCalled = false;
-
public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator)
{
this(operator, new ExecutionConfig());
}
@@ -95,27 +96,35 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig) {
- this(operator, executionConfig, DefaultTimeServiceProvider.create(Executors.newSingleThreadScheduledExecutor()));
+ this(operator, executionConfig, null);
+ }
+
+ public OneInputStreamOperatorTestHarness(
+ OneInputStreamOperator<IN, OUT> operator,
+ ExecutionConfig executionConfig,
+ TimeServiceProvider testTimeProvider) {
+ this(operator, executionConfig, new Object(), testTimeProvider);
}
public OneInputStreamOperatorTestHarness(
OneInputStreamOperator<IN, OUT> operator,
ExecutionConfig executionConfig,
+ Object checkpointLock,
TimeServiceProvider testTimeProvider) {
+
this.operator = operator;
this.outputList = new ConcurrentLinkedQueue<Object>();
Configuration underlyingConfig = new Configuration();
this.config = new StreamConfig(underlyingConfig);
this.config.setCheckpointingEnabled(true);
this.executionConfig = executionConfig;
- this.checkpointLock = new Object();
+ this.checkpointLock = checkpointLock;
final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(),
1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
mockTask = mock(StreamTask.class);
- timeServiceProvider = testTimeProvider;
when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
+ when(mockTask.getCheckpointLock()).thenReturn(this.checkpointLock);
when(mockTask.getConfiguration()).thenReturn(config);
when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
when(mockTask.getEnvironment()).thenReturn(env);
@@ -125,21 +134,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
- final long execTime = (Long) invocation.getArguments()[0];
- final Triggerable target = (Triggerable) invocation.getArguments()[1];
-
- timeServiceProvider.registerTimer(
- execTime, new TriggerTask(checkpointLock, target, execTime));
+ // do nothing
return null;
}
- }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
-
- doAnswer(new Answer<Long>() {
- @Override
- public Long answer(InvocationOnMock invocation) throws Throwable {
- return timeServiceProvider.getCurrentProcessingTime();
- }
- }).when(mockTask).getCurrentProcessingTime();
+ }).when(mockTask).registerAsyncException(any(String.class), any(AsynchronousException.class));
try {
doAnswer(new Answer<CheckpointStreamFactory>() {
@@ -154,6 +152,26 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
throw new RuntimeException(e.getMessage(), e);
}
+ timeServiceProvider = testTimeProvider != null ? testTimeProvider :
+ DefaultTimeServiceProvider.create(mockTask, Executors.newSingleThreadScheduledExecutor(), this.checkpointLock);
+
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ final long execTime = (Long) invocation.getArguments()[0];
+ final Triggerable target = (Triggerable) invocation.getArguments()[1];
+
+ timeServiceProvider.registerTimer(execTime, target);
+ return null;
+ }
+ }).when(mockTask).registerTimer(anyLong(), any(Triggerable.class));
+
+ doAnswer(new Answer<Long>() {
+ @Override
+ public Long answer(InvocationOnMock invocation) throws Throwable {
+ return timeServiceProvider.getCurrentProcessingTime();
+ }
+ }).when(mockTask).getCurrentProcessingTime();
}
public void setStateBackend(AbstractStateBackend stateBackend) {
@@ -216,7 +234,6 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
operator.notifyOfCompletedCheckpoint(checkpointId);
}
-
/**
* Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(org.apache.flink.core.fs.FSDataInputStream)} ()}
*/
@@ -275,32 +292,4 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
// ignore
}
}
-
- private static final class TriggerTask implements Runnable {
-
- private final Object lock;
- private final Triggerable target;
- private final long timestamp;
-
- TriggerTask(final Object lock, Triggerable target, long timestamp) {
- this.lock = lock;
- this.target = target;
- this.timestamp = timestamp;
- }
-
- @Override
- public void run() {
- synchronized (lock) {
- try {
- target.trigger(timestamp);
- } catch (Throwable t) {
- try {
- throw t;
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4779c7ec/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
index af1f3fa..d47136c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/WindowingTestHarness.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
|