flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7552) Extend SinkFunction interface with SinkContext
Date Fri, 01 Sep 2017 16:27:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16150789#comment-16150789 ]

ASF GitHub Bot commented on FLINK-7552:
---------------------------------------

Github user EronWright commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136614448
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SinkFunction.java ---
    @@ -35,6 +35,39 @@
     	 *
     	 * @param value The input record.
     	 * @throws Exception
    +	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
     	 */
    -	void invoke(IN value) throws Exception;
    +	@Deprecated
    +	default void invoke(IN value) throws Exception {
    +	}
    +
    +	/**
    +	 * Writes the given value to the sink. This function is called for every record.
    +	 *
    +	 * @param context Additional context about the input record.
    +	 * @param value The input record.
    +	 * @throws Exception
    +	 */
    +	default void invoke(SinkContext context, IN value) throws Exception {
    --- End diff --
    
    Consider re-ordering the two arguments so that the value comes first, which more closely matches the similar signature in `ProcessFunction`.
    
    ```
    default void invoke(IN value, SinkContext ctx) throws Exception
    ```


> Extend SinkFunction interface with SinkContext
> ----------------------------------------------
>
>                 Key: FLINK-7552
>                 URL: https://issues.apache.org/jira/browse/FLINK-7552
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
> 	/**
> 	 * Function for standard sink behaviour. This function is called for every record.
> 	 *
> 	 * @param value The input record.
> 	 * @throws Exception
> 	 * @deprecated Use {@link #invoke(SinkContext, Object)}.
> 	 */
> 	@Deprecated
> 	default void invoke(IN value) throws Exception {
> 	}
> 	/**
> 	 * Writes the given value to the sink. This function is called for every record.
> 	 *
> 	 * @param context Additional context about the input record.
> 	 * @param value The input record.
> 	 * @throws Exception
> 	 */
> 	default void invoke(SinkContext context, IN value) throws Exception {
> 		invoke(value);
> 	}
> 	/**
> 	 * Context that {@link SinkFunction SinkFunctions} can use for getting additional data
> 	 * about an input record.
> 	 *
> 	 * @param <T> The type of elements accepted by the sink.
> 	 */
> 	@Public // Interface might be extended in the future with additional methods.
> 	interface SinkContext<T> {
> 		/**
> 		 * Returns the timestamp of the current input record.
> 		 */
> 		long timestamp();
> 	}
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to timestamps.
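
To illustrate the compatibility argument in the description, here is a minimal, self-contained sketch. `DemoSink`, its nested `Context`, `LegacySink`, `TimestampedSink`, and `runDemo` are hypothetical stand-ins invented for this example and are not Flink classes; only the method shapes follow the proposal. It shows that a sink overriding just the old single-argument method still receives records when the caller uses the new two-argument method, while a new-style sink can read the timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class SinkContextSketch {

    /** Hypothetical stand-in for the proposed interface shape (not a Flink class). */
    interface DemoSink<IN> {
        /** Old method: a no-op default, so new sinks need not implement it. */
        @Deprecated
        default void invoke(IN value) throws Exception {
        }

        /** New method: falls back to the old one unless a sink overrides it. */
        default void invoke(Context context, IN value) throws Exception {
            invoke(value);
        }

        /** Hypothetical context exposing only the record timestamp. */
        interface Context {
            long timestamp();
        }
    }

    /** A pre-existing sink that only overrides the old method. */
    static class LegacySink implements DemoSink<String> {
        private final List<String> out;

        LegacySink(List<String> out) {
            this.out = out;
        }

        @Override
        public void invoke(String value) {
            out.add(value);
        }
    }

    /** A new-style sink that uses the timestamp from the context. */
    static class TimestampedSink implements DemoSink<String> {
        private final List<String> out;

        TimestampedSink(List<String> out) {
            this.out = out;
        }

        @Override
        public void invoke(Context context, String value) {
            out.add(context.timestamp() + ":" + value);
        }
    }

    /** Plays the role of the runtime, which always calls the two-argument method. */
    public static List<String> runDemo() throws Exception {
        List<String> out = new ArrayList<>();
        DemoSink.Context ctx = () -> 42L; // fixed timestamp, chosen for the demo
        new LegacySink(out).invoke(ctx, "a");
        new TimestampedSink(out).invoke(ctx, "a");
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runDemo()); // prints [a, 42:a]
    }
}
```

The legacy sink compiles and behaves as before because the two-argument default delegates to the method it already overrides. This is the property the description relies on when it says the interface can be extended without breaking backwards compatibility.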



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
