beam-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eugene Kirpichov <>
Subject Re: How can runners make use of sink parallelism?
Date Wed, 04 Apr 2018 04:38:23 GMT
Hi Shen,

There is no "IO connector API" in Beam (not counting the deprecated Source
API), IO is merely an informal term for a PTransform that interacts in some
way with some external storage system. So whatever question you're asking
about IO connectors, you might as well be asking it about PTransforms in
general. See

To answer your question, then: is it responsibility of a PTransform author
to make sure their code works correctly when different elements of various
PCollection's are processed by downstream ParDo's in parallel? Yes, of

Things like "writing to a single file" are simply implemented by
non-parallel code - e.g. GBK the data onto a single key, and write a ParDo
that takes the single KV<K, Iterable<V>> and writes the Iterable to the
file. This is, by definition, sequential (modulo windowing/triggering -
different windows and different firings for the same key can still be
processed in parallel).

On Tue, Apr 3, 2018 at 8:56 PM Shen Li <> wrote:

> Hi Kenn,
> Thanks for the response.
> I haven't hit any specific issue yet. I think if the IO connector
> implementation does take parallelism into consideration, runners can
> parallelize primitive transforms in the connector (key-partitioned for GBK
> and stateful ParDo, and round robin for stateless ParDo). For example,
> TextIO first writes a temp file for every bundle, then uses a void key to
> prevent parallelism, and then finalizes the result. It should work properly
> in a distributed environment.
> But applications can provide any custom IO connectors, and the runner does
> not know whether a connector can be safely parallelized. Can I assume that
> it is the applications' responsibility to make sure their IO connector
> works correctly when running in parallel?
> Thanks,
> Shen
> On Tue, Apr 3, 2018 at 6:11 PM, Kenneth Knowles <> wrote:
>> The runner should generally not need to be aware of any getNumShard() API
>> on a connector. The connector itself is probably a composite transform
>> (with a ParDo or two or three somewhere doing the actual writes) and should
>> be designed to expose available parallelism. Specifying the number of
>> shards actually usually limits the parallelism, versus letting the runner
>> use the maximum allowed parallelism.
>> If the connector does a GBK to gather input elements into a single
>> iterable, then it is a single element and cannot be processed in parallel
>> (except through splittable DoFn, but in that case you may not need to do
>> the GBK in the first place). And converse to that, if the connector does
>> not do a GBK to gather input elements, then the runner is permitted to
>> bundle them any way it wants and process all of them as though in parallel
>> (except for stateful DoFn, in which case you probably don't need the GBK).
>> Bundling is an important way that this works, too, since the
>> @FinishBundle method is really a "flush" method, with @ProcessElement
>> perhaps buffering up elements to be written to e.g. the same file shard. It
>> is not this simple in practice but that gives the idea of how even with
>> unrestricted elementwise parallelism you don't get one shard per element.
>> These are all just ideas, and I'm not the connector expert. But I think
>> the TL;DR is that a runner shouldn't need to know this - have you hit
>> specific issues with a particular connector? That could make this a very
>> productive discussion.
>> Kenn
>> On Mon, Apr 2, 2018 at 1:41 PM Shen Li <> wrote:
>>> Hi,
>>> It seems that there is no Sink base class. Some IO connectors (e.g.,
>>> KafkaIO and TextIO) provide a getNumShard() API. But it is not generally
>>> available for all existing Beam IO connectors and potential custom ones. Although
>>> some IO connectors are implemented using ParDo/GBK, it is unclear whether
>>> the runner can directly parallelize those transforms (e.g., what if it only
>>> writes to a single file). Is there a general way for runners to take
>>> advantage of sink parallelism?
>>> Thanks,
>>> Shen

View raw message