beam-dev mailing list archives

From: Lukasz Cwik <lc...@google.com>
Subject: Re: Multimap PCollectionViews' values updated rather than appended
Date: Mon, 11 Jun 2018 20:19:28 GMT
Thanks for the snippet; I've updated BEAM-4470 with the additional details.

On Mon, Jun 11, 2018 at 10:56 AM Carlos Alonso <carlos@mrcalonso.com> wrote:

> Many thanks for your help. Actually, my use case emits the entire map
> every time, so I guess I'm good to go with discarding mode.
>
> This test reproduces the issue:
> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala#L19-L53
>
> Hope it helps
>
> On Mon, Jun 4, 2018 at 9:04 PM Lukasz Cwik <lcwik@google.com> wrote:
>
>> Carlos, can you provide a test/code snippet for the bug that shows the
>> issue?
>>
>> On Mon, Jun 4, 2018 at 11:57 AM Lukasz Cwik <lcwik@google.com> wrote:
>>
>>> +dev@beam.apache.org
>>> Note that this is likely a bug in the DirectRunner for accumulation
>>> mode; I've filed it as https://issues.apache.org/jira/browse/BEAM-4470
>>>
>>> Discarding mode is meant to always reflect the latest firing; the catch
>>> is that you need to emit the entire map every time. If you can do
>>> this, then it makes sense to use discarding mode. The issue with discarding
>>> mode is that if your first trigger firing produces (A, 1), (B, 1) and your
>>> second firing produces (B, 2), the multimap will contain only (B, 2), and
>>> (A, 1) will have been discarded.
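The discarding-versus-accumulating behaviour described above can be illustrated with a toy model. This is a simplified sketch in plain Java, not Beam's implementation: it assumes the multimap view is rebuilt from the latest pane only, and that the pane holds either just the elements of that firing (discarding) or everything seen so far (accumulating). The class and method names (`PaneModel`, `pane`, `multimapView`) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Beam's implementation) of a multimap side input that is
 * rebuilt from the latest trigger firing's pane.
 */
public class PaneModel {

  /**
   * Returns the pane contents at firing {@code n}. Discarding keeps only the
   * elements emitted for that firing; accumulating keeps everything so far.
   */
  public static List<Map.Entry<String, Integer>> pane(
      List<List<Map.Entry<String, Integer>>> firings, int n, boolean accumulating) {
    if (!accumulating) {
      return firings.get(n);
    }
    List<Map.Entry<String, Integer>> all = new ArrayList<>();
    for (int i = 0; i <= n; i++) {
      all.addAll(firings.get(i));
    }
    return all;
  }

  /** Groups the pane's values per key, as a multimap view would. */
  public static Map<String, List<Integer>> multimapView(List<Map.Entry<String, Integer>> pane) {
    // Insertion order is kept only to make the printout readable; per the
    // thread, there is no guarantee on the order of a key's values.
    Map<String, List<Integer>> view = new LinkedHashMap<>();
    for (Map.Entry<String, Integer> e : pane) {
      view.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return view;
  }

  public static void main(String[] args) {
    List<List<Map.Entry<String, Integer>>> firings = List.of(
        List.of(Map.entry("A", 1), Map.entry("B", 1)), // first firing
        List.of(Map.entry("B", 2)));                   // second firing
    System.out.println("discarding:   " + multimapView(pane(firings, 1, false)));
    System.out.println("accumulating: " + multimapView(pane(firings, 1, true)));
  }
}
```

In this model, running `main` prints `{B=[2]}` for discarding and `{A=[1], B=[1, 2]}` for accumulating, matching the (A, 1) / (B, 2) example.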
>>>
>>> To my knowledge, there is no guarantee about the order in which the
>>> values are combined. You will need to use some piece of information about
>>> the element to figure out which is the latest (or encode some additional
>>> information along with each element to make this easy).
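One way to follow the suggestion above of encoding extra information with each element is to attach a version to every value and pick the maximum, so the iteration order of the Iterable no longer matters. This is a minimal sketch under that assumption; `Versioned` and `latest` are hypothetical names, and the version could be, for example, the time at which a schema was fetched.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.StreamSupport;

public class LatestValue {

  /** Hypothetical wrapper: the producer attaches a monotonically increasing version to each value. */
  public record Versioned<T>(long version, T value) {}

  /** Picks the highest-versioned value; the Iterable's iteration order is irrelevant. */
  public static <T> Optional<T> latest(Iterable<Versioned<T>> values) {
    return StreamSupport.stream(values.spliterator(), false)
        .max(Comparator.comparingLong(Versioned::version))
        .map(Versioned::value);
  }

  public static void main(String[] args) {
    // Values for one multimap key, deliberately out of order.
    List<Versioned<String>> values = List.of(
        new Versioned<>(2L, "schema-v2"),
        new Versioned<>(3L, "schema-v3"),
        new Versioned<>(1L, "schema-v1"));
    System.out.println(latest(values).orElse("none"));
  }
}
```

Inside a DoFn, the same helper could be applied to the Iterable that the multimap side input returns for a key.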
>>>
>>> On Thu, May 31, 2018 at 9:16 AM Carlos Alonso <carlos@mrcalonso.com>
>>> wrote:
>>>
>>>> I've improved the example a little and added some tests
>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/test/scala/com/mrcalonso/RefreshingSideInput2Test.scala
>>>>
>>>> The behaviour is slightly different, possibly because the runner
>>>> implementations (Dataflow/Direct) differ, but it is still not working.
>>>>
>>>> Now what happens is that although the internal PCollection gets
>>>> updated, the view isn't. This is happening regardless of the accumulation
>>>> mode.
>>>>
>>>> Regarding the accumulation mode on Dataflow... that was it! Now the
>>>> sets contain all the items. One more question, though: is the ordering
>>>> within the set deterministic? (i.e., can I assume that the latest value
>>>> will always be in the last position of the Iterable?)
>>>>
>>>> Also, given that for my particular case I only want the latest
>>>> version, would you advise me to go ahead with discarding mode?
>>>>
>>>> Regards
>>>>
>>>> On Thu, May 31, 2018 at 4:44 PM Lukasz Cwik <lcwik@google.com> wrote:
>>>>
>>>>> The trigger definition in the sample code you have is using discarding
>>>>> firing mode. Try swapping to using accumulating mode.
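The switch suggested above can be sketched with the Beam Java SDK, reusing the trigger quoted later in the thread (`Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())`) but with accumulating panes. This is a configuration sketch, not the code from the linked Scio job: it assumes the Beam Java SDK is on the classpath, and the class and method names (`RefreshingMultimapView.of`) are hypothetical. Per the thread, the DirectRunner may misbehave in accumulation mode (BEAM-4470).

```java
import java.util.Map;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class RefreshingMultimapView {

  /**
   * Re-windows the input into the global window with a repeated processing-time
   * trigger, accumulating fired panes so each firing carries all values seen so far,
   * and materializes the result as a multimap side input.
   */
  public static <K, V> PCollectionView<Map<K, Iterable<V>>> of(PCollection<KV<K, V>> input) {
    return input
        .apply(
            "GlobalWindowAccumulating",
            Window.<KV<K, V>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .accumulatingFiredPanes())
        .apply("AsMultimap", View.<K, V>asMultimap());
  }
}
```

Replacing `.accumulatingFiredPanes()` with `.discardingFiredPanes()` gives the discarding behaviour, which only works if every firing re-emits the whole map.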
>>>>>
>>>>>
>>>>> On Thu, May 31, 2018 at 1:42 AM Carlos Alonso <carlos@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> But I think what I'm experiencing is quite different. Basically, the
>>>>>> side input is updated, but only one element is found in the Iterable
>>>>>> that is the value of any key of the multimap.
>>>>>>
>>>>>> I mean, no concatenation seems to be happening. In the linked thread,
>>>>>> Kenn suggests that every firing will add the new value to the set of
>>>>>> values for the emitted key, but what I'm seeing is that the new value
>>>>>> is there on its own (i.e., it is the only element in the set).
>>>>>>
>>>>>> @Robert, I'm using
>>>>>> Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())
>>>>>>
>>>>>> On Wed, May 30, 2018 at 7:46 PM Lukasz Cwik <lcwik@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> An alternative to the approach in the thread that Kenn linked (adding
>>>>>>> support for retractions) is to add explicit support for combiners to
>>>>>>> side inputs. The system currently uses a hardcoded concatenating
>>>>>>> combiner, so maps, lists, iterables, singletons, and multimaps all work
>>>>>>> by concatenating the set of values emitted and then turning the result
>>>>>>> into a view. That is why it is an error for a singleton or map view if
>>>>>>> the trigger fires multiple times.
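To make the concatenating-combiner explanation concrete, here is a toy Java model. It assumes, as described above, that every view is derived from the concatenation of all values in a pane; it is not Beam's implementation, and the class and method names are hypothetical. It shows why a multimap tolerates repeated firings while singleton and map views reject them.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (not Beam's implementation): every view is derived from the
 * concatenation of all values in the pane, then checked for its shape.
 */
public class ConcatenatingViews {

  /** Concatenates the elements emitted by successive firings into one pane. */
  public static <T> List<T> concatenate(List<List<T>> firings) {
    List<T> all = new ArrayList<>();
    for (List<T> firing : firings) {
      all.addAll(firing);
    }
    return all;
  }

  /** A singleton view is only well defined when exactly one value was concatenated. */
  public static <T> T asSingleton(List<T> values) {
    if (values.size() != 1) {
      throw new IllegalArgumentException("expected one value, got " + values.size());
    }
    return values.get(0);
  }

  /** A map view is only well defined when each key appears at most once. */
  public static <K, V> Map<K, V> asMap(List<Map.Entry<K, V>> entries) {
    Map<K, V> map = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : entries) {
      if (map.containsKey(e.getKey())) {
        throw new IllegalArgumentException("duplicate values for key " + e.getKey());
      }
      map.put(e.getKey(), e.getValue());
    }
    return map;
  }

  /** A multimap view tolerates repeated keys: it simply groups all values. */
  public static <K, V> Map<K, List<V>> asMultimap(List<Map.Entry<K, V>> entries) {
    Map<K, List<V>> map = new LinkedHashMap<>();
    for (Map.Entry<K, V> e : entries) {
      map.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return map;
  }
}
```

With two firings that each emit a value for key `B`, `asMultimap` returns both values while `asMap` throws; likewise `asSingleton` only succeeds when the pane holds exactly one element.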
>>>>>>>
>>>>>>> On Wed, May 30, 2018 at 10:01 AM Kenneth Knowles <klk@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, this is a known issue. Here's a prior discussion:
>>>>>>>> https://lists.apache.org/thread.html/e9518f5d5f4bcf7bab02de2cb9fe1bd5293d87aa12d46de1eac4600b@%3Cuser.beam.apache.org%3E
>>>>>>>>
>>>>>>>> It is actually a long-standing issue, and the solution is known but
>>>>>>>> hard.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, May 30, 2018 at 9:48 AM Carlos Alonso <carlos@mrcalonso.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi everyone!!
>>>>>>>>>
>>>>>>>>> Working with multimap-based side inputs in the global window, I'm
>>>>>>>>> experiencing something unexpected (at least to me) that I'd like to
>>>>>>>>> share with you for clarification.
>>>>>>>>>
>>>>>>>>> My understanding of multimaps is that when one emits two values for
>>>>>>>>> the same key in the same window (trivially the case here, as I'm
>>>>>>>>> working in the global window), the newly emitted values are appended
>>>>>>>>> to the Iterable that is the value for that key in the map.
>>>>>>>>>
>>>>>>>>> I tested it in this job (it uses Scio, but side inputs are
>>>>>>>>> implemented with PCollectionViews):
>>>>>>>>> https://github.com/calonso/beam_experiments/blob/master/refreshingsideinput/src/main/scala/com/mrcalonso/RefreshingSideInput2.scala
>>>>>>>>>
>>>>>>>>> The steps to reproduce are:
>>>>>>>>> 1. Create a table in the target BQ dataset.
>>>>>>>>> 2. Run the job.
>>>>>>>>> 3. Patch the table in BQ (add one field); this should generate a
>>>>>>>>> new TableSchema for the corresponding TableReference.
>>>>>>>>> 4. The updated number of fields appears in the logs, but there is
>>>>>>>>> only one element in the Iterable, as if the value had been replaced
>>>>>>>>> instead of appended!
>>>>>>>>>
>>>>>>>>> Is that the expected behaviour? Is it a bug? Am I missing something?
>>>>>>>>>
>>>>>>>>> Thanks!
>>>>>>>>>
>>>>>>>>
