beam-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lukasz Cwik <lc...@google.com>
Subject Re: How to partition a stream by key before writing with FileBasedSink?
Date Tue, 06 Jun 2017 21:01:58 GMT
Based upon your descriptions, it seemed like you wanted limited parallelism
because of an external dependency.

Your best bet would be to use the global window combined with a
StatefulDoFn. See this blog post (
https://beam.apache.org/blog/2017/02/13/stateful-processing.html) about the
StatefulDoFn.

You will not be able to use a different window function till after the
StatefulDoFn otherwise a GroupByKey may schedule your work on a different
machine since the windows for a key may differ.

Source -> StatefulDoFn -> Window.into(my other window type)

All our sources currently operate within the global window until a
Window.into happens. So there is no need to do Source ->
Window.into(GlobalWindow) -> StatefulDoFn -> Window.into(my other window
type)


On Tue, Jun 6, 2017 at 12:03 PM, <jofo90@gmail.com> wrote:

> Hmm ok, I don't quite get why what I want to do isn't supported in Beam
> ... I don't actually have a limited parallelism requirement, I just want to
> be able to partition my unbounded stream by a key determined from the
> elements, so that any two elements with the same key will be routed to the
> same worker. I want to do this because my DoFn keeps some in-memory cached
> state for each key (which I was planning to store at either DoFn or JVM
> level). Does this sound like a bad idea?
>
>
> On 6 Jun 2017, at 19:14, Lukasz Cwik <lcwik@google.com> wrote:
>
> Your right, the window acts as a secondary key within GroupByKey
> (KeyA,Window1 != KeyA,Window2), which means that each of those two
> composite keys can be scheduled to execute at the same time.
>
> At this point I think you should challenge your limited parallelism
> requirement as you'll need to build something outside of Apache Beam to
> provide these parallelization limits across windows (e.g. lock within the
> same process when limiting yourself to a single machine, distributed lock
> service when dealing with multiple machines).
>
> The backlog of data is either going to grow infinitely at the GroupByKey
> or grow infinitely at the source if your pipeline can't keep up. It is up
> to the Runner to be smart and not produce a giant backlog at the GroupByKey
> since it knows how fast work is being completed (unfortunately I don't know
> if any Runner is this smart yet to push the backlog up to the source).
>
> On Tue, Jun 6, 2017 at 11:03 AM, Josh <jofo90@gmail.com> wrote:
>
>> I see, thanks for the tips!
>>
>> Last question about this! How could this be adapted to work in a
>> unbounded/streaming job? To work in an unbounded job, I need to put a
>> Window.into with a trigger before GroupByKey.
>> I guess this would mean that the "shard gets processed by a single thread
>> in MyDofn" guarantee will only apply to messages within a single window,
>> and would not apply across windows?
>> If this is the case, is there a better solution? I would like to avoid
>> buffering data in windows, and want the shard guarantee to apply across
>> windows.
>>
>>
>>
>> On Tue, Jun 6, 2017 at 5:42 PM, Lukasz Cwik <lcwik@google.com> wrote:
>>
>>> Your code looks like what I was describing. My only comment would be to
>>> use a deterministic hashing function which is stable across JVM versions
>>> and JVM instances as it will help in making your pipeline consistent across
>>> different runs/environments.
>>>
>>> Parallelizing across 8 instances instead of 4 would break the contract
>>> around GroupByKey (since it didn't group all the elements for a key
>>> correctly). Also, each element is the smallest unit of work and
>>> specifically in your pipeline you have chosen to reduce all your elements
>>> into 4 logical elements (each containing some proportion of your original
>>> data).
>>>
>>> On Tue, Jun 6, 2017 at 9:37 AM, Josh <jofo90@gmail.com> wrote:
>>>
>>>> Thanks for the reply, Lukasz.
>>>>
>>>>
>>>> What I meant was that I want to shard my data by a "shard key", and be
>>>> sure that any two elements with the same "shard key" are processed by the
>>>> same thread on the same worker. (Or if that's not possible, by the same
>>>> worker JVM with no thread guarantee would be good enough). It doesn't
>>>> actually matter to me whether there's 1 or 4 or 100 DoFn instances
>>>> processing the data.
>>>>
>>>>
>>>> It sounds like what you suggested will work for this, with the downside
>>>> of me needing to choose a number of shards/DoFns (e.g. 4).
>>>>
>>>> It seems a bit long and messy but am I right in thinking it would look
>>>> like this? ...
>>>>
>>>>
>>>> PCollection<MyElement> elements = ...;
>>>>
>>>> elements
>>>>
>>>> .apply(MapElements
>>>>
>>>> .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
>>>> TypeDescriptor.of(MyElement.class)))
>>>>
>>>> .via((MyElement e) -> KV.of(
>>>>
>>>> e.getKey().toString().hashCode() % 4, e)))
>>>>
>>>> .apply(GroupByKey.create())
>>>>
>>>> .apply(Partition.of(4,
>>>>
>>>> (Partition.PartitionFn<KV<Integer, Iterable<MyElement>>>)
(kv, i) ->
>>>> kv.getKey()))
>>>>
>>>> .apply(ParDo.of(new MyDofn()));
>>>>
>>>> // Where MyDofn must be changed to handle a KV<Integer,
>>>> Iterable<MyElement>> as input instead of just a MyElement
>>>>
>>>>
>>>> I was wondering is there a guarantee that the runner won't parallelise
>>>> the final MyDofn across e.g. 8 instances instead of 4? If there are two
>>>> input elements with the same key are they actually guaranteed to be
>>>> processed on the same instance?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Josh
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Jun 6, 2017 at 4:51 PM, Lukasz Cwik <lcwik@google.com> wrote:
>>>>
>>>>> I think this is what your asking for but your statement about 4
>>>>> instances is unclear as to whether that is 4 copies of the same DoFn
or 4
>>>>> completely different DoFns. Also its unclear what you mean by
>>>>> instance/thread, I'm assuming that you want at most 4 instances of a
DoFn
>>>>> each being processed by a single thread.
>>>>>
>>>>> This is a bad idea because you limit your parallelism but this is
>>>>> similar to what the default file sharding logic does. In Apache Beam
the
>>>>> smallest unit of output for a GroupByKey is a single key+iterable pair.
We
>>>>> exploit this by assigning all our values to a fixed number of keys and
then
>>>>> performing a GroupByKey. This is the same trick that powers the file
>>>>> sharding logic in AvroIO/TextIO/...
>>>>>
>>>>> Your pipeline would look like (fixed width font diagram):
>>>>> your data      -> apply shard key       -> GroupByKey        ->
>>>>> partition by key -> your dofn #1
>>>>>
>>>>>            \> your dofn #2
>>>>>
>>>>>            \> ...
>>>>> a  / b / c / d -> 1,a / 2,b / 1,c / 2,d -> 1,[a,c] / 2,[b,d] ->
???
>>>>>
>>>>> This is not exactly the same as processing a single DoFn
>>>>> instance/thread because it relies on the Runner to be able to schedule
each
>>>>> key to be processed on a different machine. For example a Runner may
choose
>>>>> to process value 1,[a,c] and 2,[b,d] sequentially on the same machine
or
>>>>> may choose to distribute them.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Jun 6, 2017 at 8:13 AM, Josh <jofo90@gmail.com> wrote:
>>>>>
>>>>>> Hey Lukasz,
>>>>>>
>>>>>> I have a follow up question about this -
>>>>>>
>>>>>> What if I want to do something very similar, but instead of with
4
>>>>>> instances of AvroIO following the partition transform, I want 4 instances
>>>>>> of a DoFn that I've written. I want to ensure that each partition
is
>>>>>> processed by a single DoFn instance/thread. Is this possible with
Beam?
>>>>>>
>>>>>> Thanks,
>>>>>> Josh
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 24, 2017 at 6:15 PM, Josh <jofo90@gmail.com> wrote:
>>>>>>
>>>>>>> Ahh I see - Ok I'll try out this solution then. Thanks Lukasz!
>>>>>>>
>>>>>>> On Wed, May 24, 2017 at 5:20 PM, Lukasz Cwik <lcwik@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Google Cloud Dataflow won't override your setting. The dynamic
>>>>>>>> sharding occurs if you don't explicitly set a numShard value.
>>>>>>>>
>>>>>>>> On Wed, May 24, 2017 at 9:14 AM, Josh <jofo90@gmail.com>
wrote:
>>>>>>>>
>>>>>>>>> Hi Lukasz,
>>>>>>>>>
>>>>>>>>> Thanks for the example. That sounds like a nice solution
-
>>>>>>>>> I am running on Dataflow though, which dynamically sets
numShards
>>>>>>>>> - so if I set numShards to 1 on each of those AvroIO
writers, I can't be
>>>>>>>>> sure that Dataflow isn't going to override my setting
right? I guess this
>>>>>>>>> should work fine as long as I partition my stream into
a large enough
>>>>>>>>> number of partitions so that Dataflow won't override
numShards.
>>>>>>>>>
>>>>>>>>> Josh
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, May 24, 2017 at 4:10 PM, Lukasz Cwik <lcwik@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Since your using a small number of shards, add a
Partition
>>>>>>>>>> transform which uses a deterministic hash of the
key to choose one of 4
>>>>>>>>>> partitions. Write each partition with a single shard.
>>>>>>>>>>
>>>>>>>>>> (Fixed width diagram below)
>>>>>>>>>> Pipeline -> AvroIO(numShards = 4)
>>>>>>>>>> Becomes:
>>>>>>>>>> Pipeline -> Partition --> AvroIO(numShards
= 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       |-> AvroIO(numShards = 1)
>>>>>>>>>>                       \-> AvroIO(numShards = 1)
>>>>>>>>>>
>>>>>>>>>> On Wed, May 24, 2017 at 1:05 AM, Josh <jofo90@gmail.com>
wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I am using a FileBasedSink (AvroIO.write) on
an unbounded stream
>>>>>>>>>>> (withWindowedWrites, hourly windows, numShards=4).
>>>>>>>>>>>
>>>>>>>>>>> I would like to partition the stream by some
key in the element,
>>>>>>>>>>> so that all elements with the same key will get
processed by the same shard
>>>>>>>>>>> writer, and therefore written to the same file.
Is there a way to do this?
>>>>>>>>>>> Note that in my stream the number of keys is
very large (most elements have
>>>>>>>>>>> a unique key, while a few elements share a key).
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Josh
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message