flink-issues mailing list archives

From "Elias Levy (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9600) Add DataStream transformation variants that pass timestamp to the user function
Date Sat, 04 Aug 2018 00:35:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16568955#comment-16568955 ]

Elias Levy commented on FLINK-9600:
-----------------------------------

[~aljoscha] I am aware of {{ProcessFunction}}, but I consider it an escape hatch for when you
can't do what you want within the higher-level DSL.  The improvement I am suggesting
is within the higher-level DSL.

E.g., it is a lot nicer to write:
{code:java}
dataStream.filter( (x, ts) => { isDayTime(ts) } )
{code}
than
{code:java}
class ProcessFilter extends ProcessFunction[T,T] {
  override def processElement(value: T, ctx: Context, out: Collector[T]): Unit = {
    // Forward the element only if its timestamp falls in the daytime.
    if (isDayTime(ctx.timestamp))
      out.collect(value)
  }
}
dataStream.process(new ProcessFilter())
{code}
 

> Add DataStream transformation variants that pass timestamp to the user function
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-9600
>                 URL: https://issues.apache.org/jira/browse/FLINK-9600
>             Project: Flink
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.5.0
>            Reporter: Elias Levy
>            Priority: Minor
>
> It is often necessary to access the timestamp assigned to records within user functions.
 At the moment this is only possible from {{RichFunction}}. Implementing a {{RichFunction}}
just to access the timestamp is burdensome, so most jobs carry a duplicate of the timestamp
within the record.
> It would be useful if {{DataStream}} provided transformation methods that accepted user
functions that could be passed the record's timestamp as an additional argument, similar to
how there are two variants of {{flatMap}}, one with an extra parameter that gives the user
function access to the output {{Collector}}.
> Along similar lines, it may be useful to have variants that pass the record's key as
an additional parameter.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
