flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-2936] Fix ClassCastException for Event-Time source
Date Fri, 11 Dec 2015 15:26:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master c4a2d60c3 -> 6bd5714d2


[FLINK-2936] Fix ClassCastException for Event-Time source

Before this change, the event-time source would throw a ClassCastException
when emitting watermarks with timestamp/watermark multiplexing disabled.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b648870
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b648870
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b648870

Branch: refs/heads/master
Commit: 4b648870b4673c5a9c4d80f185e7de679967098e
Parents: c4a2d60
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Dec 9 16:00:12 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Dec 11 10:45:17 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/operators/StreamSource.java   | 14 ++++--
 .../streaming/timestamp/TimestampITCase.java    | 46 ++++++++++++++++----
 .../streaming/util/SourceFunctionUtil.java      |  2 +-
 3 files changed, 48 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index e80654a..91c846f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -46,7 +46,7 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 		final ExecutionConfig executionConfig = getExecutionConfig();
 		
 		if (userFunction instanceof EventTimeSourceFunction) {
-			ctx = new ManualWatermarkContext<T>(lockingObject, collector);
+			ctx = new ManualWatermarkContext<T>(lockingObject, collector, getRuntimeContext().getExecutionConfig().areTimestampsEnabled());
 		} else if (executionConfig.getAutoWatermarkInterval() > 0) {
 			ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig);
 		} else if (executionConfig.areTimestampsEnabled()) {
@@ -261,11 +261,13 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 		private final Object lockingObject;
 		private final Output<StreamRecord<T>> output;
 		private final StreamRecord<T> reuse;
+		private final boolean watermarkMultiplexingEnabled;
 
-		public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output) {
+		public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output, boolean watermarkMultiplexingEnabled) {
 			this.lockingObject = lockingObject;
 			this.output = output;
 			this.reuse = new StreamRecord<T>(null);
+			this.watermarkMultiplexingEnabled = watermarkMultiplexingEnabled;
 		}
 
 		@Override
@@ -283,7 +285,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 
 		@Override
 		public void emitWatermark(Watermark mark) {
-			output.emitWatermark(mark);
+			if (watermarkMultiplexingEnabled) {
+				output.emitWatermark(mark);
+			}
 		}
 
 		@Override
@@ -296,7 +300,9 @@ public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction
 			// emit one last +Inf watermark to make downstream watermark processing work
 			// when some sources close early
 			synchronized (lockingObject) {
-				output.emitWatermark(new Watermark(Long.MAX_VALUE));
+				if (watermarkMultiplexingEnabled) {
+					output.emitWatermark(new Watermark(Long.MAX_VALUE));
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
index 5113b45..8e7ada4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.taskmanager.MultiShotLatch;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
@@ -133,7 +134,7 @@ public class TimestampITCase {
 		source1.union(source2)
 				.map(new IdentityMap())
 				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.addSink(new NoOpSink<Integer>());
 
 		env.execute();
@@ -293,7 +294,7 @@ public class TimestampITCase {
 				});
 
 		extractOp
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.transform("Timestamp Check",
 						BasicTypeInfo.INT_TYPE_INFO,
 						new TimestampCheckingOperator());
@@ -362,7 +363,7 @@ public class TimestampITCase {
 				return Long.MIN_VALUE;
 			}
 		})
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
 
@@ -429,7 +430,7 @@ public class TimestampITCase {
 				return Long.MIN_VALUE;
 			}
 		})
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
+				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
 
 
@@ -503,23 +504,50 @@ public class TimestampITCase {
 		env.execute();
 	}
 
+	/**
+	 * This verifies that an event time source works when setting stream time characteristic to
+	 * processing time. In this case, the watermarks should just be swallowed.
+	 */
+	@Test
+	public void testEventTimeSourceWithProcessingTime() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
+		env.setParallelism(2);
+		env.getConfig().disableSysoutLogging();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
+		env.getConfig().disableTimestamps();
+
+		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10));
+
+		source1
+			.map(new IdentityMap())
+			.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false));
+
+		env.execute();
+
+		// verify that we don't get any watermarks, the source is used as watermark source in
+		// other tests, so it normally emits watermarks
+		Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 0);
+	}
+
 	@SuppressWarnings("unchecked")
 	public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
 
 		List<Watermark> watermarks;
 		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
-		private long oldTimestamp;
+		private final boolean timestampsEnabled;
 
-		public CustomOperator() {
+		public CustomOperator(boolean timestampsEnabled) {
 			setChainingStrategy(ChainingStrategy.ALWAYS);
+			this.timestampsEnabled = timestampsEnabled;
 		}
 
 		@Override
 		public void processElement(StreamRecord<Integer> element) throws Exception {
-			if (element.getTimestamp() != element.getValue()) {
-				Assert.fail("Timestamps are not properly handled.");
+			if (timestampsEnabled) {
+				if (element.getTimestamp() != element.getValue()) {
+					Assert.fail("Timestamps are not properly handled.");
+				}
 			}
-			oldTimestamp = element.getTimestamp();
 			output.collect(element);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4b648870/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
index 2afdc40..8895b6e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
@@ -62,7 +62,7 @@ public class SourceFunctionUtil<T> {
 			final Object lockingObject = new Object();
 			SourceFunction.SourceContext<T> ctx;
 			if (sourceFunction instanceof EventTimeSourceFunction) {
-				ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector);
+				ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector, true);
 			} else {
 				ctx = new StreamSource.NonWatermarkContext<T>(lockingObject, collector);
 			}


Mime
View raw message