beam-user mailing list archives

From "Kaymak, Tobias" <>
Subject Re: Experience with KafkaIO -> BigQueryIO
Date Wed, 07 Nov 2018 13:04:00 GMT
On Tue, Nov 6, 2018 at 6:58 PM Raghu Angadi <> wrote:

> You seem to be reading from multiple topics and your timestamp policy is
> too simplistic (withTimestampFn() was never meant to be public API, I am
> sending a PR to deprecate it first and then will make it package private).
> So if you have two topics of different sizes, smaller topic might be
> consumed really fast, pushing your watermark way ahead. This might or might
> not be happening, but it is one of the dangers of using the record timestamp
> for the watermark (we should never do that).
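The danger Raghu describes can be bounded by letting the watermark trail the maximum observed event time by a fixed allowed delay, which is roughly the idea behind Beam's CustomTimestampPolicyWithLimitedDelay. Below is a minimal plain-Java sketch of that idea only (class and method names are illustrative, not the Beam API):

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: the watermark trails the maximum observed event time by a fixed
// allowed lag, so records that arrive out of order within that lag are not
// considered late. This models the concept, not Beam's actual TimestampPolicy.
public class BoundedLagWatermark {
    private Instant maxEventTime = Instant.EPOCH;
    private final Duration allowedLag;

    public BoundedLagWatermark(Duration allowedLag) {
        this.allowedLag = allowedLag;
    }

    // Called once per record with its event timestamp.
    public void observe(Instant eventTime) {
        if (eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
    }

    // Watermark = max event time seen so far, minus the allowed lag.
    // Older records never move it backwards.
    public Instant currentWatermark() {
        return maxEventTime.minus(allowedLag);
    }
}
```

A small topic consumed quickly still pushes the watermark ahead, but late records inside the lag window survive; a raw `withTimestampFn()` policy offers no such cushion.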
To clarify: the test was done consuming from a single topic. I am using a field
inside the element's JSON to get the element's timestamp. Data in a topic can go
way back, to let's say 2017, but that data was pushed to Kafka in one go, so the
timestamp when it arrived is, for example, Wednesday last week. Sometimes the
producing side does not set the element's timestamp for Kafka (since it's a
library that does not support that yet), so it has to be parsed.
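A minimal sketch of that kind of extractor, falling back to the Kafka record timestamp when the payload carries none. The field name "event_time", the ISO-8601 format, and the regex approach are assumptions for illustration; a real extractor would use a JSON parser such as Jackson:

```java
import java.time.Instant;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: pull an event timestamp out of the element's JSON payload when the
// producer does not set the Kafka record timestamp. Field name "event_time"
// and the ISO-8601 format are assumptions, not taken from the actual pipeline.
public class MessageTimestampSketch {
    private static final Pattern EVENT_TIME =
        Pattern.compile("\"event_time\"\\s*:\\s*\"([^\"]+)\"");

    public static Instant extract(String json, Instant kafkaRecordTime) {
        Matcher m = EVENT_TIME.matcher(json);
        if (m.find()) {
            try {
                return Instant.parse(m.group(1));
            } catch (java.time.format.DateTimeParseException e) {
                // Malformed field: fall through to the record timestamp.
            }
        }
        return kafkaRecordTime;
    }
}
```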

I could also not fiddle with the timestamp at all, let the system decide, and
then, in the BigQueryIO partitioning step, parse it and assign the element to a
partition. Is this better?
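That alternative would keep the element timestamp at processing time and derive the day partition only when building the BigQuery destination, since BigQuery addresses a specific day partition with a "$yyyyMMdd" table decorator. A sketch of that derivation (project, dataset, and table names are placeholders):

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Sketch: build a BigQuery table spec with a day-partition decorator
// ("$yyyyMMdd") from a timestamp parsed out of the element itself, instead of
// attaching that timestamp to the element in KafkaIO.
public class PartitionDecorator {
    private static final DateTimeFormatter DAY =
        DateTimeFormatter.ofPattern("yyyyMMdd").withZone(ZoneOffset.UTC);

    public static String tableSpec(String project, String dataset,
                                   String table, Instant eventTime) {
        return project + ":" + dataset + "." + table + "$" + DAY.format(eventTime);
    }
}
```

The trade-off: the watermark then tracks processing time (no holes from a runaway event-time watermark), but event-time windowing and lateness handling no longer see the real event times.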

> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <>
> wrote:
>> Hi,
>> I am sharing my experience with you after trying to use the following
>> pipeline
>> logic (with Beam 2.6.0 - running on Flink 1.5):
>> 1. Reading from KafkaIO, attaching a timestamp from each parsed element
>> 2. Filtering bad records
>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>> every 15 minutes
>> I had a working pipeline that does not write to BigQuery directly, but to
>> Cloud Storage, so its 3rd step was
>> 3. Writing files to GCS in daily "subdirectories"
>> I tried to rewrite the pipeline to reduce complexity: resetting its state
>> should no longer be tied to thinking about what to delete on GCS, and
>> configurable refresh times directly from within the pipeline were something
>> I was looking for. The only thing that I needed to change was the output at
>> the end, so I knew my parsing logic would not change, which should reduce
>> the risk.
>> I tested the pipeline within our test cluster and it looked promising. When
>> I deployed it last week everything seemed to go smoothly. On Friday I
>> noticed that I had holes in the data: in the BigQuery tables there were
>> missing days (the tricky part was that the recent days looked fine). (To be
>> sure, I reset the pipeline and read from the very beginning of each topic
>> from Kafka. Within different runs, different days were missing.) I spent
>> the weekend rolling back the changes and trying to figure out what was
>> going on.
>> I didn't see any error in the logs (the log level was on WARNING for most
>> parts), but I thought, well, maybe it's because there are too many
>> partitions and BigQuery has a limit of 1000 partition operations per day.
>> So I started reading from just 90 days in the past, but I still had holes
>> (whole days). I had a windowing step that I needed for the GCS pipeline;
>> since I became aware that I wouldn't need it anymore with BigQueryIO, I
>> commented it out and tested again, without luck.
>> What struck me was that the Flink cluster didn't do any checkpoints for the
>> pipeline that was using BigQueryIO - it does so when writing to GCS, and I
>> tested its failure logic there. Additionally, the graph in Flink with
>> BigQueryIO becomes very complex, but this is something I expected.
>> Here is the Pipeline code with the commented out windowing part:
>>   pipeline
>>       .apply(
>>           KafkaIO.<String, String>read()
>>               .withBootstrapServers(bootstrap)
>>               .withTopics(topics)
>>               .withKeyDeserializer(StringDeserializer.class)
>>               .withValueDeserializer(ConfigurableDeserializer.class)
>>               .updateConsumerProperties(
>>                   ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>                       inputMessagesConfig))
>>               .updateConsumerProperties(
>>                   ImmutableMap.of("auto.offset.reset", "earliest"))
>>               .updateConsumerProperties(
>>                   ImmutableMap.of("", "di-beam-consumers"))
>>               .updateConsumerProperties(ImmutableMap.of("", "true"))
>>               .withTimestampPolicyFactory(
>>                   TimestampPolicyFactory.withTimestampFn(
>>                       new MessageTimestampExtractor(inputMessagesConfig)))
>>               .withReadCommitted()
>>               .commitOffsetsInFinalize())
>>       .apply(ParDo.of(new ToEventFn()))
>>       //  .apply(
>>       //      Window.into(new ZurichTimePartitioningWindowFn())
>>       //          .triggering(
>>       //              Repeatedly.forever(
>>       //                  AfterFirst.of(
>>       //                      AfterPane.elementCountAtLeast(bundleSize),
>>       //                      AfterProcessingTime.pastFirstElementInPane()
>>       //                          .plusDelayOf(refreshFrequency))))
>>       //          .withAllowedLateness(Duration.standardDays(1))
>>       //          .discardingFiredPanes())
>>       .apply(
>>           BigQueryIO.<Event>write()
>>               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>               .withTriggeringFrequency(refreshFrequency)
>>               .withNumFileShards(1)
>>               .to(partitionedTableDynamicDestinations)
>>               .withFormatFunction(
>>                   (SerializableFunction<Event, TableRow>)
>>                       KafkaToBigQuery::convertUserEventToTableRow)
>>               .withCreateDisposition(
>>                   BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>               .withWriteDisposition(
>>                   BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
>> I have the feeling that I must be making some serious and dumb mistake, as
>> I know the Beam framework is very robust. Thanks for taking the time to
>> read this.
>> Tobi

Tobias Kaymak
Data Engineer
