flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [1/2] flink git commit: [FLINK-3660] Measure latency and exposes them via a metric
Date Fri, 14 Oct 2016 12:34:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master a648f88fb -> a612b9966


http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index 10b30d0..42087b4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -33,12 +33,14 @@ import org.apache.flink.streaming.api.operators.StoppableStreamSource;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -65,7 +67,7 @@ public class StreamSourceOperatorTest {
 		
 		final List<StreamElement> output = new ArrayList<>();
 		
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
 		operator.run(new Object(), new CollectorOutput<String>(output));
 		
 		assertEquals(1, output.size());
@@ -82,7 +84,7 @@ public class StreamSourceOperatorTest {
 				new StreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
 		operator.cancel();
 
 		// run and exit
@@ -102,7 +104,7 @@ public class StreamSourceOperatorTest {
 				new StreamSource<>(new InfiniteSource<String>());
 
 		
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
 		
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -135,7 +137,7 @@ public class StreamSourceOperatorTest {
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
 		operator.stop();
 
 		// run and stop
@@ -154,7 +156,7 @@ public class StreamSourceOperatorTest {
 				new StoppableStreamSource<>(new InfiniteSource<String>());
 
 
-		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, null);
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 0, null);
 
 		// trigger an async cancel in a bit
 		new Thread("canceler") {
@@ -172,7 +174,49 @@ public class StreamSourceOperatorTest {
 
 		assertTrue(output.isEmpty());
 	}
-	
+
+	/**
+	 * Test that latency marks are emitted
+	 */
+	@Test
+	public void testLatencyMarkEmission() throws Exception {
+		final long now = System.currentTimeMillis();
+
+		final List<StreamElement> output = new ArrayList<>();
+
+		// regular stream source operator
+		final StoppableStreamSource<String, InfiniteSource<String>> operator =
+				new StoppableStreamSource<>(new InfiniteSource<String>());
+
+		// emit latency marks every 10 milliseconds.
+		setupSourceOperator(operator, TimeCharacteristic.EventTime, 0, 10, null);
+
+		// trigger an async cancel in a bit
+		new Thread("canceler") {
+			@Override
+			public void run() {
+				try {
+					Thread.sleep(200);
+				} catch (InterruptedException ignored) {}
+				operator.stop();
+			}
+		}.start();
+
+		// run and wait to be stopped
+		operator.run(new Object(), new CollectorOutput<String>(output));
+
+		// ensure that there has been some output
+		assertTrue(output.size() > 0);
+		// and that its only latency markers
+		for(StreamElement se: output) {
+			Assert.assertTrue(se.isLatencyMarker());
+			Assert.assertEquals(-1, se.asLatencyMarker().getVertexID());
+			Assert.assertEquals(0, se.asLatencyMarker().getSubtaskIndex());
+			Assert.assertTrue(se.asLatencyMarker().getMarkedTime() >= now);
+		}
+	}
+
+
 	@Test
 	public void testAutomaticWatermarkContext() throws Exception {
 
@@ -184,7 +228,7 @@ public class StreamSourceOperatorTest {
 		TestTimeServiceProvider timeProvider = new TestTimeServiceProvider();
 		timeProvider.setCurrentTime(0);
 
-		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, timeProvider);
+		setupSourceOperator(operator, TimeCharacteristic.IngestionTime, watermarkInterval, 0, timeProvider);
 
 		final List<StreamElement> output = new ArrayList<>();
 
@@ -218,10 +262,12 @@ public class StreamSourceOperatorTest {
 	private static <T> void setupSourceOperator(StreamSource<T, ?> operator,
 												TimeCharacteristic timeChar,
 												long watermarkInterval,
-												final TestTimeServiceProvider timeProvider) {
+												long latencyMarkInterval,
+												final TimeServiceProvider timeProvider) {
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setAutoWatermarkInterval(watermarkInterval);
+		executionConfig.setLatencyTrackingInterval(latencyMarkInterval);
 
 		StreamConfig cfg = new StreamConfig(new Configuration());
 		cfg.setStateBackend(new MemoryStateBackend());
@@ -300,6 +346,11 @@ public class StreamSourceOperatorTest {
 		}
 
 		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			list.add(latencyMarker);
+		}
+
+		@Override
 		public void collect(StreamRecord<T> record) {
 			list.add(record);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 8a3d919..51e61a1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -794,6 +794,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 		when(env.getTaskInfo()).thenReturn(new TaskInfo("Test task name", 1, 0, 1, 0));
 		when(env.getUserClassLoader()).thenReturn(AggregatingAlignedProcessingTimeWindowOperatorTest.class.getClassLoader());
 		when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
+		when(env.getTaskManagerInfo()).thenReturn(new TaskManagerRuntimeInfo("foo", new Configuration(),
"foo"));
 
 		when(task.getEnvironment()).thenReturn(env);
 		return task;

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
index 3c1c24b..42be131 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CollectingOutput.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.ArrayList;
@@ -60,7 +61,12 @@ public class CollectingOutput<T> implements Output<StreamRecord<T>>
{
 	
 	@Override
 	public void emitWatermark(Watermark mark) {
-		throw new UnsupportedOperationException("the output should not emit watermarks");
+		throw new UnsupportedOperationException("The output should not emit watermarks");
+	}
+
+	@Override
+	public void emitLatencyMarker(LatencyMarker latencyMarker) {
+		throw new UnsupportedOperationException("The output should not emit latency markers");
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
index 7abd2f9..bf3a488 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/MockOutput.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class MockOutput<T> implements Output<StreamRecord<T>> {
@@ -44,6 +45,11 @@ public class MockOutput<T> implements Output<StreamRecord<T>>
{
 	}
 
 	@Override
+	public void emitLatencyMarker(LatencyMarker latencyMarker) {
+		throw new RuntimeException();
+	}
+
+	@Override
 	public void close() {
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 9d8e6a5..d1622ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.DefaultTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -291,6 +292,11 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		}
 
 		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			outputList.add(latencyMarker);
+		}
+
+		@Override
 		public void collect(StreamRecord<OUT> element) {
 			if (outputSerializer == null) {
 				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
index d848d2a..32b4c77 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 
@@ -127,6 +128,11 @@ public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
 		}
 
 		@Override
+		public void emitLatencyMarker(LatencyMarker latencyMarker) {
+			outputList.add(latencyMarker);
+		}
+
+		@Override
 		@SuppressWarnings("unchecked")
 		public void collect(StreamRecord<OUT> element) {
 			if (outputSerializer == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a612b996/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 2ed759d..5855214 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -777,6 +777,7 @@ public class TimestampITCase extends TestLogger {
 
 		@Override
 		public void processWatermark(Watermark mark) throws Exception {}
+
 	}
 
 	public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer>
{


Mime
View raw message