beam-commits mailing list archives

From "Raghu Angadi (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (BEAM-591) Better handling watermark in KafkaIO
Date Thu, 25 Aug 2016 22:04:22 GMT

     [ https://issues.apache.org/jira/browse/BEAM-591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Raghu Angadi updated BEAM-591:
------------------------------
    Description: 
Right now the default watermark in KafkaIO is the same as the timestamp of the record. The main
problem with this is that the watermark does not advance if there aren't any new records on the
topic. This can hold up many open windows.

By default, the record timestamp is set to processing time (i.e. the time at which the runner
reads the record from the Kafka reader).

A user can provide functions to calculate the watermark and record timestamps. There are a few
concerns with the current design:

* What should happen when a Kafka topic is idle:
  ** In the default case, I think the watermark should advance to the current time.
  ** What should happen when the user has provided a function to calculate the record timestamp?
   *** Should the watermark stay the same as the record timestamp?
   *** Should the same apply when the user has provided their own watermark function?
* Are the current semantics of the user-provided watermark function correct?
  ** -It is run once for each record read-.
  ** -Should it instead be run inside {{getWatermark()}} called by the runner (we could still
provide the last user record, and its timestamp)-.
  ** It does run inside {{getWatermark()}}. Should we pass the current record timestamp in addition
to the record?
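
To make the idle-topic question concrete, here is a minimal sketch of one possible policy. It is not KafkaIO code: {{IdleAwareWatermark}}, {{onRecord()}} and {{onEmptyPoll()}} are hypothetical names, and the sketch assumes the reader can tell when a poll returned no records. The watermark follows the last record timestamp while records are flowing, advances to the current time once the topic looks idle, and is computed only when {{getWatermark()}} is called.

```java
import java.time.Instant;

/**
 * Hypothetical helper (not a KafkaIO or Beam class): one possible answer to
 * the idle-topic question, assuming the reader knows when a poll was empty.
 */
final class IdleAwareWatermark {
    private Instant lastRecordTimestamp = Instant.EPOCH; // timestamp of the last record read
    private boolean idle = false;                        // true after a poll that returned nothing
    private Instant watermark = Instant.EPOCH;           // last value handed to the runner

    /** Called for each record read; the timestamp may be processing time or user-provided. */
    void onRecord(Instant recordTimestamp) {
        lastRecordTimestamp = recordTimestamp;
        idle = false;
    }

    /** Called when a poll returns no records, i.e. the topic looks idle. */
    void onEmptyPoll() {
        idle = true;
    }

    /** Evaluated when the runner asks for the watermark, not once per record. */
    Instant getWatermark(Instant now) {
        // Idle topic: follow the clock. Otherwise: follow the last record.
        Instant candidate = idle ? now : lastRecordTimestamp;
        if (candidate.isAfter(watermark)) {
            watermark = candidate; // never let the watermark move backwards
        }
        return watermark;
    }

    public static void main(String[] args) {
        IdleAwareWatermark wm = new IdleAwareWatermark();
        Instant t0 = Instant.parse("2016-08-25T22:00:00Z");

        wm.onRecord(t0);
        // No idle signal yet, so the watermark stays at the record timestamp.
        System.out.println("after record:     " + wm.getWatermark(t0.plusSeconds(60)));

        wm.onEmptyPoll();
        // The topic looks idle, so the watermark catches up with the clock.
        System.out.println("after empty poll: " + wm.getWatermark(t0.plusSeconds(60)));
    }
}
```

Advancing to the current time is most defensible in the default case, where record timestamps are themselves processing times. With user-provided timestamps, a record that arrives later with an older timestamp would fall behind the watermark, which is exactly the open question in the list above.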
 


  was:
Right now the default watermark in KafkaIO is the same as the timestamp of the record. The main
problem with this is that the watermark does not advance if there aren't any new records on the
topic. This can hold up many open windows.

By default, the record timestamp is set to processing time (i.e. the time at which the runner
reads the record from the Kafka reader).

A user can provide functions to calculate the watermark and record timestamps. There are a few
concerns with the current design:

* What should happen when a Kafka topic is idle:
  ** In the default case, I think the watermark should advance to the current time.
  ** What should happen when the user has provided a function to calculate the record timestamp?
   *** Should the watermark stay the same as the record timestamp?
   *** Should the same apply when the user has provided their own watermark function?
* Are the current semantics of the user-provided watermark function correct?
  ** It is run once for each record read.
  ** Should it instead be run inside {{getWatermark()}} called by the runner (we could still
provide the last user record, and its timestamp).
 



> Better handling watermark in KafkaIO
> ------------------------------------
>
>                 Key: BEAM-591
>                 URL: https://issues.apache.org/jira/browse/BEAM-591
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>            Reporter: Raghu Angadi
>            Assignee: Raghu Angadi
>
> Right now the default watermark in KafkaIO is the same as the timestamp of the record. The main
> problem with this is that the watermark does not advance if there aren't any new records on the
> topic. This can hold up many open windows.
> By default, the record timestamp is set to processing time (i.e. the time at which the runner
> reads the record from the Kafka reader).
> A user can provide functions to calculate the watermark and record timestamps. There are
> a few concerns with the current design:
> * What should happen when a Kafka topic is idle:
>   ** In the default case, I think the watermark should advance to the current time.
>   ** What should happen when the user has provided a function to calculate the record timestamp?
>    *** Should the watermark stay the same as the record timestamp?
>    *** Should the same apply when the user has provided their own watermark function?
> * Are the current semantics of the user-provided watermark function correct?
>   ** -It is run once for each record read-.
>   ** -Should it instead be run inside {{getWatermark()}} called by the runner (we could
> still provide the last user record, and its timestamp)-.
>   ** It does run inside {{getWatermark()}}. Should we pass the current record timestamp in
> addition to the record?
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
