beam-user mailing list archives

From "Kaymak, Tobias" <tobias.kay...@ricardo.ch>
Subject Re: Experience with KafkaIO -> BigQueryIO
Date Thu, 08 Nov 2018 08:36:53 GMT
On Wed, Nov 7, 2018 at 6:49 PM Raghu Angadi <rangadi@google.com> wrote:

>
> On Wed, Nov 7, 2018 at 5:04 AM Kaymak, Tobias <tobias.kaymak@ricardo.ch>
> wrote:
>
>>
>> On Tue, Nov 6, 2018 at 6:58 PM Raghu Angadi <rangadi@google.com> wrote:
>>
>>> You seem to be reading from multiple topics, and your timestamp policy is
>>> too simplistic (withTimestampFn() was never meant to be a public API; I am
>>> sending a PR to deprecate it first and will then make it package private).
>>> So if you have two topics of different sizes, the smaller topic might be
>>> consumed really fast, pushing your watermark way ahead. This might or might
>>> not be happening here, but it is one of the dangers of using the record
>>> timestamp for the watermark (we should never do that).
>>>
>>>
>> To clarify: the test was done consuming from a single topic. I am using a
>> field inside the element's JSON to get the element's timestamp. Data in a
>> topic can go way back to, let's say, 2017, but that data was pushed to
>> Kafka in one go, and the timestamp when it arrived is, for example,
>> Wednesday last week. Sometimes the producing side does not set the
>> element's timestamp for Kafka (since it's using a library that does not
>> support that yet), so it has to be parsed.
>>
>
> That is fine. We can ignore the timestamp as a possible suspect for
> debugging this. Using custom timestamps from records is normal.
>
>
Could you clarify what you meant by "withTimestampFn() was never meant to be
public"? I am using it to attach the right timestamp to each element so that I
can window into days with the local time zone in the windowing function. If
this is not the correct way to use it, could you tell me how I can do it
better?
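
For reference, a minimal sketch of what I would try instead, assuming
CustomTimestampPolicyWithLimitedDelay is the intended replacement for
withTimestampFn() (the extractor function and the two-hour bound are
placeholders, not my production values):

  import java.util.Optional;
  import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
  import org.apache.beam.sdk.io.kafka.KafkaRecord;
  import org.apache.beam.sdk.io.kafka.TimestampPolicy;
  import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.kafka.common.TopicPartition;
  import org.joda.time.Duration;
  import org.joda.time.Instant;

  public class BoundedLagTimestampPolicyFactory
      implements TimestampPolicyFactory<String, String> {

    // Stand-in for the JSON-parsing logic in MessageTimestampExtractor.
    private final SerializableFunction<KafkaRecord<String, String>, Instant> extractTimestamp;

    public BoundedLagTimestampPolicyFactory(
        SerializableFunction<KafkaRecord<String, String>, Instant> extractTimestamp) {
      this.extractTimestamp = extractTimestamp;
    }

    @Override
    public TimestampPolicy<String, String> createTimestampPolicy(
        TopicPartition tp, Optional<Instant> previousWatermark) {
      // The watermark tracks the maximum extracted timestamp, held back by a
      // bounded delay and capped at the current time.
      return new CustomTimestampPolicyWithLimitedDelay<>(
          extractTimestamp, Duration.standardHours(2), previousWatermark);
    }
  }

I would then pass an instance of this factory to withTimestampPolicyFactory()
in place of TimestampPolicyFactory.withTimestampFn(new
MessageTimestampExtractor(inputMessagesConfig)).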

After the rollback I am busy making the existing pipeline that writes to GCS
so robust that it never fails to deliver all files, so that there is always a
backup. As I am under a lot of pressure right now, I don't want to mess this
up with easy-to-avoid mistakes; the GCS pipeline has the same logic, just a
different sink that uses FileIO to write out different days to different
"folders".
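
To give an idea of that sink, a stripped-down sketch (assuming a windowed
PCollection<Event> called events; the bucket path, the Event::toJson accessor
and the getDay() helper are placeholders, not the real names):

  import org.apache.beam.sdk.coders.StringUtf8Coder;
  import org.apache.beam.sdk.io.FileIO;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.transforms.Contextful;

  events.apply(
      "WriteDailyFiles",
      FileIO.<String, Event>writeDynamic()
          // Group elements by their day string, e.g. "2018-11-08".
          .by(event -> event.getDay())
          .withDestinationCoder(StringUtf8Coder.of())
          // Serialize each event to one JSON line and write it with the text sink.
          .via(Contextful.fn(Event::toJson), TextIO.sink())
          .to("gs://my-backup-bucket/events")
          // One "folder" per day below the base path.
          .withNaming(day -> FileIO.Write.defaultNaming(day + "/part", ".json"))
          .withNumShards(1));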

Thank you,

Tobi



> Raghu.
>
>
>> I could also not fiddle with the timestamp at all, let the system decide,
>> and then parse it in the BigQueryIO partitioning step and assign it to a
>> partition. Is this better?
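
(As a rough sketch, that variant would only change the read side: Kafka's
log-append time would drive the element timestamps and the watermark, assuming
the brokers record log-append time, and the field inside the JSON would only
be parsed later when routing the element to a BigQuery partition. The variable
and class names are the same as in the pipeline quoted further below:)

  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
  import org.apache.kafka.common.serialization.StringDeserializer;

  pipeline.apply(
      KafkaIO.<String, String>read()
          .withBootstrapServers(bootstrap)
          .withTopics(topics)
          .withKeyDeserializer(StringDeserializer.class)
          .withValueDeserializer(ConfigurableDeserializer.class)
          // The broker-assigned log-append time sets the element timestamps
          // and the watermark; no custom extractor is involved here.
          .withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime())
          .withReadCommitted()
          .commitOffsetsInFinalize());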
>>
>>
>>
>>> On Tue, Nov 6, 2018 at 3:44 AM Kaymak, Tobias <tobias.kaymak@ricardo.ch>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am sharing my experience with you after trying to use the following
>>>> pipeline
>>>> logic (with Beam 2.6.0 - running on Flink 1.5):
>>>>
>>>> 1. Reading from KafkaIO, attaching a timestamp from each parsed element
>>>> 2. Filtering bad records
>>>> 3. Writing to a partitioned table in BigQuery with FILE_LOADS (batch jobs)
>>>>    every 15 minutes
>>>>
>>>> I had a working pipeline that does not write to BigQuery directly, but to
>>>> Cloud Storage, so its 3rd step was
>>>>
>>>> 3. Writing files to GCS in daily "subdirectories"
>>>>
>>>> I tried to rewrite the pipeline to reduce complexity: resetting its state
>>>> should no longer be tied to thinking about what to delete on GCS, and
>>>> configurable refresh times directly from within the pipeline were
>>>> something I was looking for. The only thing I needed to change was the
>>>> output at the end, so I knew my parsing logic would not change, and that
>>>> should reduce the risk.
>>>>
>>>> I tested the pipeline on our test cluster and it looked promising. When I
>>>> deployed it last week everything seemed to go smoothly. On Friday I
>>>> noticed that I had holes in the data: in the BigQuery tables there were
>>>> missing days (the tricky part was that the recent days looked fine). (To
>>>> be sure, I reset the pipeline and read from the very beginning of each
>>>> topic in Kafka. Within different runs, different days were missing.) I
>>>> spent the weekend rolling back the changes and trying to figure out what
>>>> was going on.
>>>>
>>>> I didn't see any errors in the logs (the log level was set to WARNING for
>>>> the most part), but I thought, well, maybe it's because there are too many
>>>> partitions and BigQuery has a limit of 1000 partition operations per day.
>>>> So I started reading from just 90 days in the past, but I still had holes
>>>> (whole days).
>>>>
>>>> I had a windowing step that I needed for the GCS pipeline. I became aware
>>>> that I wouldn't need it anymore with BigQueryIO, so I commented it out and
>>>> tested again, without luck.
>>>>
>>>> What struck me was that the Flink cluster didn't do any checkpoints for
>>>> the pipeline that was using BigQueryIO - it does so when writing to GCS,
>>>> and I tested its failure logic there. Additionally, the graph in Flink
>>>> with BigQueryIO becomes very complex, but this is something I expected.
>>>>
>>>> Here is the Pipeline code with the commented out windowing part:
>>>>
>>>>   pipeline
>>>>       .apply(
>>>>           KafkaIO.<String, String>read()
>>>>               .withBootstrapServers(bootstrap)
>>>>               .withTopics(topics)
>>>>               .withKeyDeserializer(StringDeserializer.class)
>>>>               .withValueDeserializer(ConfigurableDeserializer.class)
>>>>               .updateConsumerProperties(
>>>>                   ImmutableMap.of(
>>>>                       InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>               .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>               .updateConsumerProperties(ImmutableMap.of("group.id", "di-beam-consumers"))
>>>>               .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>               .withTimestampPolicyFactory(
>>>>                   TimestampPolicyFactory.withTimestampFn(
>>>>                       new MessageTimestampExtractor(inputMessagesConfig)))
>>>>               .withReadCommitted()
>>>>               .commitOffsetsInFinalize())
>>>>       .apply(ParDo.of(new ToEventFn()))
>>>>       //  .apply(
>>>>       //      Window.into(new ZurichTimePartitioningWindowFn())
>>>>       //          .triggering(
>>>>       //              Repeatedly.forever(
>>>>       //                  AfterFirst.of(
>>>>       //                      AfterPane.elementCountAtLeast(bundleSize),
>>>>       //                      AfterProcessingTime.pastFirstElementInPane()
>>>>       //                          .plusDelayOf(refreshFrequency))))
>>>>       //          .withAllowedLateness(Duration.standardDays(1))
>>>>       //          .discardingFiredPanes())
>>>>       .apply(
>>>>           BigQueryIO.<Event>write()
>>>>               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
>>>>               .withTriggeringFrequency(refreshFrequency)
>>>>               .withNumFileShards(1)
>>>>               .to(partitionedTableDynamicDestinations)
>>>>               .withFormatFunction(
>>>>                   (SerializableFunction<Event, TableRow>)
>>>>                       KafkaToBigQuery::convertUserEventToTableRow)
>>>>               .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
>>>>               .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
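
A side note on the .to(partitionedTableDynamicDestinations) above: that is a
DynamicDestinations implementation. As a simplified sketch of how such a class
can route each element into a daily partition (not my actual class; the
project, dataset, schema and Event accessors are placeholders):

  import com.google.api.services.bigquery.model.TableFieldSchema;
  import com.google.api.services.bigquery.model.TableSchema;
  import com.google.common.collect.ImmutableList;
  import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
  import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
  import org.apache.beam.sdk.values.ValueInSingleWindow;
  import org.joda.time.DateTimeZone;
  import org.joda.time.format.DateTimeFormat;
  import org.joda.time.format.DateTimeFormatter;

  public class DailyPartitionDestinations extends DynamicDestinations<Event, String> {

    private static final DateTimeFormatter PARTITION_FORMAT =
        DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.forID("Europe/Zurich"));

    @Override
    public String getDestination(ValueInSingleWindow<Event> element) {
      // Derive the partition day from the element's own (joda) timestamp field.
      return PARTITION_FORMAT.print(element.getValue().getTimestamp());
    }

    @Override
    public TableDestination getTable(String day) {
      // "$yyyyMMdd" is BigQuery's partition decorator syntax.
      return new TableDestination(
          "my-project:my_dataset.events$" + day, "events for day " + day);
    }

    @Override
    public TableSchema getSchema(String day) {
      return new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
                  new TableFieldSchema().setName("payload").setType("STRING")));
    }
  }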
>>>>
>>>>
>>>> I have the feeling that I must be making some serious and dumb mistake,
>>>> as I know the Beam framework is very robust. Thanks for taking the time
>>>> to read this.
>>>>
>>>> Tobi
>>>>
>>>
>>
>> --
>> Tobias Kaymak
>> Data Engineer
>>
>> tobias.kaymak@ricardo.ch
>> www.ricardo.ch
>>
>
