flink-commits mailing list archives

From se...@apache.org
Subject [3/5] flink git commit: [FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.
Date Wed, 13 Apr 2016 18:51:26 GMT
[FLINK-3375] [Kafka Connector] Add tests for per-kafka-partition watermarks.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2dcd27f4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2dcd27f4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2dcd27f4

Branch: refs/heads/master
Commit: 2dcd27f403c2a7f10791bfe21c45e2a326aa46a1
Parents: e40e29d
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Apr 13 15:45:51 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Apr 13 20:50:49 2016 +0200

----------------------------------------------------------------------
 .../kafka/internals/AbstractFetcher.java        |  77 +++--
 .../kafka/internals/ExceptionProxy.java         |  60 +++-
 .../connectors/kafka/util/KafkaUtils.java       |  33 +-
 .../AbstractFetcherTimestampsTest.java          | 306 +++++++++++++++++++
 .../kafka/testutils/MockRuntimeContext.java     |  46 ++-
 5 files changed, 478 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 594aa66..8183575 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -328,45 +328,66 @@ public abstract class AbstractFetcher<T, KPH> {
 			ClassLoader userCodeClassLoader)
 		throws IOException, ClassNotFoundException
 	{
-		@SuppressWarnings("unchecked")
-		KafkaTopicPartitionState<KPH>[] partitions =
-				(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
-
-		int pos = 0;
-		for (KafkaTopicPartition partition : assignedPartitions) {
-			// create the kafka version specific partition handle
-			KPH kafkaHandle = createKafkaPartitionHandle(partition);
+		switch (timestampWatermarkMode) {
 			
-			// create the partition state
-			KafkaTopicPartitionState<KPH> partitionState;
-			switch (timestampWatermarkMode) {
-				case NO_TIMESTAMPS_WATERMARKS:
-					partitionState = new KafkaTopicPartitionState<>(partition, kafkaHandle);
-					break;
-				case PERIODIC_WATERMARKS: {
+			case NO_TIMESTAMPS_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionState<KPH>[] partitions =
+						(KafkaTopicPartitionState<KPH>[]) new KafkaTopicPartitionState<?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					// create the kafka version specific partition handle
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+					partitions[pos++] = new KafkaTopicPartitionState<>(partition, kafkaHandle);
+				}
+
+				return partitions;
+			}
+
+			case PERIODIC_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPeriodicWatermarks<T, KPH>[])
+								new KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
 					AssignerWithPeriodicWatermarks<T> assignerInstance =
 							watermarksPeriodic.deserializeValue(userCodeClassLoader);
-					partitionState = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
+					
+					partitions[pos++] = new KafkaTopicPartitionStateWithPeriodicWatermarks<>(
 							partition, kafkaHandle, assignerInstance);
-					break;
 				}
-					
-				case PUNCTUATED_WATERMARKS: {
+
+				return partitions;
+			}
+
+			case PUNCTUATED_WATERMARKS: {
+				@SuppressWarnings("unchecked")
+				KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[] partitions =
+						(KafkaTopicPartitionStateWithPunctuatedWatermarks<T, KPH>[])
+								new KafkaTopicPartitionStateWithPunctuatedWatermarks<?, ?>[assignedPartitions.size()];
+
+				int pos = 0;
+				for (KafkaTopicPartition partition : assignedPartitions) {
+					KPH kafkaHandle = createKafkaPartitionHandle(partition);
+
 					AssignerWithPunctuatedWatermarks<T> assignerInstance =
 							watermarksPunctuated.deserializeValue(userCodeClassLoader);
-					partitionState = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
+
+					partitions[pos++] = new KafkaTopicPartitionStateWithPunctuatedWatermarks<>(
 							partition, kafkaHandle, assignerInstance);
-					break;
 				}
-				default:
-					// cannot happen, add this as a guard for the future
-					throw new RuntimeException();
-			}
 
-			partitions[pos++] = partitionState;
+				return partitions;
+			}
+			default:
+				// cannot happen, add this as a guard for the future
+				throw new RuntimeException();
 		}
-		
-		return partitions;
 	}
 	
 	// ------------------------------------------------------------------------

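For context: the refactored method above builds a mode-specific array of partition-state holders, and the point of the per-partition model is that the source's overall watermark is the minimum across all partition watermarks, so it advances only when the slowest partition advances. A minimal standalone sketch of that rule (illustrative names only, not Flink's actual emission code); the numbers mirror the new test added below:

// Illustrative sketch, not Flink code: under a per-partition model the
// overall watermark is the minimum across all partition watermarks.
public final class MinWatermarkSketch {

	static long overallWatermark(long[] partitionWatermarks) {
		long min = Long.MAX_VALUE;
		for (long wm : partitionWatermarks) {
			min = Math.min(min, wm);
		}
		return min;
	}

	public static void main(String[] args) {
		// mirrors the test below: partitions at 3, 12, and 102 -> watermark 3
		System.out.println(overallWatermark(new long[] {3L, 12L, 102L}));
	}
}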
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
index 9a0e4e3..c736493 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/ExceptionProxy.java
@@ -22,7 +22,48 @@ import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
+ * A proxy that communicates exceptions between threads. Typically used if an exception
+ * from a spawned thread needs to be recognized by the "parent" (spawner) thread.
  * 
+ * <p>The spawned thread would set the exception via {@link #reportError(Throwable)}.
+ * The parent would check (at certain points) for exceptions via {@link #checkAndThrowException()}.
+ * Optionally, the parent can pass itself in the constructor to be interrupted as soon as
+ * an exception occurs.
+ * 
+ * <pre>
+ * {@code
+ * 
+ * final ExceptionProxy errorProxy = new ExceptionProxy(Thread.currentThread());
+ * 
+ * Thread subThread = new Thread() {
+ * 
+ *     public void run() {
+ *         try {
+ *             doSomething();
+ *         } catch (Throwable t) {
 *             errorProxy.reportError(t);
+ *         } finally {
+ *             doSomeCleanup();
+ *         }
+ *     }
+ * };
+ * subThread.start();
+ * 
+ * doSomethingElse();
+ * errorProxy.checkAndThrowException();
+ * 
+ * doSomethingMore();
+ * errorProxy.checkAndThrowException();
+ * 
+ * try {
+ *     subThread.join();
+ * } catch (InterruptedException e) {
+ *     errorProxy.checkAndThrowException();
+ *     // restore interrupted status, if not caused by an exception
+ *     Thread.currentThread().interrupt();
+ * }
+ * }
+ * </pre>
  */
 public class ExceptionProxy {
 	
@@ -33,6 +74,8 @@ public class ExceptionProxy {
 	private final AtomicReference<Throwable> exception;
 
 	/**
+	 * Creates an exception proxy that interrupts the given thread upon
+	 * report of an exception. The thread to interrupt may be null.
 	 * 
 	 * @param toInterrupt The thread to interrupt upon an exception. May be null.
 	 */
@@ -44,18 +87,27 @@ public class ExceptionProxy {
 	// ------------------------------------------------------------------------
 	
 	/**
-	 * Sets the exception occurred and interrupts the target thread,
+	 * Sets the exception and interrupts the target thread,
 	 * if no other exception has occurred so far.
 	 * 
+	 * <p>The exception is only set (and the interruption is only triggered),
+	 * if no other exception was set before.
+	 * 
 	 * @param t The exception that occurred
 	 */
 	public void reportError(Throwable t) {
-		// set the exception, if it is the first
-		if (exception.compareAndSet(null, t) && toInterrupt != null) {
+		// set the exception, if it is the first (and the exception is non-null)
+		if (t != null && exception.compareAndSet(null, t) && toInterrupt != null) {
 			toInterrupt.interrupt();
 		}
 	}
-	
+
+	/**
+	 * Checks whether an exception has been set via {@link #reportError(Throwable)}.
+	 * If yes, that exception is re-thrown by this method.
+	 * 
+	 * @throws Exception This method re-throws the exception, if set.
+	 */
 	public void checkAndThrowException() throws Exception {
 		Throwable t = exception.get();
 		if (t != null) {

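The compareAndSet in reportError above means only the first reported exception wins; later reports are silently dropped and do not re-interrupt. A small hypothetical driver (not part of this commit) that exercises that contract:

import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;

// Hypothetical driver code: demonstrates that only the first reported
// exception is kept, per the compareAndSet in reportError above.
public class ExceptionProxyContractCheck {
	public static void main(String[] args) {
		ExceptionProxy proxy = new ExceptionProxy(null); // null: interrupt no thread
		proxy.reportError(new RuntimeException("first"));
		proxy.reportError(new IllegalStateException("second")); // ignored: slot taken
		try {
			proxy.checkAndThrowException();
		} catch (Exception e) {
			// expected to surface the first error (assuming the rethrow logic,
			// cut off in this diff, throws the stored Throwable or a wrapper)
			System.out.println(e.getMessage());
		}
	}
}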
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
index bda90bd..fc07247 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
@@ -19,23 +19,36 @@ package org.apache.flink.streaming.connectors.kafka.util;
 
 import java.util.Properties;
 
+/**
+ * Simple utilities, used by the Flink Kafka Consumers.
+ */
 public class KafkaUtils {
 
 	public static int getIntFromConfig(Properties config, String key, int defaultValue) {
-		try {
-			return Integer.parseInt(config.getProperty(key, Integer.toString(defaultValue)));
-		} catch(NumberFormatException nfe) {
-			throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
-					"Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+		String val = config.getProperty(key);
+		if (val == null) {
+			return defaultValue;
+		} else {
+			try {
+				return Integer.parseInt(val);
+			} catch (NumberFormatException nfe) {
+				throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+						"Entered value='" + val + "'. Default value='" + defaultValue + "'");
+			}
 		}
 	}
 
 	public static long getLongFromConfig(Properties config, String key, long defaultValue) {
-		try {
-			return Long.parseLong(config.getProperty(key, Long.toString(defaultValue)));
-		} catch(NumberFormatException nfe) {
-			throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
-					"Entered value='" + config.getProperty(key) + "'. Default value='" + defaultValue + "'");
+		String val = config.getProperty(key);
+		if (val == null) {
+			return defaultValue;
+		} else {
+			try {
+				return Long.parseLong(val);
+			} catch (NumberFormatException nfe) {
+				throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+						"Entered value='" + val + "'. Default value='" + defaultValue + "'");
+			}
 		}
 	}
 	

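A hypothetical usage sketch of the two helpers above, based only on the signatures visible in this diff; the property keys are invented for illustration. The rewrite separates the missing-key case from the unparseable-value case, instead of round-tripping the default through its string form as the old code did:

import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.util.KafkaUtils;

// Hypothetical usage, based on the signatures in this diff.
// The property keys below are made up for illustration.
public class KafkaUtilsUsageSketch {
	public static void main(String[] args) {
		Properties config = new Properties();
		config.setProperty("example.timeout", "100");

		// value present and parseable: returns 100
		long timeout = KafkaUtils.getLongFromConfig(config, "example.timeout", 50L);

		// key absent: returns the default directly
		int retries = KafkaUtils.getIntFromConfig(config, "example.retries", 3);
		System.out.println(timeout + " " + retries); // 100 3

		// malformed value: IllegalArgumentException naming key, value, and default
		config.setProperty("example.retries", "not-a-number");
		try {
			KafkaUtils.getIntFromConfig(config, "example.retries", 3);
		} catch (IllegalArgumentException e) {
			System.err.println(e.getMessage());
		}
	}
}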
http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
new file mode 100644
index 0000000..c073a04
--- /dev/null
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.internals;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
+import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.SerializedValue;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class AbstractFetcherTimestampsTest {
+	
+	@Test
+	public void testPunctuatedWatermarks() throws Exception {
+		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+				new KafkaTopicPartition("test topic name", 7),
+				new KafkaTopicPartition("test topic name", 13),
+				new KafkaTopicPartition("test topic name", 21));
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext, originalPartitions, null,
+				new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
+				new MockRuntimeContext(17, 3));
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+		
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+		assertFalse(sourceContext.hasWatermark());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+		
+		// now, we should have a watermark
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+		
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		assertFalse(sourceContext.hasWatermark());
+		fetcher.emitRecord(14L, part2, 3L);
+		assertFalse(sourceContext.hasWatermark());
+		fetcher.emitRecord(15L, part2, 3L);
+		assertTrue(sourceContext.hasWatermark());
+		assertEquals(15L, sourceContext.getLatestWatermark().getTimestamp());
+	}
+	
+	@Test
+	public void testPeriodicWatermarks() throws Exception {
+		ExecutionConfig config = new ExecutionConfig();
+		config.setAutoWatermarkInterval(10);
+		
+		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
+				new KafkaTopicPartition("test topic name", 7),
+				new KafkaTopicPartition("test topic name", 13),
+				new KafkaTopicPartition("test topic name", 21));
+
+		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
+
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext, originalPartitions,
+				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+				null, new MockRuntimeContext(17, 3, config, sourceContext.getCheckpointLock()));
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L);
+		fetcher.emitRecord(2L, part1, 2L);
+		fetcher.emitRecord(3L, part1, 3L);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L);
+		fetcher.emitRecord(102L, part3, 2L);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+		// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L);
+		fetcher.emitRecord(1004L, part3, 4L);
+		fetcher.emitRecord(1005L, part3, 5L);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+		
+		// this blocks until the periodic thread emitted the watermark
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L);
+		fetcher.emitRecord(14L, part2, 3L);
+		fetcher.emitRecord(15L, part2, 3L);
+
+		// this blocks until the periodic thread emitted the watermark
+		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+		assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Test mocks
+	// ------------------------------------------------------------------------
+
+	private static final class TestFetcher<T> extends AbstractFetcher<T, Object> {
+
+		protected TestFetcher(
+				SourceContext<T> sourceContext,
+				List<KafkaTopicPartition> assignedPartitions,
+				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
+				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
+				StreamingRuntimeContext runtimeContext) throws Exception
+		{
+			super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext);
+		}
+
+		@Override
+		public void runFetchLoop() throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancel() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public Object createKafkaPartitionHandle(KafkaTopicPartition partition) {
+			return new Object();
+		}
+
+		@Override
+		public void commitSpecificOffsetsToKafka(Map<KafkaTopicPartition, Long> offsets) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class TestSourceContext<T> implements SourceContext<T> {
+
+		private final Object checkpointLock = new Object();
+		private final Object watermarkLock = new Object();
+
+		private volatile StreamRecord<T> latestElement;
+		private volatile Watermark currentWatermark;
+
+		@Override
+		public void collect(T element) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			this.latestElement = new StreamRecord<T>(element, timestamp);
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			synchronized (watermarkLock) {
+				currentWatermark = mark;
+				watermarkLock.notifyAll();
+			}
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return checkpointLock;
+		}
+
+		@Override
+		public void close() {}
+
+		public StreamRecord<T> getLatestElement() {
+			return latestElement;
+		}
+
+		public boolean hasWatermark() {
+			return currentWatermark != null;
+		}
+		
+		public Watermark getLatestWatermark() throws InterruptedException {
+			synchronized (watermarkLock) {
+				while (currentWatermark == null) {
+					watermarkLock.wait();
+				}
+				Watermark wm = currentWatermark;
+				currentWatermark = null;
+				return wm;
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class PeriodicTestExtractor implements AssignerWithPeriodicWatermarks<Long> {
+
+		private volatile long maxTimestamp = Long.MIN_VALUE;
+		
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			maxTimestamp = Math.max(maxTimestamp, element);
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark getCurrentWatermark() {
+			return new Watermark(maxTimestamp);
+		}
+	}
+
+	private static class PunctuatedTestExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			return element;
+		}
+
+		@Nullable
+		@Override
+		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+			return extractedTimestamp % 3 == 0 ? new Watermark(extractedTimestamp) : null;
+		}
+		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2dcd27f4/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
index e74eee4..3e46503 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
@@ -37,24 +37,43 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.streaming.runtime.operators.Triggerable;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("deprecation")
 public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	private final int numberOfParallelSubtasks;
 	private final int indexOfThisSubtask;
+	
+	private final ExecutionConfig execConfig;
+	private final Object checkpointLock;
 
+	private ScheduledExecutorService timer;
+	
 	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
+		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig(), null);
+	}
+	
+	public MockRuntimeContext(
+			int numberOfParallelSubtasks, int indexOfThisSubtask, 
+			ExecutionConfig execConfig,
+			Object checkpointLock) {
 		super(new MockStreamOperator(),
 				new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
 				Collections.<String, Accumulator<?, ?>>emptyMap());
+		
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.indexOfThisSubtask = indexOfThisSubtask;
+		this.execConfig = execConfig;
+		this.checkpointLock = checkpointLock;
 	}
 
 	@Override
@@ -64,7 +83,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	@Override
 	public String getTaskName() {
-		return null;
+		return "mock task";
 	}
 
 	@Override
@@ -84,7 +103,7 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 
 	@Override
 	public ExecutionConfig getExecutionConfig() {
-		throw new UnsupportedOperationException();
+		return execConfig;
 	}
 
 	@Override
@@ -167,6 +186,29 @@ public class MockRuntimeContext extends StreamingRuntimeContext {
 		throw new UnsupportedOperationException();
 	}
 	
+	@Override
+	public void registerTimer(final long time, final Triggerable target) {
+		if (timer == null) {
+			timer = Executors.newSingleThreadScheduledExecutor();
+		}
+		
+		final long delay = Math.max(time - System.currentTimeMillis(), 0);
+
+		timer.schedule(new Runnable() {
+			@Override
+			public void run() {
+				synchronized (checkpointLock) {
+					try {
+						target.trigger(time);
+					} catch (Throwable t) {
+						System.err.println("!!! Caught exception while processing timer. !!!");
+						t.printStackTrace();
+					}
+				}
+			}
+		}, delay, TimeUnit.MILLISECONDS);
+	}
+
 	// ------------------------------------------------------------------------
 
 	private static class MockStreamOperator extends AbstractStreamOperator<Integer> {

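For context, the registerTimer implementation above is what drives the periodic-watermark test: the scheduled task invokes the Triggerable under the checkpoint lock, matching the runtime's threading contract. A hypothetical sketch (not the fetcher's actual emitter code) of how a periodic task can re-arm itself through that contract, assuming Triggerable exposes the single trigger(long) callback used above:

import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
import org.apache.flink.streaming.runtime.operators.Triggerable;

// Hypothetical sketch: a Triggerable that re-registers itself after each
// firing, the pattern a periodic watermark emitter would use to keep firing
// at the configured auto-watermark interval.
public class SelfReschedulingTrigger implements Triggerable {

	private final MockRuntimeContext ctx;
	private final long intervalMillis;

	public SelfReschedulingTrigger(MockRuntimeContext ctx, long intervalMillis) {
		this.ctx = ctx;
		this.intervalMillis = intervalMillis;
	}

	public void start() {
		ctx.registerTimer(System.currentTimeMillis() + intervalMillis, this);
	}

	@Override
	public void trigger(long timestamp) throws Exception {
		// inspect per-partition watermark state here (the mock above runs this
		// under the checkpoint lock), then re-arm for the next interval
		ctx.registerTimer(System.currentTimeMillis() + intervalMillis, this);
	}
}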
