flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/2] flink git commit: [FLINK-7552] Extend SinkFunction interface with SinkContext
Date Thu, 21 Sep 2017 11:47:00 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9b0ba7ba3 -> 6886f638d


[FLINK-7552] Extend SinkFunction interface with SinkContext


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e7996b0d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e7996b0d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e7996b0d

Branch: refs/heads/master
Commit: e7996b0d0ff5fe705a3830f3855f977cad4f0c44
Parents: 9b0ba7b
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Aug 29 15:50:56 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Sep 21 13:46:17 2017 +0200

----------------------------------------------------------------------
 .../cassandra/CassandraConnectorITCase.java     |   3 +-
 .../kafka/FlinkKafkaProducerBaseTest.java       |   3 +-
 .../connectors/rabbitmq/common/RMQSinkTest.java |   7 +-
 .../api/functions/sink/RichSinkFunction.java    |   3 -
 .../api/functions/sink/SinkContextUtil.java     |  56 +++++++++
 .../api/functions/sink/SinkFunction.java        |  48 +++++++-
 .../streaming/api/operators/StreamSink.java     |  60 +++++++++-
 .../api/functions/PrintSinkFunctionTest.java    |  13 ++-
 .../functions/sink/SocketClientSinkTest.java    |   8 +-
 .../api/operators/StreamSinkOperatorTest.java   | 117 +++++++++++++++++++
 pom.xml                                         |   2 +
 11 files changed, 300 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index bc5d1a8..f52a42c 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -36,6 +36,7 @@ import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
 
 import com.datastax.driver.core.Cluster;
@@ -459,7 +460,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri
 
 		sink.open(new Configuration());
 		for (scala.Tuple3<String, Integer, Integer> value : scalaTupleCollection) {
-			sink.invoke(value);
+			sink.invoke(value, SinkContextUtil.forTimestamp(0));
 		}
 		sink.close();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
index 08c5f01..6b4b6ff 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
 import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig;
