flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [05/17] flink git commit: [FLINK-4329] [streaming api] Fix Streaming File Source Timestamps/Watermarks Handling
Date Wed, 05 Oct 2016 22:16:55 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
index 247edd6..5275a39 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/KeyedOneInputStreamOperatorTestHarness.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -92,7 +92,7 @@ public class KeyedOneInputStreamOperatorTestHarness<K, IN, OUT>
 
 	public KeyedOneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig,
-			TimeServiceProvider testTimeProvider,
+			TestTimeServiceProvider testTimeProvider,
 			KeySelector<IN, K> keySelector,
 			TypeInformation<K> keyType) {
 		super(operator, executionConfig, testTimeProvider);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ff451be/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index d6f46fd..d8a0ee2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -31,15 +31,16 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.operators.StreamCheckpointedOperator;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -87,6 +88,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	 */
 	private boolean setupCalled = false;
 
+	private volatile boolean wasFailedExternally = false;
+
 	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
 		this(operator, new ExecutionConfig());
 	}
@@ -100,7 +103,7 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	public OneInputStreamOperatorTestHarness(
 			OneInputStreamOperator<IN, OUT> operator,
 			ExecutionConfig executionConfig,
-			TimeServiceProvider testTimeProvider) {
+			TestTimeServiceProvider testTimeProvider) {
 		this(operator, executionConfig, new Object(), testTimeProvider);
 	}
 
@@ -132,10 +135,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {
-				// do nothing
+				wasFailedExternally = true;
 				return null;
 			}
-		}).when(mockTask).registerAsyncException(any(AsynchronousException.class));
+		}).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class));
 
 		try {
 			doAnswer(new Answer<CheckpointStreamFactory>() {
@@ -161,6 +164,18 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		}).when(mockTask).getTimerService();
 	}
 
+	public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
+		this.config.setTimeCharacteristic(timeCharacteristic);
+	}
+
+	public TimeCharacteristic getTimeCharacteristic() {
+		return this.config.getTimeCharacteristic();
+	}
+
+	public boolean wasFailedExternally() {
+		return wasFailedExternally;
+	}
+
 	public void setStateBackend(AbstractStateBackend stateBackend) {
 		this.stateBackend = stateBackend;
 	}


Mime
View raw message