beam-user mailing list archives

From Tóth Andor <andor.t...@centralmediacsoport.hu>
Subject RE: Window bounded to unbounded PCollection
Date Fri, 24 Aug 2018 18:46:53 GMT
I can deal with it being less efficient if it’s more transparent :). I want to achieve the latter,
unbounded scenario. How can that be done?

--
Bests,
Andor

From: Lukasz Cwik <lcwik@google.com>
Sent: Friday, August 24, 2018 19:58
To: user@beam.apache.org
Subject: Re: Window bounded to unbounded PCollection

In a bounded pipeline, "GroupByKey waits for all data to arrive, and no output files appear
until the end" is expected. Runners like Dataflow, Spark, ... process the transform tree
in topological order. In your pipeline this means that all of the data is read and written
into the group-by-key; once that is complete, all of the data is read back out of the group-by-key
and written to the output files. This is done because it is very efficient.

If you treat this as an unbounded pipeline, then all transforms run in parallel and
partial progress is made continuously, but this is significantly less efficient than the above.
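
For reference, a minimal sketch of requesting unbounded/streaming execution in the Python SDK is below. The streaming flag itself is standard; whether a bounded text source like ReadFromText is supported in streaming mode on your runner is an assumption you would need to verify.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Equivalent to passing --streaming on the command line; the runner then
# treats the pipeline as unbounded and runs all transforms concurrently.
options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

pipeline = beam.Pipeline(options=options)
lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)  # known_args as in the snippets below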

On Fri, Aug 24, 2018 at 12:39 AM Tóth Andor <andor.toth@centralmediacsoport.hu> wrote:
Yes, I did. The same happens as currently with windowing: GroupByKey waits for all data to
arrive, and no output files appear until the end.
Maybe it’s not a memory issue and Beam sorts this out with temporary files. But if a problem
occurs among thousands of files and that amount of data, I won’t be able to fix it and resume;
the already preprocessed data in the temporary files is lost. Also, I can’t really see the
progress.

Meanwhile I realized that something is off with the AddTimestampDoFn: if, on a small sample,
I collect its output into a list, the elements are not TimestampedValue objects, so windowing
could not occur either. I suppose that’s because the PCollection I’m reading from is not
unbounded, so the timestamp is thrown away. That may sound like nonsense if you know Beam
well, but it’s my best guess right now.
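
For reference, the usual shape of such a DoFn is sketched below; extract_epoch_seconds is a hypothetical stand-in for the actual log-line parsing. Note that inside a pipeline the runner unwraps TimestampedValue and keeps the timestamp as element metadata, so not seeing TimestampedValue objects in collected output does not by itself prove the timestamps were lost.

import apache_beam as beam

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        # extract_epoch_seconds is hypothetical: parse the event time
        # (seconds since epoch) out of one mail-log line.
        ts = extract_epoch_seconds(element)
        # Wrap the element so downstream windowing uses this event timestamp.
        yield beam.window.TimestampedValue(element, ts)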

I have found a possible workaround: push the file contents into Pub/Sub, then read the data
from there with Beam/Dataflow. Dataflow even has a template for that, called “GCS Text to Cloud
PubSub”. Still, I can’t believe there’s no simpler and more elegant way to solve this.
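
A minimal sketch of the Beam side of that workaround; the topic path below is hypothetical, and ReadFromPubSub yields an unbounded PCollection, which forces streaming execution:

import apache_beam as beam

# Pub/Sub messages arrive as bytes; each element's timestamp defaults to
# the message's publish time unless timestamp_attribute is specified.
lines = (pipeline
         | 'read' >> beam.io.ReadFromPubSub(topic='projects/my-project/topics/mail-logs')
         | 'decode' >> beam.Map(lambda b: b.decode('utf-8')))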

--
Bests,
Andor

From: Lukasz Cwik <lcwik@google.com>
Sent: Friday, August 24, 2018 00:10
To: user@beam.apache.org
Subject: Re: Window bounded to unbounded PCollection

It is unclear what the purpose of the windowing is, since windowing doesn't impact memory
utilization much on Google Dataflow.

The Direct runner uses memory proportional to the amount of data processed, but the Google
Dataflow runner does not: its memory usage is proportional to the size of your elements.
You can reach out to Google Cloud Platform support with some job ids or additional questions
if you want.

Have you tried a pipeline like this (without windowing):

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

results = lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')

output = results['main_valid'] \
    | 'format' >> beam.Map(output_format) \
    | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")
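
One caveat about this snippet (and the windowed version below): GroupByKey in the Python SDK operates on a PCollection of (key, value) 2-tuples, so unless an upstream step already emits such tuples, a keying step has to precede it. A minimal sketch, with a hypothetical key_fn:

# key_fn is hypothetical: derive the grouping key (e.g. a message id)
# from one log line, producing the (key, value) pairs GroupByKey needs.
keyed = lines | 'key' >> beam.Map(lambda line: (key_fn(line), line))
grouped = keyed | 'groupbykey' >> beam.GroupByKey()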


On Thu, Aug 23, 2018 at 5:32 AM Tóth Andor <andor.toth@centralmediacsoport.hu> wrote:
Hello,

I'm trying to process a few terabytes of mail logs without using too much memory, by windowing
the bounded source into an unbounded one.
Still, the GroupByKey waits for all data to arrive. Can you give me hints on how to work
around this?

I have already searched and read the available manuals and documentation, but I still don't
have a clue.
Neither the Direct nor the Google Dataflow runner works.

I'm using Python. Every item gets a timestamp, and then they are assigned to sliding windows
by the following code:

lines = pipeline | 'read' >> beam.io.ReadFromText(known_args.input)

window_trigger = trigger.AfterWatermark()
sliding_window = beam.window.SlidingWindows(size=3600+600, period=3600)

windowed_lines = lines \
    | 'timestamp' >> beam.ParDo(AddTimestampDoFn()) \
    | 'window' >> beam.WindowInto(sliding_window, trigger=window_trigger,
                                  accumulation_mode=trigger.AccumulationMode.DISCARDING)

results = windowed_lines \
    | 'groupbykey' >> beam.GroupByKey() \
    | 'parse' >> beam.ParDo(ParseSendingsDoFn()).with_outputs(
        'too_few_rows', 'invalid_rows', 'missing_recipients', main='main_valid')

output = results['main_valid'] \
        | 'format' >> beam.Map(output_format)\
        | 'write' >> beam.io.WriteToText(known_args.output, file_name_suffix=".gz")

--
Bests,
Andor