@@ -117,7 +118,7 @@ public class FlinkKafkaProducerBaseTest {
 		producer.open(new Configuration());
 		verify(mockPartitioner, times(1)).open(0, 1);
 
-		producer.invoke("foobar");
+		producer.invoke("foobar", SinkContextUtil.forTimestamp(0));
 		verify(mockPartitioner, times(1)).partition(
 			"foobar", null, "foobar".getBytes(), DummyFlinkKafkaProducer.DUMMY_TOPIC, new int[] {0,
1, 2, 3});
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
index 540a7ba..4fb6097 100644
--- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
+++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQSinkTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.rabbitmq.common;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 
@@ -91,7 +92,7 @@ public class RMQSinkTest {
 	public void invokePublishBytesToQueue() throws Exception {
 		RMQSink<String> rmqSink = createRMQSink();
 
-		rmqSink.invoke(MESSAGE_STR);
+		rmqSink.invoke(MESSAGE_STR, SinkContextUtil.forTimestamp(0));
 		verify(serializationSchema).serialize(MESSAGE_STR);
 		verify(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
 	}
@@ -101,7 +102,7 @@ public class RMQSinkTest {
 		RMQSink<String> rmqSink = createRMQSink();
 
 		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
-		rmqSink.invoke("msg");
+		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
 	}
 
 	@Test
@@ -110,7 +111,7 @@ public class RMQSinkTest {
 		rmqSink.setLogFailuresOnly(true);
 
 		doThrow(IOException.class).when(channel).basicPublish("", QUEUE_NAME, null, MESSAGE);
-		rmqSink.invoke("msg");
+		rmqSink.invoke("msg", SinkContextUtil.forTimestamp(0));
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
index 64c38b9..66a0c93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/RichSinkFunction.java
@@ -27,7 +27,4 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 public abstract class RichSinkFunction<IN> extends AbstractRichFunction implements SinkFunction<IN> {
 
 	private static final long serialVersionUID = 1L;
-
-	public abstract void invoke(IN value) throws Exception;
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
new file mode 100644
index 0000000..2749560
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkContextUtil.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.sink;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Utility for creating Sink {@link SinkFunction.Context Contexts}.
+ */
+@Internal
+public class SinkContextUtil {
+
+	/**
+	 * Creates a {@link SinkFunction.Context} that
+	 * throws an exception when trying to access the current watermark or processing time.
+	 */
+	public static <T> SinkFunction.Context<T> forTimestamp(long timestamp) {
+		return new SinkFunction.Context<T>() {
+			@Override
+			public long currentProcessingTime() {
+				throw new RuntimeException("Not implemented");
+			}
+
+			@Override
+			public long currentWatermark() {
+				throw new RuntimeException("Not implemented");
+			}
+
+			@Override
+			public long timestamp() {
+				return timestamp;
+			}
+
+			@Override
+			public boolean hasTimestamp() {
+				return true;
+			}
+		};
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
index cb9e11d..15a77c4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java
@@ -35,6 +35,52 @@ public interface SinkFunction<IN> extends Function, Serializable {
 	 *
 	 * @param value The input record.
 	 * @throws Exception
+	 * @deprecated Use {@link #invoke(Object, Context)}.
 	 */
-	void invoke(IN value) throws Exception;
+	@Deprecated
+	default void invoke(IN value) throws Exception {
+	}
+
+	/**
+	 * Writes the given value to the sink. This function is called for every record.
+	 *
+	 * @param value The input record.
+	 * @param context Additional context about the input record.
+	 * @throws Exception
+	 */
+	default void invoke(IN value, Context context) throws Exception {
+		invoke(value);
+	}
+
+	/**
+	 * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
+	 * an input record.
+	 *
+	 * <p>The context is only valid for the duration of a
+	 * {@link SinkFunction#invoke(Object, Context)} call. Do not store the context and use
+	 * afterwards!
+	 *
+	 * @param <T> The type of elements accepted by the sink.
+	 */
+	@Public // Interface might be extended in the future with additional methods.
+	interface Context<T> {
+
+		/** Returns the current processing time. */
+		long currentProcessingTime();
+
+		/** Returns the current event-time watermark. */
+		long currentWatermark();
+
+		/**
+		 * Returns the timestamp of the current input record.
+		 */
+		long timestamp();
+
+		/**
+		 * Checks whether this record has a timestamp.
+		 *
+		 * @return True if the record has a timestamp, false if not.
+		 */
+		boolean hasTimestamp();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index d92d789..f4b09af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -19,8 +19,10 @@ package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 /**
  * A {@link StreamOperator} for executing {@link SinkFunction SinkFunctions}.
@@ -31,14 +33,27 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
 
 	private static final long serialVersionUID = 1L;
 
+	private transient SimpleContext sinkContext;
+
+	/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
+	private long currentWatermark = Long.MIN_VALUE;
+
 	public StreamSink(SinkFunction<IN> sinkFunction) {
 		super(sinkFunction);
 		chainingStrategy = ChainingStrategy.ALWAYS;
 	}
 
 	@Override
+	public void open() throws Exception {
+		super.open();
+
+		this.sinkContext = new SimpleContext<>(getProcessingTimeService());
+	}
+
+	@Override
 	public void processElement(StreamRecord<IN> element) throws Exception {
-		userFunction.invoke(element.getValue());
+		sinkContext.element = element;
+		userFunction.invoke(element.getValue(), sinkContext);
 	}
 
 	@Override
@@ -48,4 +63,47 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
 
 		// sinks don't forward latency markers
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		super.processWatermark(mark);
+		this.currentWatermark = mark.getTimestamp();
+	}
+
+	private class SimpleContext<IN> implements SinkFunction.Context<IN> {
+
+		private StreamRecord<IN> element;
+
+		private final ProcessingTimeService processingTimeService;
+
+		public SimpleContext(ProcessingTimeService processingTimeService) {
+			this.processingTimeService = processingTimeService;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return processingTimeService.getCurrentProcessingTime();
+		}
+
+		@Override
+		public long currentWatermark() {
+			return currentWatermark;
+		}
+
+		@Override
+		public long timestamp() {
+			if (!element.hasTimestamp()) {
+				throw new IllegalStateException(
+					"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
+							"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
+
+			}
+			return element.getTimestamp();
+		}
+
+		public boolean hasTimestamp() {
+			return element.hasTimestamp();
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
index 9e7ecdd..8c303d1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/PrintSinkFunctionTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 
 import org.junit.After;
@@ -40,7 +41,7 @@ public class PrintSinkFunctionTest {
 	private String line = System.lineSeparator();
 
 	@Test
-	public void testPrintSinkStdOut(){
+	public void testPrintSinkStdOut() throws Exception {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		PrintStream stream = new PrintStream(baos);
 		System.setOut(stream);
@@ -55,7 +56,7 @@ public class PrintSinkFunctionTest {
 			Assert.fail();
 		}
 		printSink.setTargetToStandardOut();
-		printSink.invoke("hello world!");
+		printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
 
 		assertEquals("Print to System.out", printSink.toString());
 		assertEquals("hello world!" + line, baos.toString());
@@ -65,7 +66,7 @@ public class PrintSinkFunctionTest {
 	}
 
 	@Test
-	public void testPrintSinkStdErr(){
+	public void testPrintSinkStdErr() throws Exception {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		PrintStream stream = new PrintStream(baos);
 		System.setOut(stream);
@@ -80,7 +81,7 @@ public class PrintSinkFunctionTest {
 			Assert.fail();
 		}
 		printSink.setTargetToStandardErr();
-		printSink.invoke("hello world!");
+		printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
 
 		assertEquals("Print to System.err", printSink.toString());
 		assertEquals("hello world!" + line, baos.toString());
@@ -90,7 +91,7 @@ public class PrintSinkFunctionTest {
 	}
 
 	@Test
-	public void testPrintSinkWithPrefix(){
+	public void testPrintSinkWithPrefix() throws Exception {
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
 		PrintStream stream = new PrintStream(baos);
 		System.setOut(stream);
@@ -107,7 +108,7 @@ public class PrintSinkFunctionTest {
 			Assert.fail();
 		}
 		printSink.setTargetToStandardErr();
-		printSink.invoke("hello world!");
+		printSink.invoke("hello world!", SinkContextUtil.forTimestamp(0));
 
 		assertEquals("Print to System.err", printSink.toString());
 		assertEquals("2> hello world!" + line, baos.toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 63e83d2..6cdce11 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -74,7 +74,7 @@ public class SocketClientSinkTest extends TestLogger {
 				try {
 					SocketClientSink<String> simpleSink = new SocketClientSink<>(host, port, simpleSchema, 0);
 					simpleSink.open(new Configuration());
-					simpleSink.invoke(TEST_MESSAGE + '\n');
+					simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0));
 					simpleSink.close();
 				}
 				catch (Throwable t) {
@@ -117,7 +117,7 @@ public class SocketClientSinkTest extends TestLogger {
 			public void run() {
 				try {
 					// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
-					simpleSink.invoke(TEST_MESSAGE + '\n');
+					simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0));
 				}
 				catch (Throwable t) {
 					error.set(t);
@@ -182,7 +182,7 @@ public class SocketClientSinkTest extends TestLogger {
 				// socket should be closed, so this should trigger a re-try
 				// need two messages here: send a fin to cancel the client state:FIN_WAIT_2 while the server is CLOSE_WAIT
 				while (true) { // we have to do this more often as the server side closed is not guaranteed to be noticed immediately
-					simpleSink.invoke(TEST_MESSAGE + '\n');
+					simpleSink.invoke(TEST_MESSAGE + '\n', SinkContextUtil.forTimestamp(0));
 				}
 			}
 			catch (IOException e) {
@@ -238,7 +238,7 @@ public class SocketClientSinkTest extends TestLogger {
 
 			// Initial payload => this will be received by the server an then the socket will be
 			// closed.
-			sink.invoke("0\n");
+			sink.invoke("0\n", SinkContextUtil.forTimestamp(0));
 
 			// Get future an make sure there was no problem. This will rethrow any Exceptions from
 			// the server.

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
new file mode 100644
index 0000000..500a52a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamSinkOperatorTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link StreamSink}.
+ */
+public class StreamSinkOperatorTest extends TestLogger {
+
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
+	/**
+	 * Verify that we can correctly query watermark, processing time and the timestamp from the
+	 * context.
+	 */
+	@Test
+	public void testTimeQuerying() throws Exception {
+
+		BufferingQueryingSink<String> bufferingSink = new BufferingQueryingSink<>();
+
+		StreamSink<String> operator = new StreamSink<>(bufferingSink);
+
+		OneInputStreamOperatorTestHarness<String, Object> testHarness =
+				new OneInputStreamOperatorTestHarness<>(operator);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(17));
+		testHarness.setProcessingTime(12);
+		testHarness.processElement(new StreamRecord<>("Hello", 12L));
+
+		testHarness.processWatermark(new Watermark(42));
+		testHarness.setProcessingTime(15);
+		testHarness.processElement(new StreamRecord<>("Ciao", 13L));
+
+		testHarness.processWatermark(new Watermark(42));
+		testHarness.setProcessingTime(15);
+		testHarness.processElement(new StreamRecord<>("Ciao"));
+
+		assertThat(bufferingSink.data.size(), is(3));
+
+		assertThat(bufferingSink.data,
+			contains(
+				new Tuple4<>(17L, 12L, 12L, "Hello"),
+				new Tuple4<>(42L, 15L, 13L, "Ciao"),
+				new Tuple4<>(42L, 15L, null, "Ciao")));
+
+		testHarness.close();
+	}
+
+	private static class BufferingQueryingSink<T> implements SinkFunction<T> {
+
+		// watermark, processing-time, timestamp, event
+		private final List<Tuple4<Long, Long, Long, T>> data;
+
+		public BufferingQueryingSink() {
+			data = new ArrayList<>();
+		}
+
+		@Override
+		public void invoke(
+			T value, Context context) throws Exception {
+			if (context.hasTimestamp()) {
+				data.add(
+					new Tuple4<>(
+						context.currentWatermark(),
+						context.currentProcessingTime(),
+						context.timestamp(),
+						value));
+			} else {
+				data.add(
+					new Tuple4<>(
+						context.currentWatermark(),
+						context.currentProcessingTime(),
+						null,
+						value));
+
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e7996b0d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 561ec79..229e93d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1457,6 +1457,8 @@ under the License.
 							<excludes>
 								<exclude>@org.apache.flink.annotation.PublicEvolving</exclude>
 								<exclude>@org.apache.flink.annotation.Internal</exclude>
+								<exclude>org.apache.flink.streaming.api.functions.sink.RichSinkFunction#invoke(java.lang.Object)</exclude>
+								<exclude>org.apache.flink.streaming.api.functions.sink.SinkFunction</exclude>
 							</excludes>
 							<accessModifier>public</accessModifier>
 							<breakBuildOnModifications>false</breakBuildOnModifications>


Mime
View raw message