flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods
Date Wed, 28 Feb 2018 13:10:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16380273#comment-16380273 ]

ASF GitHub Bot commented on FLINK-8560:
---------------------------------------

Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5481#discussion_r171234411
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -321,19 +324,80 @@ private boolean validateKeyTypeIsHashable(TypeInformation<?> type) {
     	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
     	 *
     	 * @return The transformed {@link DataStream}.
    +	 *
    +	 * @deprecated Use {@link KeyedStream#process(KeyedProcessFunction, TypeInformation)}
     	 */
    +	@Deprecated
     	@Override
     	@Internal
     	public <R> SingleOutputStreamOperator<R> process(
     			ProcessFunction<T, R> processFunction,
     			TypeInformation<R> outputType) {
     
    -		KeyedProcessOperator<KEY, T, R> operator =
    -				new KeyedProcessOperator<>(clean(processFunction));
    +		LegacyKeyedProcessOperator<K, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
     
     		return transform("Process", outputType, operator);
     	}
     
    +	/**
    +	 * Applies the given {@link KeyedProcessFunction} on the input stream, thereby creating a transformed output stream.
    +	 *
    +	 * <p>The function will be called for every element in the input streams and can produce zero
    +	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
    +	 * function, this function can also query the time and set timers. When reacting to the firing
    +	 * of set timers the function can directly emit elements and/or register yet more timers.
    +	 *
    +	 * @param keyedProcessFunction The {@link KeyedProcessFunction} that is called for each element in the stream.
    +	 *
    +	 * @param <K> The type of key in {@code KeyedProcessFunction}.
    +	 *
    +	 * @param <R> The type of elements emitted by the {@code KeyedProcessFunction}.
    +	 *
    +	 * @return The transformed {@link DataStream}.
    +	 */
    +	@PublicEvolving
    +	public <K, R> SingleOutputStreamOperator<R> process(KeyedProcessFunction<K, T, R> keyedProcessFunction) {
    +
    +		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
    +				keyedProcessFunction,
    +				KeyedProcessFunction.class,
    +				0,
    --- End diff --
    
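    The type-argument indices here should not be `0` and `1` for the input and output types, but `1` and `2`. In `ProcessFunction` they were `0` and `1` because there was no key type parameter; in `KeyedProcessFunction<K, I, O>` the key type occupies index `0`.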
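A minimal, Flink-free sketch of why the indices shift. It reads the generic type arguments of two hypothetical stand-in classes, `ProcessFunctionShape<I, O>` and `KeyedProcessFunctionShape<K, I, O>` (they only mimic the type-parameter shape of Flink's classes), using plain Java reflection. It does not call `TypeExtractor`; it only illustrates that prepending the key type moves the input and output arguments from positions `0`/`1` to `1`/`2`.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-ins with the same type-parameter shape as
// ProcessFunction<I, O> and KeyedProcessFunction<K, I, O>; no Flink dependency.
public class TypeArgIndexSketch {

    abstract static class ProcessFunctionShape<I, O> {}

    abstract static class KeyedProcessFunctionShape<K, I, O> {}

    // Concrete "user functions": key Long, input Integer, output String.
    static class MyProcessFunction extends ProcessFunctionShape<Integer, String> {}

    static class MyKeyedProcessFunction extends KeyedProcessFunctionShape<Long, Integer, String> {}

    /** Returns the simple name of the type argument at {@code index} of the generic superclass. */
    static String typeArgAt(Class<?> userClass, int index) {
        ParameterizedType superType = (ParameterizedType) userClass.getGenericSuperclass();
        Type arg = superType.getActualTypeArguments()[index];
        return ((Class<?>) arg).getSimpleName();
    }

    public static void main(String[] args) {
        // Without a key: input at index 0, output at index 1.
        System.out.println(typeArgAt(MyProcessFunction.class, 0) + " -> "
                + typeArgAt(MyProcessFunction.class, 1));
        // With the key in front, both shift by one: input at 1, output at 2.
        System.out.println(typeArgAt(MyKeyedProcessFunction.class, 1) + " -> "
                + typeArgAt(MyKeyedProcessFunction.class, 2));
        // Index 0 of the keyed variant is the key type, not the input type.
        System.out.println("key: " + typeArgAt(MyKeyedProcessFunction.class, 0));
    }
}
```

Running `main` prints `Integer -> String` twice, followed by `key: Long`.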


> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8560
>                 URL: https://issues.apache.org/jira/browse/FLINK-8560
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>    Affects Versions: 1.4.0
>            Reporter: Jürgen Thomann
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>
> Currently, the key of a keyBy() has to be stored in the processElement method to make it accessible in the OnTimerContext.
> This is awkward, because processElement has to check for every element whether the key is already stored, and store it if it is not.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar method. Making the key available in the open() method could also work.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html
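To make the issue's motivation concrete, here is a small Flink-free model in which every name is hypothetical. A simulated runtime sets the current key before each callback, per-key state is a plain `HashMap` scoped to that key, and each timer remembers the key it was registered under. `processElement` performs the workaround described above (check on every element, store the key if it is missing), while `onTimer` reads the key both from that stored copy and from a context with a `getCurrentKey()` method, as the issue proposes. Both yield the same keys; only the context variant avoids the per-element check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Flink-free model; all names are hypothetical. The simulated runtime sets the
// current key before every callback, per-key state is a HashMap scoped to that
// key, and each timer remembers the key it was registered under.
public class KeyedTimerSketch {

    /** Hypothetical stand-in for a timer context that exposes the current key. */
    interface OnTimerContext<K> {
        K getCurrentKey();
    }

    /** A timer registered under a specific key. */
    static final class Timer {
        final long timestamp;
        final String key;

        Timer(long timestamp, String key) {
            this.timestamp = timestamp;
            this.key = key;
        }
    }

    private final PriorityQueue<Timer> timers =
            new PriorityQueue<>((x, y) -> Long.compare(x.timestamp, y.timestamp));
    private final Map<String, String> storedKey = new HashMap<>(); // per-key "value state"
    private String currentKey;                                      // maintained by the runtime

    int stateChecks = 0;
    final List<String> fromState = new ArrayList<>();
    final List<String> fromContext = new ArrayList<>();

    // ---- user function ----

    void processElement(long eventTime) {
        // Workaround from the issue: check on every element, store the key if missing.
        stateChecks++;
        storedKey.putIfAbsent(currentKey, currentKey);
        timers.add(new Timer(eventTime + 10, currentKey));
    }

    void onTimer(long timestamp, OnTimerContext<String> ctx) {
        fromState.add(storedKey.get(currentKey)); // workaround: read the stored copy
        fromContext.add(ctx.getCurrentKey());     // proposal: ask the context
    }

    // ---- simulated runtime ----

    void element(String key, long eventTime) {
        currentKey = key;
        processElement(eventTime);
    }

    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.peek().timestamp <= watermark) {
            Timer t = timers.poll();
            currentKey = t.key; // scope state to the firing timer's key
            onTimer(t.timestamp, () -> t.key);
        }
    }

    public static void main(String[] args) {
        KeyedTimerSketch fn = new KeyedTimerSketch();
        fn.element("a", 1);
        fn.element("b", 2);
        fn.element("a", 3);
        fn.advanceWatermark(100);
        System.out.println("from state:   " + fn.fromState);
        System.out.println("from context: " + fn.fromContext);
        System.out.println("state checks: " + fn.stateChecks);
    }
}
```

With the elements `a`, `b`, `a` above, both lists come out as `[a, b, a]` and `stateChecks` is `3`: the stored copy gives the right answer, but only by paying one state check per element, which the context-based lookup makes unnecessary.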



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
