beam-dev mailing list archives

From Shen Li <>
Subject Re: How can runners make use of sink parallelism?
Date Wed, 04 Apr 2018 13:50:25 GMT
Thanks for the explanation!


On Wed, Apr 4, 2018 at 12:38 AM, Eugene Kirpichov <> wrote:

> Hi Shen,
> There is no "IO connector API" in Beam (not counting the deprecated Source
> API); "IO" is merely an informal term for a PTransform that interacts in
> some way with some external storage system. So whatever question you're
> asking about IO connectors, you might as well be asking it about
> PTransforms in general. See
> public/schedule/detail/63696
> To answer your question, then: is it the responsibility of a PTransform
> author to make sure their code works correctly when different elements of
> various PCollections are processed by downstream ParDos in parallel? Yes,
> of course.
> Things like "writing to a single file" are simply implemented by
> non-parallel code: e.g. GBK the data onto a single key, and write a ParDo
> that takes the single KV<K, Iterable<V>> and writes the Iterable to the
> file. This is, by definition, sequential (modulo windowing/triggering:
> different windows and different firings for the same key can still be
> processed in parallel).
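To make the single-key pattern concrete, here is a minimal plain-Java sketch. It does not use the Beam SDK; `SingleKeyWrite`, its methods, and the empty-string key are hypothetical stand-ins for "GBK onto a single key, then one ParDo that writes the whole Iterable".

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "GBK onto a single key, then write in one ParDo".
// Not the Beam API: every name here is a hypothetical stand-in.
final class SingleKeyWrite {

    // Stand-in for a void/singleton key shared by every element.
    static final String SINGLE_KEY = "";

    // Models GroupByKey when every element carries the same key:
    // the result contains at most one group holding all elements.
    static Map<String, List<String>> groupOntoSingleKey(List<String> elements) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String e : elements) {
            groups.computeIfAbsent(SINGLE_KEY, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    // Models the downstream ParDo: one call receives the whole Iterable
    // and appends it, in order, to a single "file" (a StringBuilder here).
    static String writeGroup(Iterable<String> values) {
        StringBuilder file = new StringBuilder();
        for (String v : values) {
            file.append(v).append('\n');
        }
        return file.toString();
    }
}
```

However many workers exist, all elements end up in one group, so `writeGroup` runs once and writes in order; that single call is the sequential step.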
> On Tue, Apr 3, 2018 at 8:56 PM Shen Li <> wrote:
>> Hi Kenn,
>> Thanks for the response.
>> I haven't hit any specific issue yet. I think if the IO connector
>> implementation does take parallelism into consideration, runners can
>> parallelize primitive transforms in the connector (key-partitioned for GBK
>> and stateful ParDo, and round robin for stateless ParDo). For example,
>> TextIO first writes a temp file for every bundle, then uses a void key to
>> prevent parallelism, and then finalizes the result. It should work properly
>> in a distributed environment.
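The TextIO-style flow described above can be sketched in plain Java as below. This is a model under stated assumptions, not TextIO's actual code: `TempThenFinalize`, the `tmp-N` names, and the `output-NNNNN-of-NNNNN` naming are hypothetical. Phase 1 is per bundle and could run in parallel; phase 2 sees every temp file at once, which is what gathering under a void key provides.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of "temp file per bundle, then one finalize step".
// Names such as "tmp-0" and "output-00000-of-00003" are hypothetical.
final class TempThenFinalize {

    // Phase 1: each bundle writes its own temp file. These writes are
    // independent of each other, so a runner could do them in parallel.
    static Map<String, String> writeTempFiles(List<List<String>> bundles) {
        Map<String, String> tempFiles = new LinkedHashMap<>();
        for (int i = 0; i < bundles.size(); i++) {
            tempFiles.put("tmp-" + i, String.join("\n", bundles.get(i)));
        }
        return tempFiles;
    }

    // Phase 2: all temp files are gathered under one (void) key, so a
    // single call sees every one of them and assigns final names.
    static Map<String, String> finalizeAll(Map<String, String> tempFiles) {
        Map<String, String> outputs = new LinkedHashMap<>();
        int total = tempFiles.size();
        int shard = 0;
        for (String contents : tempFiles.values()) {
            outputs.put(String.format("output-%05d-of-%05d", shard, total), contents);
            shard++;
        }
        return outputs;
    }
}
```

One reason to funnel finalization through a single step is that a name ending in `-of-00003` needs the total count, which is only known once every bundle's temp file has been seen.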
>> But applications can provide any custom IO connectors, and the runner
>> does not know whether a connector can be safely parallelized. Can I assume
>> that it is the applications' responsibility to make sure their IO connector
>> works correctly when running in parallel?
>> Thanks,
>> Shen
>> On Tue, Apr 3, 2018 at 6:11 PM, Kenneth Knowles <> wrote:
>>> The runner should generally not need to be aware of any getNumShard()
>>> API on a connector. The connector itself is probably a composite transform
>>> (with a ParDo or two or three somewhere doing the actual writes) and should
>>> be designed to expose available parallelism. Specifying the number of
>>> shards actually usually limits the parallelism, versus letting the runner
>>> use the maximum allowed parallelism.
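A small plain-Java model of why a fixed shard count caps parallelism. `FixedSharding` and its hash-based assignment are hypothetical and not how any particular connector assigns shards; the point is only that with `numShards` buckets, at most `numShards` writers ever receive data.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of a fixed-shard write; not a Beam API.
final class FixedSharding {

    // Hypothetical assignment: hash the element into one of numShards buckets.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int shardFor(String element, int numShards) {
        return Math.floorMod(element.hashCode(), numShards);
    }

    // Number of shards that receive data. Each shard is written by one writer,
    // so this bounds write concurrency regardless of how many workers exist.
    static int activeWriters(List<String> elements, int numShards) {
        Set<Integer> used = new HashSet<>();
        for (String e : elements) {
            used.add(shardFor(e, numShards));
        }
        return used.size();
    }
}
```

With `numShards = 3`, 100 elements still land in at most 3 buckets, so at most 3 writers are busy even on a cluster with many more workers.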
>>> If the connector does a GBK to gather input elements into a single
>>> iterable, then it is a single element and cannot be processed in parallel
>>> (except through splittable DoFn, but in that case you may not need to do
>>> the GBK in the first place). Conversely, if the connector does
>>> not do a GBK to gather input elements, then the runner is permitted to
>>> bundle them any way it wants and process all of them as though in parallel
>>> (except for stateful DoFn, in which case you probably don't need the GBK).
>>> Bundling is an important way that this works, too, since the
>>> @FinishBundle method is really a "flush" method, with @ProcessElement
>>> perhaps buffering up elements to be written to e.g. the same file shard. It
>>> is not this simple in practice, but that gives the idea of how even with
>>> unrestricted elementwise parallelism you don't get one shard per element.
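The buffer-then-flush idea can be modeled roughly as follows. This is a plain-Java sketch, not a Beam `DoFn`: `BufferingWriter` and its methods are hypothetical, and `processElement`/`finishBundle` are only named after the roles of `@ProcessElement` and `@FinishBundle`.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of per-bundle buffering; not a Beam DoFn.
final class BufferingWriter {

    // Elements seen in the current bundle but not yet written.
    private final List<String> buffer = new ArrayList<>();

    // Each inner list stands for one written file shard.
    private final List<List<String>> flushedShards = new ArrayList<>();

    // Role of @ProcessElement: buffer the element instead of writing it.
    void processElement(String element) {
        buffer.add(element);
    }

    // Role of @FinishBundle: flush everything buffered in this bundle
    // as a single shard, then start the next bundle with an empty buffer.
    void finishBundle() {
        if (!buffer.isEmpty()) {
            flushedShards.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    List<List<String>> shards() {
        return flushedShards;
    }
}
```

Feeding it two bundles (three elements, then two) yields two shards rather than five, even though each element was handled independently.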
>>> These are all just ideas, and I'm not the connector expert. But I think
>>> the TL;DR is that a runner shouldn't need to know this - have you hit
>>> specific issues with a particular connector? That could make this a very
>>> productive discussion.
>>> Kenn
>>> On Mon, Apr 2, 2018 at 1:41 PM Shen Li <> wrote:
>>>> Hi,
>>>> It seems that there is no Sink base class. Some IO connectors (e.g.,
>>>> KafkaIO and TextIO) provide a getNumShard() API. But it is not generally
>>>> available for all existing Beam IO connectors and potential custom ones.
>>>> Some IO connectors are implemented using ParDo/GBK, but it is unclear whether
>>>> the runner can directly parallelize those transforms (e.g., what if it only
>>>> writes to a single file). Is there a general way for runners to take
>>>> advantage of sink parallelism?
>>>> Thanks,
>>>> Shen
