kafka-dev mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS]: KIP-159: Introducing Rich functions to Streams
Date Wed, 14 Mar 2018 06:02:03 GMT
Warren,

thanks for following up on this KIP, and sorry for the "messy" discussion
thread. Adding this feature is a little tricky. We still hope to get it
into the 1.2 release, but at the moment there is not much progress.

However, for your use case, you can replace .map() with .transform(),
which gives you access to the record's timestamp (via the provided
`context` object) as extracted by the TimestampExtractor. See the docs
for more details:
https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#applying-processors-and-transformers-processor-api-integration
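
As a rough sketch of the timestamp logic from your example (this is not
taken from the KIP or the docs): the helper below takes the year from the
record timestamp and the remaining fields from the value. The class and
method names are made up for illustration, and it assumes UTC and that
month/day/hour/minute have already been parsed out of the value. Inside a
Transformer, the first argument would come from `context.timestamp()`.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class YearFromRecordTimestamp {

    // Hypothetical helper: combines the year of the record timestamp
    // (epoch millis, e.g. the value of ProcessorContext#timestamp())
    // with date/time fields parsed from the record value.
    // Assumption: everything is interpreted in UTC.
    public static long eventTimeMillis(long recordTimestampMs,
                                       int month, int day, int hour, int minute) {
        int year = Instant.ofEpochMilli(recordTimestampMs)
                .atZone(ZoneOffset.UTC)
                .getYear();
        return LocalDateTime.of(year, month, day, hour, minute)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    public static void main(String[] args) {
        // Record timestamp as it might be extracted by the TimestampExtractor.
        long recordTs = Instant.parse("2018-03-13T12:51:00Z").toEpochMilli();
        // Fields that would be parsed from the text value (no year present).
        long eventTs = eventTimeMillis(recordTs, 3, 12, 23, 30);
        System.out.println(Instant.ofEpochMilli(eventTs));
    }
}
```

Note that this does not handle the year boundary: a record written in
early January that describes a late-December event would get the wrong
year, so you may need an extra check there.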


-Matthias

On 3/13/18 12:51 PM, Warren, Brad wrote:
> Hi devs,
> 
>  
> 
> It’s a bit difficult to put all of the pieces together regarding the
> status and API changes around the KIPs dealing with exposing the record
> metadata in the Processor and DSL APIs.  This is a feature that my team
> here at American Airlines is keenly interested in and I’d like to
> provide a real world use case to help move the discussion along:
> 
>  
> 
> I have a source topic that contains a text value that includes datetimes
> without a year.  The desire is to order the records in a stream by an
> extracted timestamp from the record value and we plan to use the
> timestamp from the source topic to provide the year.  We’re hoping to
> use the DSL.  Something like:
> 
>  
> 
> val streamOrderedByMyValueTime = builder.stream("sourceTopic").map( (K, V)
> -> KeyValue(KR, VR, timestamp) )
> 
>  
> 
> so then I can do
> 
>  
> 
> groupBy(), aggregate(), etc.
> 
>  
> 
> Inside the mapper, my timestamp would be something like
> LocalDateTime.of(yearFromIncomingConsumerRecordTimestamp,
> monthFromValue, dayFromValue, ….)
> 
>  
> 
> Looking at the wiki here
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=73637757,
> what is the proposed implementation of RichValueMapper?  Is it going to
> support what I want to do here?
> 
>  
> 
>  
> 
> Thanks,
> 
> Brad
> 
>  
> 
>  
> 
> Brad Warren
> Principal Application Architect
> Airport Technology
> 
>  
> 
> brad.warren@aa.com
> 
>  
> 
> NOTICE: This email and any attachments are for the exclusive and
> confidential use of the intended recipient(s). If you are not an
> intended recipient, please do not read, distribute, or take action in
> reliance upon this message. If you have received this in error, please
> notify me immediately by return email and promptly delete this message
> and its attachments from your computer.
> 

