flink-commits mailing list archives

From aljos...@apache.org
Subject [01/11] flink git commit: [FLINK-4877] Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests
Date Fri, 21 Oct 2016 17:14:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master 71d2e3ef1 -> 770f2f83a


[FLINK-4877] Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0859a698
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0859a698
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0859a698

Branch: refs/heads/master
Commit: 0859a698253f07a28442ee7232e1adb76013dbd3
Parents: 3055475
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Wed Sep 28 15:10:35 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Oct 21 19:03:04 2016 +0200

----------------------------------------------------------------------
 .../flink-connector-kafka-0.10/pom.xml          |   8 +
 .../connectors/kafka/FlinkKafkaConsumer010.java |  19 +-
 .../kafka/internal/Kafka010Fetcher.java         |  26 ++-
 .../connectors/kafka/Kafka010FetcherTest.java   |  33 ++-
 .../kafka/KafkaTestEnvironmentImpl.java         |   9 +
 .../flink-connector-kafka-0.8/pom.xml           |  10 +-
 .../kafka/internals/Kafka08Fetcher.java         |  10 +-
 .../connectors/kafka/KafkaProducerTest.java     |  32 +--
 .../kafka/KafkaTestEnvironmentImpl.java         |  16 ++
 .../flink-connector-kafka-0.9/pom.xml           |   8 +
 .../connectors/kafka/FlinkKafkaConsumer09.java  |  19 +-
 .../kafka/internal/Kafka09Fetcher.java          |  38 +++-
 .../connectors/kafka/Kafka09FetcherTest.java    |  38 +++-
 .../connectors/kafka/KafkaProducerTest.java     |  30 +--
 .../kafka/KafkaTestEnvironmentImpl.java         |  12 ++
 .../kafka/internals/AbstractFetcher.java        |  13 +-
 .../kafka/AtLeastOnceProducerTest.java          |  25 ++-
 .../connectors/kafka/KafkaTestEnvironment.java  |   5 +
 .../AbstractFetcherTimestampsTest.java          | 151 +++++++-------
 .../kafka/testutils/DataGenerators.java         |  25 ++-
 .../kafka/testutils/MockRuntimeContext.java     | 209 -------------------
 .../util/OneInputStreamOperatorTestHarness.java |  14 +-
 22 files changed, 383 insertions(+), 367 deletions(-)
----------------------------------------------------------------------
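
For context: this commit removes the fetchers' and producer tests' dependency on a full StreamingRuntimeContext. The fetcher constructors now receive the individual services they actually use (ProcessingTimeService, auto-watermark interval, user-code ClassLoader, checkpointing flag, task name, metric group), and the producer tests drive the sink through a OneInputStreamOperatorTestHarness instead of the now-deleted MockRuntimeContext. A minimal sketch of the harness pattern the rewritten tests use, assuming the harness API as it appears in the hunks below (the trivial sink is a hypothetical stand-in for the Kafka producer):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

public class HarnessPatternSketch {

	public static void main(String[] args) throws Exception {
		// hypothetical sink standing in for FlinkKafkaProducer08/09/010
		SinkFunction<String> sink = new SinkFunction<String>() {
			private static final long serialVersionUID = 1L;

			@Override
			public void invoke(String value) {
				System.out.println("sink got: " + value);
			}
		};

		OneInputStreamOperatorTestHarness<String, Object> testHarness =
				new OneInputStreamOperatorTestHarness<>(new StreamSink<>(sink));

		testHarness.open();                                       // replaces setRuntimeContext(...) + open(new Configuration())
		testHarness.processElement(new StreamRecord<>("value"));  // replaces producer.invoke("value")
		testHarness.close();                                      // replaces producer.close()
	}
}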


http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
index 0b426b5..8108afc 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/pom.xml
@@ -72,6 +72,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-0.9_2.10</artifactId>
 			<version>${project.version}</version>
 			<exclusions>

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 267ff57..a9ce336 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
@@ -134,9 +134,20 @@ public class FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> {
 
 		boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
 
-		return new Kafka010Fetcher<>(sourceContext, thisSubtaskPartitions,
-				watermarksPeriodic, watermarksPunctuated,
-				runtimeContext, deserializer,
-				properties, pollTimeout, useMetrics);
+		return new Kafka010Fetcher<>(
+				sourceContext,
+				thisSubtaskPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				runtimeContext.getProcessingTimeService(),
+				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+				runtimeContext.getUserCodeClassLoader(),
+				runtimeContext.isCheckpointingEnabled(),
+				runtimeContext.getTaskNameWithSubtasks(),
+				runtimeContext.getMetricGroup(),
+				deserializer,
+				properties,
+				pollTimeout,
+				useMetrics);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
index 47bee22..4a1f5f6 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010Fetcher.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
@@ -46,13 +47,32 @@ public class Kafka010Fetcher<T> extends Kafka09Fetcher<T> {
 			List<KafkaTopicPartition> assignedPartitions,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext,
+			ProcessingTimeService processingTimeProvider,
+			long autoWatermarkInterval,
+			ClassLoader userCodeClassLoader,
+			boolean enableCheckpointing,
+			String taskNameWithSubtasks,
+			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
 			boolean useMetrics) throws Exception
 	{
-		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, deserializer, kafkaProperties, pollTimeout, useMetrics);
+		super(
+				sourceContext,
+				assignedPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				processingTimeProvider,
+				autoWatermarkInterval,
+				userCodeClassLoader,
+				enableCheckpointing,
+				taskNameWithSubtasks,
+				metricGroup,
+				deserializer,
+				kafkaProperties,
+				pollTimeout,
+				useMetrics);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
index 76e3950..718db48 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010FetcherTest.java
@@ -20,10 +20,12 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -115,7 +117,20 @@ public class Kafka010FetcherTest {
         StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+                sourceContext,
+                topics,
+                null, /* periodic assigner */
+                null, /* punctuated assigner */
+                new TestProcessingTimeService(),
+                10,
+                getClass().getClassLoader(),
+                false, /* checkpointing */
+                "taskname-with-subtask",
+                mock(MetricGroup.class),
+                schema,
+                new Properties(),
+                0L,
+                false);
 
         // ----- run the fetcher -----
 
@@ -235,7 +250,21 @@ public class Kafka010FetcherTest {
         StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
         final Kafka010Fetcher<String> fetcher = new Kafka010Fetcher<>(
-                sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+                sourceContext,
+                topics,
+                null, /* periodic assigner */
+                null, /* punctuated assigner */
+                new TestProcessingTimeService(),
+                10,
+                getClass().getClassLoader(),
+                false, /* checkpointing */
+                "taskname-with-subtask",
+                mock(MetricGroup.class),
+                schema,
+                new Properties(),
+                0L,
+                false);
+
 
         // ----- run the fetcher -----
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 7d12cde..c30a4dd 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -29,6 +29,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -113,6 +114,14 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> StreamSink<T> getProducerSink(String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return new StreamSink<>(prod);
+	}
+
+
+	@Override
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer010<T> prod = new FlinkKafkaProducer010<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
index 888208e..f17f9ae 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/pom.xml
@@ -119,7 +119,15 @@ under the License.
 		</dependency>
 
 		<!-- test dependencies -->
-		
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.curator</groupId>
 			<artifactId>curator-test</artifactId>

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
index 5861058..fbcb19c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/Kafka08Fetcher.java
@@ -98,7 +98,15 @@ public class Kafka08Fetcher<T> extends AbstractFetcher<T, TopicAndPartition> {
 			long autoCommitInterval,
 			boolean useMetrics) throws Exception
 	{
-		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, useMetrics);
+		super(
+				sourceContext,
+				assignedPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				runtimeContext.getProcessingTimeService(),
+				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+				runtimeContext.getUserCodeClassLoader(),
+				useMetrics);
 
 		this.deserializer = checkNotNull(deserializer);
 		this.kafkaConfig = checkNotNull(kafkaProperties);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index 8602ffe..7efa94e 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -80,12 +81,14 @@ public class KafkaProducerTest extends TestLogger {
 			FlinkKafkaProducer08<String> producerPropagating = new FlinkKafkaProducer08<>(
 					"mock_topic", new SimpleStringSchema(), new Properties(), null);
 
-			producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerPropagating.open(new Configuration());
-			
+			OneInputStreamOperatorTestHarness<String, Object> testHarness =
+					new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producerPropagating));
+
+			testHarness.open();
+
 			try {
-				producerPropagating.invoke("value");
-				producerPropagating.invoke("value");
+				testHarness.processElement(new StreamRecord<>("value"));
+				testHarness.processElement(new StreamRecord<>("value"));
 				fail("This should fail with an exception");
 			}
 			catch (Exception e) {
@@ -94,17 +97,22 @@ public class KafkaProducerTest extends TestLogger {
 				assertTrue(e.getCause().getMessage().contains("Test error"));
 			}
 
+			testHarness.close();
+
 			// (2) producer that only logs errors
 
 			FlinkKafkaProducer08<String> producerLogging = new FlinkKafkaProducer08<>(
 					"mock_topic", new SimpleStringSchema(), new Properties(), null);
 			producerLogging.setLogFailuresOnly(true);
-			
-			producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerLogging.open(new Configuration());
 
-			producerLogging.invoke("value");
-			producerLogging.invoke("value");
+			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>("value"));
+			testHarness.processElement(new StreamRecord<>("value"));
+
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();

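The two processElement calls before fail() are deliberate: the producer sends asynchronously, and the failure injected by the mocked Kafka producer's send callback is only thrown when the sink next checks for pending errors, typically on the following element. A standalone sketch of that propagation pattern (hypothetical code, not the actual FlinkKafkaProducerBase internals):

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// hypothetical sink demonstrating the "async error surfaces on a later invoke" pattern
public class AsyncErrorSink implements SinkFunction<String> {

	private static final long serialVersionUID = 1L;

	private volatile Exception asyncError;

	@Override
	public void invoke(String value) throws Exception {
		if (asyncError != null) {
			// error recorded by an earlier send's callback is thrown here
			throw new Exception("Could not forward element", asyncError);
		}
		// a real producer would call send(record, callback) here; the callback
		// records any failure into asyncError, to be thrown on a later element
		asyncError = new Exception("Test error");
	}
}
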
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 567d22d..6235449 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -34,6 +34,7 @@ import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionLeader;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
@@ -105,6 +106,21 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			KeyedSerializationSchema<T> serSchema,
+			Properties props,
+			KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(
+				topic,
+				serSchema,
+				props,
+				partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return new StreamSink<>(prod);
+	}
+
+	@Override
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer08<T> prod = new FlinkKafkaProducer08<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
index bfcde82..f638c7a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/pom.xml
@@ -83,6 +83,14 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_2.10</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-connector-kafka-base_2.10</artifactId>
 			<version>${project.version}</version>
 			<exclusions>

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
index a97476a..29bb8e4 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java
@@ -177,10 +177,21 @@ public class FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> {
 
 		boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));
 
-		return new Kafka09Fetcher<>(sourceContext, thisSubtaskPartitions,
-				watermarksPeriodic, watermarksPunctuated,
-				runtimeContext, deserializer,
-				properties, pollTimeout, useMetrics);
+		return new Kafka09Fetcher<>(
+				sourceContext,
+				thisSubtaskPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				runtimeContext.getProcessingTimeService(),
+				runtimeContext.getExecutionConfig().getAutoWatermarkInterval(),
+				runtimeContext.getUserCodeClassLoader(),
+				runtimeContext.isCheckpointingEnabled(),
+				runtimeContext.getTaskNameWithSubtasks(),
+				runtimeContext.getMetricGroup(),
+				deserializer,
+				properties,
+				pollTimeout,
+				useMetrics);
 		
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index af3b199..a8c0397 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -18,17 +18,16 @@
 
 package org.apache.flink.streaming.connectors.kafka.internal;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.ExceptionProxy;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
 import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.util.SerializedValue;
 
@@ -67,9 +66,6 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	/** The schema to convert between Kafka's byte messages, and Flink's objects */
 	private final KeyedDeserializationSchema<T> deserializer;
 
-	/** The subtask's runtime context */
-	private final RuntimeContext runtimeContext;
-
 	/** The configuration for the Kafka consumer */
 	private final Properties kafkaProperties;
 
@@ -94,6 +90,12 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 	/** Flag tracking whether the latest commit request has completed */
 	private volatile boolean commitInProgress;
 
+	/** For Debug output **/
+	private String taskNameWithSubtasks;
+
+	/** We get this from the outside to publish metrics. **/
+	private MetricGroup metricGroup;
+
 	// ------------------------------------------------------------------------
 
 	public Kafka09Fetcher(
@@ -101,24 +103,38 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 			List<KafkaTopicPartition> assignedPartitions,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext,
+			ProcessingTimeService processingTimeProvider,
+			long autoWatermarkInterval,
+			ClassLoader userCodeClassLoader,
+			boolean enableCheckpointing,
+			String taskNameWithSubtasks,
+			MetricGroup metricGroup,
 			KeyedDeserializationSchema<T> deserializer,
 			Properties kafkaProperties,
 			long pollTimeout,
 			boolean useMetrics) throws Exception
 	{
-		super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, useMetrics);
+		super(
+				sourceContext,
+				assignedPartitions,
+				watermarksPeriodic,
+				watermarksPunctuated,
+				processingTimeProvider,
+				autoWatermarkInterval,
+				userCodeClassLoader,
+				useMetrics);
 
 		this.deserializer = deserializer;
-		this.runtimeContext = runtimeContext;
 		this.kafkaProperties = kafkaProperties;
 		this.pollTimeout = pollTimeout;
 		this.nextOffsetsToCommit = new AtomicReference<>();
 		this.offsetCommitCallback = new CommitCallback();
+		this.taskNameWithSubtasks = taskNameWithSubtasks;
+		this.metricGroup = metricGroup;
 
 		// if checkpointing is enabled, we are not automatically committing to Kafka.
 		kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
-				Boolean.toString(!runtimeContext.isCheckpointingEnabled()));
+				Boolean.toString(!enableCheckpointing));
 	}
 
 	// ------------------------------------------------------------------------
@@ -131,7 +147,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 		// rather than running the main fetch loop directly here, we spawn a dedicated thread
 		// this makes sure that no interrupt() call upon canceling reaches the Kafka consumer code
-		Thread runner = new Thread(this, getFetcherName() + " for " + runtimeContext.getTaskNameWithSubtasks());
+		Thread runner = new Thread(this, getFetcherName() + " for " + taskNameWithSubtasks);
 		runner.setDaemon(true);
 		runner.start();
 
@@ -187,7 +203,7 @@ public class Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> implem
 
 
 			if (useMetrics) {
-				final MetricGroup kafkaMetricGroup = runtimeContext.getMetricGroup().addGroup("KafkaConsumer");
+				final MetricGroup kafkaMetricGroup = metricGroup.addGroup("KafkaConsumer");
 				addOffsetStateGauge(kafkaMetricGroup);
 				// register Kafka metrics to Flink
 				Map<MetricName, ? extends Metric> metrics = consumer.metrics();

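Kafka09Fetcher no longer keeps the StreamingRuntimeContext around: the task name and metric group it still needs are passed in and stored directly, and the checkpointing flag is consumed right in the constructor. As before, that flag is inverted into Kafka's enable.auto.commit setting, since with Flink checkpointing enabled, offsets are committed through Flink's checkpoints rather than by the Kafka client. A minimal sketch of the inversion:

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class AutoCommitSketch {

	public static void main(String[] args) {
		// now passed in explicitly, instead of runtimeContext.isCheckpointingEnabled()
		boolean enableCheckpointing = true;

		Properties kafkaProperties = new Properties();
		// if checkpointing is enabled, we are not automatically committing to Kafka
		kafkaProperties.setProperty(
				ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
				Boolean.toString(!enableCheckpointing));

		System.out.println(kafkaProperties); // prints {enable.auto.commit=false}
	}
}
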
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
index c5cf0cc..1162599 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java
@@ -20,10 +20,11 @@ package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -112,10 +113,22 @@ public class Kafka09FetcherTest {
 		SourceContext<String> sourceContext = mock(SourceContext.class);
 		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
-		
+
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-				sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+				sourceContext,
+				topics,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				mock(MetricGroup.class),
+				schema,
+				new Properties(),
+				0L,
+				false);
 
 		// ----- run the fetcher -----
 
@@ -236,10 +249,23 @@ public class Kafka09FetcherTest {
 		SourceContext<String> sourceContext = mock(SourceContext.class);
 		List<KafkaTopicPartition> topics = Collections.singletonList(new KafkaTopicPartition("test", 42));
 		KeyedDeserializationSchema<String> schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema());
-		StreamingRuntimeContext context = mock(StreamingRuntimeContext.class);
 
 		final Kafka09Fetcher<String> fetcher = new Kafka09Fetcher<>(
-				sourceContext, topics, null, null, context, schema, new Properties(), 0L, false);
+				sourceContext,
+				topics,
+				null, /* periodic watermark extractor */
+				null, /* punctuated watermark extractor */
+				new TestProcessingTimeService(),
+				10, /* watermark interval */
+				this.getClass().getClassLoader(),
+				true, /* checkpointing */
+				"task_name",
+				mock(MetricGroup.class),
+				schema,
+				new Properties(),
+				0L,
+				false);
+
 
 		// ----- run the fetcher -----
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
index b80a231..31691d5 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java
@@ -18,8 +18,9 @@
 
 package org.apache.flink.streaming.connectors.kafka;
 
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.util.TestLogger;
 
@@ -85,12 +86,14 @@ public class KafkaProducerTest extends TestLogger {
 			FlinkKafkaProducer09<String> producerPropagating = new FlinkKafkaProducer09<>(
 					"mock_topic", new SimpleStringSchema(), new Properties(), null);
 
-			producerPropagating.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerPropagating.open(new Configuration());
-			
+			OneInputStreamOperatorTestHarness<String, Object> testHarness =
+					new OneInputStreamOperatorTestHarness<>(new StreamSink(producerPropagating));
+
+			testHarness.open();
+
 			try {
-				producerPropagating.invoke("value");
-				producerPropagating.invoke("value");
+				testHarness.processElement(new StreamRecord<>("value"));
+				testHarness.processElement(new StreamRecord<>("value"));
 				fail("This should fail with an exception");
 			}
 			catch (Exception e) {
@@ -104,12 +107,15 @@ public class KafkaProducerTest extends TestLogger {
 			FlinkKafkaProducer09<String> producerLogging = new FlinkKafkaProducer09<>(
 					"mock_topic", new SimpleStringSchema(), new Properties(), null);
 			producerLogging.setLogFailuresOnly(true);
-			
-			producerLogging.setRuntimeContext(new MockRuntimeContext(17, 3));
-			producerLogging.open(new Configuration());
 
-			producerLogging.invoke("value");
-			producerLogging.invoke("value");
+			testHarness = new OneInputStreamOperatorTestHarness<>(new StreamSink(producerLogging));
+
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>("value"));
+			testHarness.processElement(new StreamRecord<>("value"));
+
+			testHarness.close();
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 223dacb..1802e0c 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -31,6 +31,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.testutils.ZooKeeperStringSerializer;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -99,6 +100,17 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	}
 
 	@Override
+	public <T> StreamSink<T> getProducerSink(
+			String topic,
+			KeyedSerializationSchema<T> serSchema,
+			Properties props,
+			KafkaPartitioner<T> partitioner) {
+		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
+		prod.setFlushOnCheckpoint(true);
+		return new StreamSink<>(prod);
+	}
+
+	@Override
 	public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, KafkaPartitioner<T> partitioner) {
 		FlinkKafkaProducer09<T> prod = new FlinkKafkaProducer09<>(topic, serSchema, props, partitioner);
 		prod.setFlushOnCheckpoint(true);

http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
index 065b54f..321991a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
@@ -23,7 +23,6 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
@@ -81,7 +80,9 @@ public abstract class AbstractFetcher<T, KPH> {
 			List<KafkaTopicPartition> assignedPartitions,
 			SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 			SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-			StreamingRuntimeContext runtimeContext,
+			ProcessingTimeService processingTimeProvider,
+			long autoWatermarkInterval,
+			ClassLoader userCodeClassLoader,
 			boolean useMetrics) throws Exception
 	{
 		this.sourceContext = checkNotNull(sourceContext);
@@ -110,7 +111,7 @@ public abstract class AbstractFetcher<T, KPH> {
 				assignedPartitions,
 				timestampWatermarkMode,
 				watermarksPeriodic, watermarksPunctuated,
-				runtimeContext.getUserCodeClassLoader());
+				userCodeClassLoader);
 		
 		// if we have periodic watermarks, kick off the interval scheduler
 		if (timestampWatermarkMode == PERIODIC_WATERMARKS) {
@@ -118,7 +119,7 @@ public abstract class AbstractFetcher<T, KPH> {
 					(KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?>[]) allPartitions;
 			
 			PeriodicWatermarkEmitter periodicEmitter = 
-					new PeriodicWatermarkEmitter(parts, sourceContext, runtimeContext.getProcessingTimeService(), runtimeContext.getExecutionConfig().getAutoWatermarkInterval());
+					new PeriodicWatermarkEmitter(parts, sourceContext, processingTimeProvider, autoWatermarkInterval);
 			periodicEmitter.start();
 		}
 	}
@@ -495,9 +496,7 @@ public abstract class AbstractFetcher<T, KPH> {
 		
 		@Override
 		public void trigger(long timestamp) throws Exception {
-			// sanity check
-			assert Thread.holdsLock(emitter.getCheckpointLock());
-			
+
 			long minAcrossAll = Long.MAX_VALUE;
 			for (KafkaTopicPartitionStateWithPeriodicWatermarks<?, ?> state : allPartitions) {
 				

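Two things happen in this file: the PeriodicWatermarkEmitter is wired from the injected processingTimeProvider and autoWatermarkInterval instead of pulling both from the runtime context, and the Thread.holdsLock sanity check in trigger() is removed, presumably because a TestProcessingTimeService may fire callbacks without holding the checkpoint lock. The per-trigger computation itself is unchanged: emit the minimum watermark across all partitions once it advances. A standalone sketch of that computation:

public class MinWatermarkSketch {

	// minimal standalone version of the per-trigger logic in PeriodicWatermarkEmitter:
	// the emittable watermark is the minimum of the current per-partition watermarks
	static long minAcrossAll(long[] partitionWatermarks) {
		long minAcrossAll = Long.MAX_VALUE;
		for (long w : partitionWatermarks) {
			minAcrossAll = Math.min(minAcrossAll, w);
		}
		return minAcrossAll; // caller emits only if this advanced past the last emitted watermark
	}

	public static void main(String[] args) {
		// partitions at watermarks 3, 12, 102 -> overall watermark is 3,
		// matching the first watermark assertion in AbstractFetcherTimestampsTest below
		System.out.println(minAcrossAll(new long[] {3L, 12L, 102L}));
	}
}
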
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
index 5e9bacc..6d92f9b 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/AtLeastOnceProducerTest.java
@@ -19,10 +19,12 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.operators.StreamSink;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
@@ -69,16 +71,21 @@ public class AtLeastOnceProducerTest {
 	private void runTest(boolean flushOnCheckpoint) throws Throwable {
 		Properties props = new Properties();
 		final AtomicBoolean snapshottingFinished = new AtomicBoolean(false);
+
 		final TestingKafkaProducer<String> producer = new TestingKafkaProducer<>("someTopic", new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()), props,
 				snapshottingFinished);
+
 		producer.setFlushOnCheckpoint(flushOnCheckpoint);
-		producer.setRuntimeContext(new MockRuntimeContext(0, 1));
 
-		producer.open(new Configuration());
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+				new OneInputStreamOperatorTestHarness<>(new StreamSink(producer));
+
+		testHarness.open();
 
 		for (int i = 0; i < 100; i++) {
-			producer.invoke("msg-" + i);
+			testHarness.processElement(new StreamRecord<>("msg-" + i));
 		}
+
 		// start a thread confirming all pending records
 		final Tuple1<Throwable> runnableError = new Tuple1<>(null);
 		final Thread threadA = Thread.currentThread();
@@ -113,8 +120,10 @@ public class AtLeastOnceProducerTest {
 		};
 		Thread threadB = new Thread(confirmer);
 		threadB.start();
+
 		// this should block:
-		producer.snapshotState(new StateSnapshotContextSynchronousImpl(0, 0));
+		testHarness.snapshot(0, 0);
+
 		synchronized (threadA) {
 			threadA.notifyAll(); // just in case, to let the test fail faster
 		}
@@ -128,14 +137,14 @@ public class AtLeastOnceProducerTest {
 			throw runnableError.f0;
 		}
 
-		producer.close();
+		testHarness.close();
 	}
 
 
 	private static class TestingKafkaProducer<T> extends FlinkKafkaProducerBase<T> {
-		private static final long serialVersionUID = -1759403646061180067L;
+		private static final long serialVersionUID = 1L;
 
-		private MockProducer prod;
+		private transient MockProducer prod;
 		private AtomicBoolean snapshottingFinished;
 
 		public TestingKafkaProducer(String defaultTopicId, KeyedSerializationSchema<T> serializationSchema, Properties producerConfig, AtomicBoolean snapshottingFinished) {

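One detail beyond the harness switch: the MockProducer field becomes transient (and serialVersionUID is reset to 1L). The sink function is serializable by contract, and the harness path presumably exercises that serialization; a Mockito mock is not serializable, so the field must be excluded and re-created lazily. A minimal sketch of the same pattern with a hypothetical non-serializable collaborator:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class TransientFieldSketch implements SinkFunction<String> {

	private static final long serialVersionUID = 1L;

	// hypothetical non-serializable collaborator (like the MockProducer above):
	// transient, and re-created lazily after deserialization
	private transient ExecutorService client;

	@Override
	public void invoke(final String value) {
		if (client == null) {
			client = Executors.newSingleThreadExecutor();
		}
		client.submit(new Runnable() {
			@Override
			public void run() {
				System.out.println("sending: " + value);
			}
		});
	}
}
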
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 806d342..10c7b86 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -21,6 +21,7 @@ import kafka.server.KafkaServer;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
@@ -81,6 +82,10 @@ public abstract class KafkaTestEnvironment {
 
 	public abstract <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KeyedDeserializationSchema<T> readSchema, Properties props);
 
+	public abstract <T> StreamSink<T> getProducerSink(String topic,
+			KeyedSerializationSchema<T> serSchema, Properties props,
+			KafkaPartitioner<T> partitioner);
+
 	public abstract <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic,
 														KeyedSerializationSchema<T> serSchema, Properties props,
 														KafkaPartitioner<T> partitioner);

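The new getProducerSink method returns the version-specific producer wrapped in a StreamSink rather than attached to a DataStream, so tests can run it directly under the operator test harness. A sketch of the intended call site, written as a hypothetical helper on top of the APIs above:

import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

public class ProducerHarnessSketch {

	// hypothetical helper: build an opened harness around the environment's producer sink
	static <T> OneInputStreamOperatorTestHarness<T, Object> producerHarness(
			KafkaTestEnvironment env,
			String topic,
			KeyedSerializationSchema<T> serSchema,
			Properties props,
			KafkaPartitioner<T> partitioner) throws Exception {

		OneInputStreamOperatorTestHarness<T, Object> harness =
				new OneInputStreamOperatorTestHarness<>(
						env.getProducerSink(topic, serSchema, props, partitioner));
		harness.open();
		return harness;
	}
}
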
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
index 0782cb9..5801c24 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcherTimestampsTest.java
@@ -18,16 +18,12 @@
 
 package org.apache.flink.streaming.connectors.kafka.internals;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.connectors.kafka.testutils.MockRuntimeContext;
-import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
+import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.SerializedValue;
 
@@ -54,10 +50,15 @@ public class AbstractFetcherTimestampsTest {
 
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
+		TestProcessingTimeService processingTimeProvider = new TestProcessingTimeService();
+
 		TestFetcher<Long> fetcher = new TestFetcher<>(
-				sourceContext, originalPartitions, null,
+				sourceContext,
+				originalPartitions,
+				null, /* periodic watermark assigner */
 				new SerializedValue<AssignerWithPunctuatedWatermarks<Long>>(new PunctuatedTestExtractor()),
-				new MockRuntimeContext(17, 3));
+				processingTimeProvider,
+				0);
 
 		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
 		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
@@ -115,10 +116,6 @@ public class AbstractFetcherTimestampsTest {
 	
 	@Test
 	public void testPeriodicWatermarks() throws Exception {
-
-		ExecutionConfig config = new ExecutionConfig();
-		config.setAutoWatermarkInterval(10);
-
 		final String testTopic = "test topic name";
 		List<KafkaTopicPartition> originalPartitions = Arrays.asList(
 				new KafkaTopicPartition(testTopic, 7),
@@ -127,70 +124,71 @@ public class AbstractFetcherTimestampsTest {
 
 		TestSourceContext<Long> sourceContext = new TestSourceContext<>();
 
-		final AtomicReference<Throwable> errorRef = new AtomicReference<>();
-		final ProcessingTimeService timerService = new SystemProcessingTimeService(
-				new ReferenceSettingExceptionHandler(errorRef), sourceContext.getCheckpointLock());
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
 
-		try {
-			TestFetcher<Long> fetcher = new TestFetcher<>(
-					sourceContext, originalPartitions,
-					new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
-					null, new MockRuntimeContext(17, 3, config, timerService));
-	
-			final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
-			final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
-			final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
-	
-			// elements generate a watermark if the timestamp is a multiple of three
-	
-			// elements for partition 1
-			fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
-			fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
-			fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
-			assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
-			assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
-	
-			// elements for partition 2
-			fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
-			assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
-			assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
-	
-			// elements for partition 3
-			fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
-			fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
-			assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
-			assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
-	
-			// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
-			assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
-	
-			// advance partition 3
-			fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
-			fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
-			fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
-			assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
-			assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
-	
-			// advance partition 1 beyond partition 2 - this bumps the watermark
-			fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
-			assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
-			assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
-			
-			// this blocks until the periodic thread emitted the watermark
-			assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
-	
-			// advance partition 2 again - this bumps the watermark
-			fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
-			fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
-			fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
-	
-			// this blocks until the periodic thread emitted the watermark
-			long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
-			assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
-		}
-		finally {
-			timerService.shutdownService();
-		}
+		TestFetcher<Long> fetcher = new TestFetcher<>(
+				sourceContext,
+				originalPartitions,
+				new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
+				null, /* punctuated watermarks assigner*/
+				processingTimeService,
+				10);
+
+		final KafkaTopicPartitionState<Object> part1 = fetcher.subscribedPartitions()[0];
+		final KafkaTopicPartitionState<Object> part2 = fetcher.subscribedPartitions()[1];
+		final KafkaTopicPartitionState<Object> part3 = fetcher.subscribedPartitions()[2];
+
+		// elements generate a watermark if the timestamp is a multiple of three
+
+		// elements for partition 1
+		fetcher.emitRecord(1L, part1, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(2L, part1, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(3L, part1, 3L, Long.MIN_VALUE);
+		assertEquals(3L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(3L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 2
+		fetcher.emitRecord(12L, part2, 1L, Long.MIN_VALUE);
+		assertEquals(12L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(12L, sourceContext.getLatestElement().getTimestamp());
+
+		// elements for partition 3
+		fetcher.emitRecord(101L, part3, 1L, Long.MIN_VALUE);
+		fetcher.emitRecord(102L, part3, 2L, Long.MIN_VALUE);
+		assertEquals(102L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(102L, sourceContext.getLatestElement().getTimestamp());
+
+		processingTimeService.setCurrentTime(10);
+
+		// now, we should have a watermark (this blocks until the periodic thread emitted the watermark)
+		assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 3
+		fetcher.emitRecord(1003L, part3, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(1004L, part3, 4L, Long.MIN_VALUE);
+		fetcher.emitRecord(1005L, part3, 5L, Long.MIN_VALUE);
+		assertEquals(1005L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(1005L, sourceContext.getLatestElement().getTimestamp());
+
+		// advance partition 1 beyond partition 2 - this bumps the watermark
+		fetcher.emitRecord(30L, part1, 4L, Long.MIN_VALUE);
+		assertEquals(30L, sourceContext.getLatestElement().getValue().longValue());
+		assertEquals(30L, sourceContext.getLatestElement().getTimestamp());
+
+		processingTimeService.setCurrentTime(20);
+
+		// the time advance above fired the periodic callback, emitting the new watermark
+		assertEquals(12L, sourceContext.getLatestWatermark().getTimestamp());
+
+		// advance partition 2 again - this bumps the watermark
+		fetcher.emitRecord(13L, part2, 2L, Long.MIN_VALUE);
+		fetcher.emitRecord(14L, part2, 3L, Long.MIN_VALUE);
+		fetcher.emitRecord(15L, part2, 3L, Long.MIN_VALUE);
+
+		processingTimeService.setCurrentTime(30);
+		// the time advance above fired the periodic callback, emitting the new watermark
+		long watermarkTs = sourceContext.getLatestWatermark().getTimestamp();
+		assertTrue(watermarkTs >= 13L && watermarkTs <= 15L);
 	}
 
 	// ------------------------------------------------------------------------
@@ -204,9 +202,10 @@ public class AbstractFetcherTimestampsTest {
 				List<KafkaTopicPartition> assignedPartitions,
 				SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
 				SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
-				StreamingRuntimeContext runtimeContext) throws Exception
+				ProcessingTimeService processingTimeProvider,
+				long autoWatermarkInterval) throws Exception
 		{
-			super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, runtimeContext, false);
+			super(sourceContext, assignedPartitions, watermarksPeriodic, watermarksPunctuated, processingTimeProvider, autoWatermarkInterval, TestFetcher.class.getClassLoader(), false);
 		}
 
 		@Override

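The rewritten test above no longer depends on a real timer thread: the
TestProcessingTimeService is advanced by hand, and advancing it fires the
registered periodic watermark callback on the calling thread. A condensed
sketch of the pattern, assuming the sourceContext and originalPartitions
set up earlier in the test:

    TestProcessingTimeService processingTimeService = new TestProcessingTimeService();

    TestFetcher<Long> fetcher = new TestFetcher<>(
            sourceContext,
            originalPartitions,
            new SerializedValue<AssignerWithPeriodicWatermarks<Long>>(new PeriodicTestExtractor()),
            null, /* no punctuated watermarks assigner */
            processingTimeService,
            10);  /* auto-watermark interval */

    // one record per partition, so every partition contributes to the watermark
    fetcher.emitRecord(3L, fetcher.subscribedPartitions()[0], 3L, Long.MIN_VALUE);
    fetcher.emitRecord(12L, fetcher.subscribedPartitions()[1], 1L, Long.MIN_VALUE);
    fetcher.emitRecord(102L, fetcher.subscribedPartitions()[2], 1L, Long.MIN_VALUE);

    // setCurrentTime() invokes the periodic callback synchronously, so the
    // watermark (the minimum across partitions, here 3) is available at once
    processingTimeService.setCurrentTime(10);
    assertEquals(3L, sourceContext.getLatestWatermark().getTimestamp());

Since the clock only moves when the test says so, the assertions cannot race
against watermark emission the way they could with a background thread.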
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index ba75212..9e8e1d9 100644
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.testutils;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeInfoParser;
@@ -36,6 +37,7 @@ import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FixedPartitioner;
 import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
 import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
 import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
@@ -44,6 +46,8 @@ import java.util.Collection;
 import java.util.Properties;
 import java.util.Random;
 
+import static org.mockito.Mockito.mock;
+
 @SuppressWarnings("serial")
 public class DataGenerators {
 
@@ -145,12 +149,17 @@ public class DataGenerators {
 				producerProperties.setProperty("retries", "3");
 				StreamTransformation<String> mockTransform = new MockStreamTransformation();
 				DataStream<String> stream = new DataStream<>(new DummyStreamExecutionEnvironment(), mockTransform);
-				DataStreamSink<String> sink = server.produceIntoKafka(stream, topic, new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
-						producerProperties, new FixedPartitioner<String>());
-				StreamSink<String> producerOperator = sink.getTransformation().getOperator();
-				producer = (RichFunction) producerOperator.getUserFunction();
-				producer.setRuntimeContext(new MockRuntimeContext(1,0));
-				producer.open(new Configuration());
+
+				StreamSink<String> sink = server.getProducerSink(
+						topic,
+						new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
+						producerProperties,
+						new FixedPartitioner<String>());
+
+				OneInputStreamOperatorTestHarness<String, Object> testHarness =
+						new OneInputStreamOperatorTestHarness<>(sink);
+
+				testHarness.open();
 
 				final StringBuilder bld = new StringBuilder();
 				final Random rnd = new Random();
@@ -164,7 +173,7 @@ public class DataGenerators {
 					}
 
 					String next = bld.toString();
-					producerOperator.processElement(new StreamRecord<>(next));
+					testHarness.processElement(new StreamRecord<>(next));
 				}
 			}
 			catch (Throwable t) {
@@ -215,4 +224,4 @@ public class DataGenerators {
 			}
 		}
 	}
-}
\ No newline at end of file
+}

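The generator now exercises the producer through the standard operator test
harness instead of wiring a RichFunction and a mocked runtime context by
hand. A minimal sketch of the new lifecycle, using the names from the hunk
above (getProducerSink is the KafkaTestEnvironment method added by this
commit):

    StreamSink<String> sink = server.getProducerSink(
            topic,
            new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
            producerProperties,
            new FixedPartitioner<String>());

    OneInputStreamOperatorTestHarness<String, Object> testHarness =
            new OneInputStreamOperatorTestHarness<>(sink);

    // open() sets the operator up against a mocked StreamTask, so the
    // producer function receives a proper StreamingRuntimeContext
    testHarness.open();

    testHarness.processElement(new StreamRecord<>("some-element"));

This is what makes the hand-rolled MockRuntimeContext, deleted below,
obsolete for the producer tests.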
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
deleted file mode 100644
index f16eacd..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.testutils;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.accumulators.DoubleCounter;
-import org.apache.flink.api.common.accumulators.Histogram;
-import org.apache.flink.api.common.accumulators.IntCounter;
-import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.api.common.cache.DistributedCache;
-import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-@SuppressWarnings("deprecation")
-public class MockRuntimeContext extends StreamingRuntimeContext {
-
-	private final int numberOfParallelSubtasks;
-	private final int indexOfThisSubtask;
-
-	private final ExecutionConfig execConfig;
-
-	private final ProcessingTimeService timeServiceProvider;
-	
-	public MockRuntimeContext(int numberOfParallelSubtasks, int indexOfThisSubtask) {
-		this(numberOfParallelSubtasks, indexOfThisSubtask, new ExecutionConfig());
-	}
-
-	public MockRuntimeContext(
-			int numberOfParallelSubtasks,
-			int indexOfThisSubtask,
-			ExecutionConfig execConfig) {
-		this(numberOfParallelSubtasks, indexOfThisSubtask, execConfig, null);
-	}
-	
-	public MockRuntimeContext(
-			int numberOfParallelSubtasks,
-			int indexOfThisSubtask,
-			ExecutionConfig execConfig,
-			ProcessingTimeService timeServiceProvider) {
-		
-		super(new MockStreamOperator(),
-			new MockEnvironment("no", 4 * MemoryManager.DEFAULT_PAGE_SIZE, null, 16),
-			Collections.<String, Accumulator<?, ?>>emptyMap());
-
-		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
-		this.indexOfThisSubtask = indexOfThisSubtask;
-		this.execConfig = execConfig;
-		this.timeServiceProvider = timeServiceProvider;
-	}
-
-	@Override
-	public boolean isCheckpointingEnabled() {
-		return true;
-	}
-
-	@Override
-	public String getTaskName() {
-		return "mock task";
-	}
-
-	@Override
-	public int getNumberOfParallelSubtasks() {
-		return numberOfParallelSubtasks;
-	}
-
-	@Override
-	public int getIndexOfThisSubtask() {
-		return indexOfThisSubtask;
-	}
-
-	@Override
-	public int getAttemptNumber() {
-		return 0;
-	}
-
-	@Override
-	public ExecutionConfig getExecutionConfig() {
-		return execConfig;
-	}
-
-	@Override
-	public ClassLoader getUserCodeClassLoader() {
-		return getClass().getClassLoader();
-	}
-
-	@Override
-	public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {
-		// noop
-	}
-
-	@Override
-	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Map<String, Accumulator<?, ?>> getAllAccumulators() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public IntCounter getIntCounter(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public LongCounter getLongCounter(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public DoubleCounter getDoubleCounter(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public Histogram getHistogram(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public MetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup.DummyIOMetricGroup();
-	}
-
-	@Override
-	public <RT> List<RT> getBroadcastVariable(String name) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <T, C> C getBroadcastVariableWithInitializer(String name, BroadcastVariableInitializer<T, C> initializer) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public DistributedCache getDistributedCache() {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
-		throw new UnsupportedOperationException();
-	}
-
-	@Override
-	public ProcessingTimeService getProcessingTimeService() {
-		if (timeServiceProvider == null) {
-			throw new UnsupportedOperationException();
-		} else {
-			return timeServiceProvider;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static class MockStreamOperator extends AbstractStreamOperator<Integer> {
-		private static final long serialVersionUID = -1153976702711944427L;
-
-		@Override
-		public ExecutionConfig getExecutionConfig() {
-			return new ExecutionConfig();
-		}
-	}
-}

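The class can be deleted because everything the mock used to supply is now
passed to the code under test explicitly. The TestFetcher super() call shown
earlier illustrates this; the trailing comments below indicate which
MockRuntimeContext accessor each argument replaces (inferred from the
deleted overrides above):

    super(sourceContext, assignedPartitions,
            watermarksPeriodic, watermarksPunctuated,
            processingTimeProvider,             // was: getProcessingTimeService()
            autoWatermarkInterval,              // was: derived from getExecutionConfig()
            TestFetcher.class.getClassLoader(), // was: getUserCodeClassLoader()
            false);

The producer tests, in turn, get their runtime context from the
OneInputStreamOperatorTestHarness, as shown in the DataGenerators hunk above.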
http://git-wip-us.apache.org/repos/asf/flink/blob/0859a698/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 8041a7c..5b277bf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -47,14 +47,16 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.InstantiationUtil;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.doAnswer;
@@ -90,6 +92,8 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	// use this as default for tests
 	AbstractStateBackend stateBackend = new MemoryStateBackend();
 
+	private final Object checkpointLock;
+
 	/**
 	 * Whether setup() was called on the operator. This is reset when calling close().
 	 */
@@ -113,13 +117,15 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 		this.executionConfig = executionConfig;
 		this.closableRegistry = new ClosableRegistry();
 
+		this.checkpointLock = new Object();
+
 		final Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024, underlyingConfig, executionConfig, MAX_PARALLELISM, 1, 0);
 		mockTask = mock(StreamTask.class);
 		processingTimeService = new TestProcessingTimeService();
 		processingTimeService.setCurrentTime(0);
 
 		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(new Object());
+		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
 		when(mockTask.getConfiguration()).thenReturn(config);
 		when(mockTask.getTaskConfiguration()).thenReturn(underlyingConfig);
 		when(mockTask.getEnvironment()).thenReturn(env);
@@ -330,7 +336,9 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	public void setProcessingTime(long time) throws Exception {
-		processingTimeService.setCurrentTime(time);
+		synchronized (checkpointLock) {
+			processingTimeService.setCurrentTime(time);
+		}
 	}
 
 	public void processWatermark(Watermark mark) throws Exception {

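This last hunk is subtle: previously the harness handed the mocked StreamTask
a fresh lock object, and setProcessingTime() advanced the time service
without holding any lock. Sharing a single checkpointLock instance and
synchronizing on it mirrors the real StreamTask contract that
processing-time timers fire under the checkpoint lock. A sketch of what an
operator under test can now rely on (someOperator is a placeholder for any
one-input operator):

    OneInputStreamOperatorTestHarness<String, String> harness =
            new OneInputStreamOperatorTestHarness<>(someOperator);
    harness.open();

    // timers registered by the operator fire while the harness holds the same
    // object that getContainingTask().getCheckpointLock() returns, so a timer
    // callback cannot interleave with code guarded by the checkpoint lock
    harness.setProcessingTime(1000L);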
