beam-user mailing list archives

From Raghu Angadi <>
Subject Re: Limit the number of DoFn instances per worker?
Date Wed, 18 Oct 2017 06:37:09 GMT
One way to limit DoFn parallelism is to reshuffle the input into a fixed
number of shards. If you want to limit it to 32 across 8 workers, you can
reshuffle into 32 shards; in Dataflow, these are distributed roughly evenly
among the workers. You can take a look at this Stack Overflow question where
the user wanted to increase parallelism.
In Dataflow, DoFn parallelism depends on more than the number of cores on
the workers: it can be much higher after a GroupByKey (depending on key
cardinality).
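The shard trick above can be sketched outside Beam. This is a minimal illustration of the arithmetic, not Beam API: in a real pipeline the shard key would feed something like WithKeys + GroupByKey (or Reshuffle), and `shardFor`/`shardsPerWorker` are hypothetical names.

```java
import java.util.concurrent.ThreadLocalRandom;

public class ShardMath {
    // Pick a random shard key in [0, numShards); a downstream GroupByKey on
    // this key then caps DoFn parallelism at numShards.
    static int shardFor(int numShards) {
        return ThreadLocalRandom.current().nextInt(numShards);
    }

    // Assuming even distribution, shards per worker = numShards / numWorkers,
    // e.g. 32 shards across 8 workers -> 4 concurrent bundles per worker.
    static int shardsPerWorker(int numShards, int numWorkers) {
        return numShards / numWorkers;
    }

    public static void main(String[] args) {
        System.out.println("shard=" + shardFor(32)
                + ", perWorker=" + shardsPerWorker(32, 8));
    }
}
```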

Regarding autoscaling: if you have a small number of records with a lot of
processing per record (on the order of minutes), it might be hard to trigger
upscaling when this skew is very high. All the pending records might fit in
internal queues, so the runner might see zero backlog. Limiting parallelism
with a reshuffle should help with this.

On Tue, Oct 17, 2017 at 5:33 PM, Derek Hao Hu <> wrote:

> Thanks Rafal and Lukasz! These are great suggestions! One quick question
> about using a semaphore though: would it be possible for multiple elements
> to pile up on a particular worker instance, waiting to acquire the
> semaphore without success? I'll definitely test it though.
> Lukasz, let me first try to explain why I feel autoscaling might not be the
> ideal solution. I'll definitely contact
> as well, but I'll try to give some of my [probably incorrect] thoughts.
> So basically, based on my understanding, Beam tries to allocate multiple
> elements to a single machine. Let's assume an ideal computational model
> where a single core takes T time to process an element, but if all 32
> cores can be used for one element, it takes T/32 time.
> Therefore, if we have 32 incoming elements and Beam allocates 32 threads
> on a worker instance for this DoFn, each element uses a single core and
> finishes in T time, so there would be no backlog during this
> time since all the elements are being processed. But if we could tune a
> parameter to tell Beam to allocate fewer elements per worker instance,
> then this would create a backlog and autoscaling might trigger earlier, so
> technically the overall system lag might actually be better?
> I haven't tested this hypothesis yet, but the above is basically my
> reasoning.
> Thanks,
> Derek
> On Tue, Oct 17, 2017 at 8:49 AM, Lukasz Cwik <> wrote:
>> The `numberOfWorkerHarnessThreads` option is worker-wide, not per DoFn.
>> Setting this value to constrain how many threads are executing will impact
>> all parts of your pipeline. One idea is to use a Semaphore as a static
>> object within your DoFn with a fixed number of allowed actors that can
>> enter and execute your TensorFlow call.
>> class TfDoFn<X, Y> extends DoFn<X, Y> {
>>   private static final int MAX_TF_ACTORS = 4;
>>   private static final Semaphore semaphore =
>>       new Semaphore(MAX_TF_ACTORS, true);
>>
>>   @ProcessElement
>>   public void processElement(ProcessContext c) throws InterruptedException {
>>     // Acquire before the try block: release() must only run after a
>>     // successful acquire().
>>     semaphore.acquire();
>>     try {
>>       // Do TF work on c.element()
>>     } finally {
>>       semaphore.release();
>>     }
>>   }
>> }
>> This will ensure that each TF item is processed in a more timely manner,
>> but it still means there could be many other TF items sitting around
>> waiting for the semaphore to be acquired.
>> As an alternative, I would recommend contacting
>> specifically referencing how you believe
>> autoscaling is not working well for your use case/pipeline. Also provide a
>> description of your pipeline and some job IDs (if possible).
>> On Mon, Oct 16, 2017 at 6:26 PM, Rafal Wojdyla <> wrote:
>>> Hi.
>>> To answer your question: if we limit ourselves to the DataflowRunner, you
>>> could use `numberOfWorkerHarnessThreads`. See more here
>>> <>.
>>> That said, I'm not gonna comment on whether that is a good remedy for
>>> your actual problem.
>>> - rav
>>> On Mon, Oct 16, 2017 at 8:48 PM, Derek Hao Hu <>
>>> wrote:
>>>> Hi,
>>>> Is there an easy way to limit the number of DoFn instances per worker?
>>>> The use case is like this: we are calling TensorFlow in our DoFn, and
>>>> each TensorFlow call automatically tries to allocate the available CPU
>>>> resources. So in a streaming pipeline, what I'm seeing is that the
>>>> inference time becomes longer over time if autoscaling doesn't catch up.
>>>> My hypothesis is that Beam is trying to allocate a specific number of
>>>> elements (maybe the number of cores?) on each worker for a particular
>>>> DoFn, and then these TensorFlow threads contend for CPU cycles.
>>>> Therefore, I would like to know whether it's possible to limit the number
>>>> of threads a pipeline runner can allocate for a DoFn per worker. By doing
>>>> this, we can ensure we start accumulating backlogs in the streaming
>>>> pipeline earlier, and autoscaling would probably happen earlier as well.
>>>> Thanks!
>>>> --
>>>> Derek Hao Hu
>>>> Software Engineer | Snapchat
>>>> Snap Inc.
> --
> Derek Hao Hu
> Software Engineer | Snapchat
> Snap Inc.
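The semaphore pattern from this thread can be exercised outside Beam with plain threads. This is a standalone sketch: 16 tasks contend for 4 permits, and a counter records the highest concurrency observed. The sleep is a stand-in for the TF call; all names here are illustrative.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SemaphoreDemo {
    static final Semaphore SEM = new Semaphore(4, true); // at most 4 in the TF section
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger maxInFlight = new AtomicInteger();

    static void processElement() throws InterruptedException {
        SEM.acquire();  // block until a slot is free
        try {
            int now = inFlight.incrementAndGet();
            maxInFlight.accumulateAndGet(now, Math::max); // record peak concurrency
            Thread.sleep(10); // stand-in for the TF call
            inFlight.decrementAndGet();
        } finally {
            SEM.release();
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(16);
        for (int i = 0; i < 16; i++) {
            pool.submit(() -> {
                try {
                    processElement();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("max concurrent = " + maxInFlight.get());
    }
}
```

As in the DoFn version, the other 12 tasks are not failing; they are simply parked on `acquire()`, which is exactly the backlog signal being discussed.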
