beam-dev mailing list archives

From Boyuan Zhang <boyu...@google.com>
Subject Re: Usability regression using SDF Unbounded Source wrapper + DirectRunner
Date Mon, 28 Dec 2020 20:17:23 GMT
Hi Antonio,

Thanks for the data! I want to elaborate more on where the overhead could
come from when running on Flink.

> -  with --experiments=use_deprecated_read --fasterCopy=true, I am able to
> achieve 13K TPS


This execution uses the UnboundedSource path, where the checkpoint frequency
for source reading is configured to match the Flink checkpoint interval. In
your case, the checkpoint frequency is every 60s. Flink reschedules the
checkpoint marks to process by reading them from state. Thus the overhead
here could be the time for executing source.getCheckpointMark + reading
from/writing to state + the overhead of the Flink checkpoint execution.
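As a rough illustration (a minimal sketch with assumed values, not code from
your pipeline), that configuration corresponds to options like the following,
where the checkpointing interval is the Flink one:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Sketch only: the 60s Flink checkpoint interval is what bounds how often
    // the UnboundedSource path calls getCheckpointMark and touches state.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(
                "--runner=FlinkRunner",
                "--experiments=use_deprecated_read",
                "--fasterCopy=true",
                "--checkpointingInterval=60000") // 60s, as in your setup
            .as(FlinkPipelineOptions.class);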


> -  with --experiments="beam_fn_api,use_sdf_kafka_read" --fasterCopy=true,
> I am able to achieve 10K
>

This execution uses the Kafka SDF implementation, where the checkpoint
frequency is configured by OutputAndTimeBoundedSplittableProcessElementInvoker
as every 10000 elements or every 10 seconds. You mentioned that every poll
takes about 0.8ms and returns around 200 elements, so the 10000-element limit
is reached after roughly 50 polls, well before the 10-second limit, and the
checkpoint fires on the element count. The residuals can come from
runner-issued checkpoints or from SDF self-checkpoints. Flink reschedules the
residuals by using Timer and State. Thus the overhead here could be the time
for scheduling timers + reading from/writing to state. I would expect to see
improvements if we made the checkpoints less frequent (for example, every 60s
or every 15000 elements).
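To make that bound concrete, here is a small illustrative sketch of the check
being applied (assumed names, not the actual invoker code):

    import java.time.Duration;
    import java.time.Instant;

    // Illustrative only: checkpoint once either the element limit or the time
    // limit is reached; the runner then splits off and reschedules the residual.
    final class OutputTimeBound {
      private static final long MAX_OUTPUTS = 10_000;
      private static final Duration MAX_TIME = Duration.ofSeconds(10);
      private long outputs = 0;
      private final Instant start = Instant.now();

      boolean shouldCheckpointAfterOutput() {
        outputs++;
        return outputs >= MAX_OUTPUTS
            || Duration.between(start, Instant.now()).compareTo(MAX_TIME) >= 0;
      }
    }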


> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS


This execution uses the UnboundedSourceAsSDFWrapperFn path, where the
checkpoint frequency is also every 10000 elements or every 10 seconds. Flink
also reschedules the residuals by using Timer and State, so the overhead here
could be the time for scheduling timers + reading from/writing to state + the
overhead of the wrapper around the unbounded source.
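For reference, all three numbers above come from essentially the same read
transform; a minimal sketch looks like the following (broker and topic are
placeholders), and only the --experiments flags decide whether it runs as the
deprecated UnboundedSource, the SDF wrapper, or the Kafka SDF read:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.kafka.common.serialization.StringDeserializer;

    // Sketch only: the transform is identical in all three configurations; the
    // runner-side translation is what the experiment flags switch.
    Pipeline p = Pipeline.create(options);
    p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")         // placeholder
            .withTopic("my-topic")                          // placeholder
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));
    p.run().waitUntilFinish();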


On Wed, Dec 23, 2020 at 12:30 PM Jan Lukavský <je.ik@seznam.cz> wrote:

> OK,
>
> could you make an experiment and increase the parallelism to something
> significantly higher than the total number of partitions? Say 5 times
> higher? Would that have an impact on throughput in your case?
>
> Jan
>
> On 12/23/20 7:03 PM, Antonio Si wrote:
> > Hi Jan,
> >
> > The performance data that I reported was run with parallelism = 8. We
> > also ran with parallelism = 15 and we observed similar behavior, although I
> > don't have the exact numbers. I can get you the numbers if needed.
> >
> > Regarding the number of partitions, since we have multiple topics, the
> > number of partitions varies from 12 to 180. The highest-TPS topic has 180
> > partitions, while the lowest-TPS topic has 12 partitions.
> >
> > Thanks.
> >
> > Antonio.
> >
> > On 2020/12/23 12:28:42, Jan Lukavský <je.ik@seznam.cz> wrote:
> >> Hi Antonio,
> >>
> >> can you please clarify a few things:
> >>
> >>    a) what parallelism you use for your sources
> >>
> >>    b) how many partitions there are in your topic(s)
> >>
> >> Thanks,
> >>
> >>    Jan
> >>
> >> On 12/22/20 10:07 PM, Antonio Si wrote:
> >>> Hi Boyuan,
> >>>
> >>> Let me clarify, I have tried with and without using the
> >>> --experiments=beam_fn_api,use_sdf_kafka_read option:
> >>>
> >>> -  with --experiments=use_deprecated_read --fasterCopy=true, I am
> >>> able to achieve 13K TPS
> >>> -  with --experiments="beam_fn_api,use_sdf_kafka_read"
> >>> --fasterCopy=true, I am able to achieve 10K TPS
> >>> -  with --fasterCopy=true alone, I am only able to achieve 5K TPS
> >>>
> >>> In our test case, we have multiple topics and the checkpoint interval is
> >>> 60s. Some topics have much higher traffic than others. We looked a little
> >>> at the case with --experiments="beam_fn_api,use_sdf_kafka_read"
> >>> --fasterCopy=true. Based on our observation, each consumer poll() in
> >>> ReadFromKafkaDoFn.processElement() takes about 0.8ms. So for topics with
> >>> high traffic, it will continue in the loop because every poll() returns
> >>> some records. Every poll returns about 200 records, so it takes about
> >>> 0.8ms for every 200 records. I am not sure if that is part of the reason
> >>> for the performance difference.
> >>>
> >>> Thanks.
> >>>
> >>> Antonio.
> >>>
> >>> On 2020/12/21 19:03:19, Boyuan Zhang <boyuanz@google.com> wrote:
> >>>> Hi Antonio,
> >>>>
> >>>> Thanks for the data point. That's very valuable information!
> >>>>
> >>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>> We measured the number of Kafka messages that we can process per
> >>>>> second. With Beam v2.26 with --experiments=use_deprecated_read and
> >>>>> --fasterCopy=true, we are able to consume 13K messages per second, but
> >>>>> with Beam v2.26 without the use_deprecated_read option, we are only able
> >>>>> to process 10K messages per second for the same pipeline.
> >>>> We do have an SDF implementation of Kafka Read instead of using the
> >>>> wrapper. Would you like to give it a try to see whether it helps improve
> >>>> your situation?  You can use --experiments=beam_fn_api,use_sdf_kafka_read
> >>>> to switch to the Kafka SDF Read.
> >>>>
> >>>> On Mon, Dec 21, 2020 at 10:54 AM Boyuan Zhang <boyuanz@google.com>
> >>>> wrote:
> >>>>
> >>>>> Hi Jan,
> >>>>>> it seems that what we would want is to couple the lifecycle of the
> >>>>>> Reader not with the restriction but with the particular instance of
> >>>>>> (Un)boundedSource (after being split). That could be done in the
> >>>>>> processing DoFn, if it contained a cache mapping instance of the source
> >>>>>> to the (possibly null - i.e. not yet open) reader. In @NewTracker we
> >>>>>> could assign (or create) the reader to the tracker, as the tracker is
> >>>>>> created for each restriction.
> >>>>>>
> >>>>>> WDYT?
> >>>>>>
> >>>>> I was thinking about this but it seems like it is not applicable to
> >>>>> the way UnboundedSource and UnboundedReader work together.
> >>>>> Please correct me if I'm wrong. The UnboundedReader is created from the
> >>>>> UnboundedSource per CheckpointMark[1], which means for certain sources,
> >>>>> the CheckpointMark could affect some attributes like the start position
> >>>>> of the reader when resuming. So a single UnboundedSource could be mapped
> >>>>> to multiple readers because of different instances of CheckpointMark.
> >>>>> That's also the reason why we use CheckpointMark as the restriction.
> >>>>>
> >>>>> Please let me know if I misunderstand your suggestion.
> >>>>>
> >>>>> [1]
> >>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L73-L78
> >>>>>
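For readers following [1], the method there looks roughly like this
(paraphrased from the linked file), which is why each CheckpointMark can yield
a different reader instance:

    // Paraphrased from org.apache.beam.sdk.io.UnboundedSource (see [1] above):
    // a reader is created from the source plus an optional CheckpointMark, so
    // resuming with different marks produces different reader instances.
    public abstract UnboundedReader<OutputT> createReader(
        PipelineOptions options, @Nullable CheckpointMarkT checkpointMark)
        throws IOException;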
> >>>>> On Mon, Dec 21, 2020 at 9:18 AM Antonio Si <antonio.si@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Boyuan,
> >>>>>>
> >>>>>> Sorry for my late reply. I was off for a few days.
> >>>>>>
> >>>>>> I didn't use DirectRunner. I am using FlinkRunner.
> >>>>>>
> >>>>>> We measured the number of Kafka messages that we can process per
> >>>>>> second. With Beam v2.26 with --experiments=use_deprecated_read and
> >>>>>> --fasterCopy=true, we are able to consume 13K messages per second, but
> >>>>>> with Beam v2.26 without the use_deprecated_read option, we are only able
> >>>>>> to process 10K messages per second for the same pipeline.
> >>>>>>
> >>>>>> Thanks and regards,
> >>>>>>
> >>>>>> Antonio.
> >>>>>>
> >>>>>> On 2020/12/11 22:19:40, Boyuan Zhang <boyuanz@google.com> wrote:
> >>>>>>> Hi Antonio,
> >>>>>>>
> >>>>>>> Thanks for the details! Which version of Beam SDK are you using? And
> >>>>>>> are you using --experiments=beam_fn_api with DirectRunner to launch
> >>>>>>> your pipeline?
> >>>>>>>
> >>>>>>> For ReadFromKafkaDoFn.processElement(), it will take a Kafka
> >>>>>>> topic+partition as the input element, and a KafkaConsumer will be
> >>>>>>> assigned to this topic+partition and then poll records continuously.
> >>>>>>> The Kafka consumer will resume reading and return from the process fn
> >>>>>>> when
> >>>>>>>
> >>>>>>>      - There are no available records currently (this is a feature of
> >>>>>>>      SDF which calls SDF self-initiated checkpoint)
> >>>>>>>      - The OutputAndTimeBoundedSplittableProcessElementInvoker issues a
> >>>>>>>      checkpoint request to ReadFromKafkaDoFn for getting partial
> >>>>>>>      results. The checkpoint frequency for DirectRunner is every 100
> >>>>>>>      output records or every 1 second.
> >>>>>>>
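As a rough illustration of the shape of such a splittable @ProcessElement
(illustrative only; the consumer field and the toKafkaRecord helper are
assumed here, this is not the actual ReadFromKafkaDoFn source):

    // Sketch of the poll/output loop described above.
    @ProcessElement
    public ProcessContinuation processElement(
        @Element KafkaSourceDescriptor descriptor,
        RestrictionTracker<OffsetRange, Long> tracker,
        OutputReceiver<KafkaRecord<String, String>> receiver) {
      while (true) {
        // `consumer` is an assumed field already assigned to the topic+partition.
        ConsumerRecords<String, String> records =
            consumer.poll(java.time.Duration.ofMillis(100));
        if (records.isEmpty()) {
          // No records available right now: SDF self-initiated checkpoint.
          return ProcessContinuation.resume();
        }
        for (ConsumerRecord<String, String> record : records) {
          if (!tracker.tryClaim(record.offset())) {
            return ProcessContinuation.stop();
          }
          receiver.output(toKafkaRecord(record)); // assumed conversion helper
        }
        // The invoker may also request a checkpoint here (e.g. every 100 outputs
        // or every 1 second on DirectRunner), splitting off the residual.
      }
    }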
> >>>>>>> It seems like either the self-initiated checkpoint or the
> >>>>>>> DirectRunner-issued checkpoint gives you the performance regression,
> >>>>>>> since there is overhead when rescheduling residuals. In your case, it's
> >>>>>>> more likely that the checkpoint behavior of
> >>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker gives you 200
> >>>>>>> elements a batch. I want to understand what kind of performance
> >>>>>>> regression you are noticing. Is it slower to output the same amount of
> >>>>>>> records?
> >>>>>>>
> >>>>>>> On Fri, Dec 11, 2020 at 1:31 PM Antonio Si <antonio.si@gmail.com>
> >>>>>>> wrote:
> >>>>>>>> Hi Boyuan,
> >>>>>>>>
> >>>>>>>> This is Antonio. I reported the KafkaIO.read() performance issue on
> >>>>>>>> the slack channel a few days ago.
> >>>>>>>>
> >>>>>>>> I am not sure if this is helpful, but I have been doing some debugging
> >>>>>>>> on the SDK KafkaIO performance issue for our pipeline and I would like
> >>>>>>>> to provide some observations.
> >>>>>>>>
> >>>>>>>> It looks like in my case the ReadFromKafkaDoFn.processElement() was
> >>>>>>>> invoked within the same thread, and every time kafkaConsumer.poll() is
> >>>>>>>> called, it returns some records, from 1 up to 200 records. So, it will
> >>>>>>>> proceed to run the pipeline steps. Each kafkaConsumer.poll() takes
> >>>>>>>> about 0.8ms. So, in this case, the polling and running of the pipeline
> >>>>>>>> are executed sequentially within a single thread. So, after processing
> >>>>>>>> a batch of records, it will need to wait for 0.8ms before it can
> >>>>>>>> process the next batch of records again.
> >>>>>>>>
> >>>>>>>> Any suggestions would be appreciated.
> >>>>>>>>
> >>>>>>>> Hope that helps.
> >>>>>>>>
> >>>>>>>> Thanks and regards,
> >>>>>>>>
> >>>>>>>> Antonio.
> >>>>>>>>
> >>>>>>>> On 2020/12/04 19:17:46, Boyuan Zhang <boyuanz@google.com> wrote:
> >>>>>>>>> Opened https://issues.apache.org/jira/browse/BEAM-11403 for tracking.
> >>>>>>>>> On Fri, Dec 4, 2020 at 10:52 AM Boyuan Zhang <boyuanz@google.com>
> >>>>>>>>> wrote:
> >>>>>>>>>> Thanks for the pointer, Steve! I'll check it out. The execution
> >>>>>>>>>> paths for UnboundedSource and SDF wrapper are different. It's highly
> >>>>>>>>>> possible that the regression either comes from the invocation path
> >>>>>>>>>> for SDF wrapper, or the implementation of SDF wrapper itself.
> >>>>>>>>>>
> >>>>>>>>>> On Fri, Dec 4, 2020 at 6:33 AM Steve Niemitz <sniemitz@apache.org>
> >>>>>>>>>> wrote:
> >>>>>>>>>>> Coincidentally, someone else in the ASF slack mentioned [1]
> >>>>>>>>>>> yesterday that they were seeing significantly reduced performance
> >>>>>>>>>>> using KafkaIO.Read w/ the SDF wrapper vs the unbounded source. They
> >>>>>>>>>>> mentioned they were using flink 1.9.
> >>>>>>>>>>>
> >>>>>>>>>>> https://the-asf.slack.com/archives/C9H0YNP3P/p1607057900393900
> >>>>>>>>>>>
> >>>>>>>>>>> On Thu, Dec 3, 2020 at 1:56 PM Boyuan Zhang <boyuanz@google.com>
> >>>>>>>>>>> wrote:
> >>>>>>>>>>>> Hi Steve,
> >>>>>>>>>>>>
> >>>>>>>>>>>> I think the major performance regression comes from
> >>>>>>>>>>>> OutputAndTimeBoundedSplittableProcessElementInvoker[1], which will
> >>>>>>>>>>>> checkpoint the DoFn based on a time/output limit and use
> >>>>>>>>>>>> timers/state to reschedule work.
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1]
> >>>>>>>>>>>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
> >>>>>>>>>>>> On Thu, Dec 3, 2020 at 9:40 AM Steve Niemitz <sniemitz@apache.org>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I have a pipeline that reads from pubsub, does some aggregation,
> >>>>>>>>>>>>> and writes to various places.  Previously, in older versions of
> >>>>>>>>>>>>> beam, when running this in the DirectRunner, messages would go
> >>>>>>>>>>>>> through the pipeline almost instantly, making it very easy to
> >>>>>>>>>>>>> debug locally, etc.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> However, after upgrading to beam 2.25, I noticed that it could
> >>>>>>>>>>>>> take on the order of 5-10 minutes for messages to get from the
> >>>>>>>>>>>>> pubsub read step to the next step in the pipeline (deserializing
> >>>>>>>>>>>>> them, etc).  The subscription being read from has on the order of
> >>>>>>>>>>>>> 100,000 elements/sec arriving in it.
> >>>>>>>>>>>>> Setting --experiments=use_deprecated_read fixes it, and makes the
> >>>>>>>>>>>>> pipeline behave as it did before.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> It seems like the SDF implementation in the DirectRunner here is
> >>>>>>>>>>>>> causing some kind of issue, either buffering a very large amount
> >>>>>>>>>>>>> of data before emitting it in a bundle, or something else.  Has
> >>>>>>>>>>>>> anyone else run into this?
> >>>>>>>>>>>>>
>
