beam-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Event time processing with Flink runner and Kafka source
Date Fri, 08 Jul 2016 07:10:02 GMT
Hi,
I'm afraid there is no option for Kafka 0.8 right now. The API changed
quite a bit between 0.8 and 0.9 and the old API is somewhat cumbersome to
program against. If there is a strong need for it, someone could maybe
whip up something based on the 0.9 KafkaIO.

Regarding UnboundedFlinkSource: I would strongly suggest not using it,
since it is not well integrated with Beam and you cannot do proper
event-time windowing with it. Each runner has a set of custom sources that only
work with that specific runner because the selection of Beam-native sources
was a bit sparse in the beginning. Now might be a good time to get rid of
the special sources (for all runners). They make it impossible to run a
Pipeline on any runner, which is one of the main ideas behind Beam, IMHO.

Cheers,
Aljoscha

On Fri, 8 Jul 2016 at 01:56 David Desberg <david.desberg@uber.com> wrote:

> I see. Are there any options for Kafka 0.8? Thanks for the heads up.
>
> On Jul 7, 2016, at 4:54 PM, Raghu Angadi <rangadi@google.com> wrote:
>
> David,
>
> note that KafkaIO in Beam requires a Kafka server version >= 0.9
>
> On Thu, Jul 7, 2016 at 4:27 PM, David Desberg <david.desberg@uber.com>
> wrote:
>
>> Dan,
>>
>> Yeah, it’s setting it to the ingestion time. I will look into KafkaIO, as
>> it looks to provide exactly the functionality I want. I was wondering how
>> to set the timestamp correctly, at the source. Thank you for your help!
>>
>> David
>>
>> On Jul 7, 2016, at 4:25 PM, Dan Halperin <dhalperi@google.com> wrote:
>>
>> Hi David,
>>
>> In Beam pipelines, the event time is initially set on the source.
>> Downstream code can make an event *later* just fine, but making it
>> *earlier* might move it before the current watermark. This would effectively
>> turn data that we believe is on-time into late data, and would in general be
>> very bad! Allowed timestamp skew is a feature that lets you move data earlier by
>> a fixed amount, so if you have a tight bound on the time set by the source,
>> this can sometimes help. But it's generally discouraged in favor of proper
>> timestamps in the first place.
>>
>> My guess is that UnboundedFlinkSource is using the *processing time*, aka
>> current time when the element is received, rather than any event time
>> provided by the element. It might be possible using that source to provide
>> the element time.
>>
>> Alternately, I think you should be using KafkaIO and setting the event
>> time there using withTimestampFn:
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
>>
>> This way the elements will come into the system from Kafka with good
>> timestamps, and you don't need a downstream DoFn to transport them back in
>> time.
>>
>> Thanks,
>> Dan
>>
>> On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <amirtousa@yahoo.com>
>> wrote:
>>
>>> Hi David,
>>> I am doing pretty much the same thing using Beam KafkaIO.
>>> For the simple thing I am doing, it's working as expected.
>>> Can you share the code showing how you are reading from Kafka, please?
>>> Cheers
>>>
>>>
>>> ------------------------------
>>> *From:* David Desberg <david.desberg@uber.com>
>>> *To:* user@beam.incubator.apache.org
>>> *Sent:* Thursday, July 7, 2016 12:54 PM
>>> *Subject:* Event time processing with Flink runner and Kafka source
>>>
>>> Hi all,
>>>
>>> I’m struggling to set up a basic Beam application that is windowed based
>>> upon event time. I’m reading from an UnboundedFlinkSource of a
>>> FlinkKafkaConsumer to begin my pipeline. To set up event time processing, I
>>> applied a DoFn transformation (via ParDo) that calls
>>> ProcessContext.outputWithTimestamp using a timestamp extracted from each
>>> Kafka message. However, this results in an exception telling me to
>>> override getAllowedTimestampSkew, since evidently the messages are already
>>> timestamped and I am moving these timestamps back in time, but only
>>> shifting to the future is allowed. getAllowedTimestampSkew, however, is
>>> deprecated, and if I do override it and allow skew, the windowing I am
>>> applying later in the pipeline fails. I decided to backtrack and look at
>>> how the timestamps are even being assigned initially, since the Flink
>>> source has no concept of the structure of my messages and thus shouldn’t
>>> know how to assign any time at all. It turns out that the
>>> pipeline runner marks each incoming message with ingestion time, in a
>>> manner that cannot be overridden/is not configurable (see
>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273
>>> )
>>>
>>> Why is this the case? Since part of the point of Beam is to allow
>>> event-time processing, I’m sure I’m missing something here. How can I
>>> correctly ingest messages from Kafka and stamp them with event time, rather
>>> than ingestion time?
>>>
>>>
>>>
>>
>>
>
>
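
To make the rule from the quoted thread concrete, here is a minimal plain-Java sketch. It is a model of the behaviour David and Dan describe, not Beam code and not the runner's actual implementation. The message layout ("<epochMillis>,<payload>"), the names EventTimeSketch, EVENT_TIME_FN and retimestamp, and the choice of IllegalArgumentException are all assumptions made for illustration. The idea is that a function like EVENT_TIME_FN is the kind of thing one would hand to the source (for example via withTimestampFn, as Dan suggests), so that no downstream step has to move timestamps back in time.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;

/** Plain-Java model of the timestamp rule discussed in this thread; not Beam code. */
class EventTimeSketch {

    /** Extracts event time from a hypothetical "<epochMillis>,<payload>" message. */
    static final Function<String, Instant> EVENT_TIME_FN =
        message -> Instant.ofEpochMilli(
            Long.parseLong(message.substring(0, message.indexOf(','))));

    /**
     * Models the check applied when a downstream step re-timestamps an element:
     * the proposed timestamp may be later than the current one, or earlier by at
     * most allowedSkew. Anything earlier than that is rejected.
     */
    static Instant retimestamp(Instant current, Instant proposed, Duration allowedSkew) {
        if (proposed.isBefore(current.minus(allowedSkew))) {
            throw new IllegalArgumentException(
                "Cannot move timestamp back from " + current + " to " + proposed
                    + " (allowed skew: " + allowedSkew + ")");
        }
        return proposed;
    }

    public static void main(String[] args) {
        String message = "1467900000000,click";
        Instant eventTime = EVENT_TIME_FN.apply(message);
        // Assume the message reached the pipeline 30 seconds after it was produced.
        Instant ingestionTime = eventTime.plusSeconds(30);

        // Case 1: the source stamped ingestion time, so moving the element back
        // to its event time downstream is rejected.
        try {
            retimestamp(ingestionTime, eventTime, Duration.ZERO);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }

        // Case 2: the source already stamped event time, so the downstream step
        // has nothing to shift and the check passes.
        System.out.println("accepted: " + retimestamp(eventTime, eventTime, Duration.ZERO));
    }
}
```

Allowing a skew of one minute in case 1 would make the check pass, but as Dan points out, that only helps when there is a tight bound on how far the source timestamp is from the event time. Setting the timestamp at the source avoids the problem altogether.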
