beam-user mailing list archives

From Carlos Alonso <>
Subject Re: Chasing OOM errors
Date Wed, 31 Jan 2018 14:59:26 GMT
Hi again Eugene.

Many thanks for your comments!
I've pasted the full stack trace in this paste:

About using the suggested overload of .to(): I've been trying to use it,
unsuccessfully, as I don't think I understand how it is meant to be used.

Basically, TextIO.writeCustomType() forces the DynamicDestinations
implementation to be of <UserT, Void, OutputT> which, in my case, would be
<KV[String, WindowedDoc], Void, String>, but I don't know how to provide a
destination (path) for each KV[String, WindowedDoc] through Void, as the
overridden method getDestination takes one of my KV[String, WindowedDoc]
elements and outputs Void, and getFilenamePolicy receives Void and outputs a
FilenamePolicy.

My idea is that each WindowedDoc has information on the window it is
contained in, and then all of the elements within the same window are saved
to the same file, named with the start datetime of the window.
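To make that naming idea concrete, here is a minimal, self-contained sketch (the class name, helper name and the exact timestamp format are my own assumptions, not from this thread) of deriving a per-window file name from a window's start timestamp:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class WindowFileNames {
  // A filesystem-safe rendering of the window start, in UTC.
  private static final DateTimeFormatter FMT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH-mm-ss").withZone(ZoneOffset.UTC);

  // type + window start (epoch millis) -> "type/2018-01-30T18-00-00"
  // so every element of the same window lands in the same file.
  public static String fileNameFor(String type, long windowStartMillis) {
    return type + "/" + FMT.format(Instant.ofEpochMilli(windowStartMillis));
  }

  public static void main(String[] args) {
    // prints user_event/2018-01-30T18-00-00
    System.out.println(fileNameFor("user_event", 1517335200000L));
  }
}
```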

Can you please provide an example of how to do it with
TextIO.writeCustomType and a DynamicDestinations implementation? I've been
looking through the examples, and only the overload of .to() that I'm
originally using is shown:

Thanks again for your help!

On Wed, Jan 31, 2018 at 12:45 AM Eugene Kirpichov <> wrote:

> Actually hold on, I'm not sure that what I said is correct. This overload
> of .to() is not my favorite :-| Can you try using the more explicit one,
> with DynamicDestinations - or still better (if you can use Beam 2.3), use
> FileIO.writeDynamic()?
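For later readers: a rough sketch of what the FileIO.writeDynamic() suggestion could look like for this use case. This assumes Beam 2.3+; WindowedDoc, toJson and the bucket path are placeholders from this thread, not verified code.

```java
PCollection<KV<String, WindowedDoc>> docs = ...;

docs.apply(FileIO.<String, KV<String, WindowedDoc>>writeDynamic()
    // The destination type is a meaningful String (the message type), not Void.
    .by(kv -> kv.getKey())
    .withDestinationCoder(StringUtf8Coder.of())
    // Serialize each element to a line of text.
    .via(Contextful.fn(kv -> toJson(kv.getValue())), TextIO.sink())
    .to("gs://my-bucket/output")
    // One naming policy per destination; windowed writes are handled automatically.
    .withNaming(type -> FileIO.Write.defaultNaming(type, ".json")));
```

Unlike the <UserT, Void, OutputT> signature discussed earlier in the thread, the destination here carries real routing information through to the naming step.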
> On Tue, Jan 30, 2018 at 3:32 PM Eugene Kirpichov <>
> wrote:
>> Hmm the out of memory errors are actually happening in WriteFiles, not in
>> your code.
>> When you said that is the full trace, did
>> you mean that this is all you see in the UI? It seems that this is just the
>> top-level exception but it is omitting the nested chain of causes ("Caused
>> by: ..."), and the root cause is the OOM in WriteFiles/WriteShardedBundles.
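As an aside for readers hitting the same symptom: the root cause a UI may omit can be recovered programmatically by walking getCause(). A minimal, self-contained illustration (the names are mine, not from the pipeline in question):

```java
public class CauseChain {
  // Follow the "Caused by: ..." chain down to the innermost throwable.
  public static String rootCauseMessage(Throwable t) {
    while (t.getCause() != null) {
      t = t.getCause();
    }
    return t.getMessage();
  }

  public static void main(String[] args) {
    // A stand-in for a top-level exception wrapping an OOM root cause.
    Throwable oom = new RuntimeException("step failed",
        new IllegalStateException("write failed",
            new OutOfMemoryError("Java heap space")));
    // prints Java heap space
    System.out.println(rootCauseMessage(oom));
  }
}
```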
>> I have a hypothesis as to why they might be happening. You're using a
>> type called WindowedDoc as your destination type - does it have hashCode()
>> and equals() properly defined on it? If no, that could lead to this issue
>> (and also to simply incorrect behavior), because it's used as a key in
>> hashmaps inside that transform. And what is the coder used for that type?
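To illustrate the hypothesis: when a type used as a hash-map key lacks equals()/hashCode(), logically identical destinations occupy separate entries (and separate write buffers), which can balloon memory use. A minimal stand-in example, with hypothetical fields, showing the correct definitions:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class DestKey {
  final String type;
  final long windowStartMillis;

  DestKey(String type, long windowStartMillis) {
    this.type = type;
    this.windowStartMillis = windowStartMillis;
  }

  // Equal fields => equal keys, so a map collapses duplicates into one entry.
  @Override public boolean equals(Object o) {
    if (!(o instanceof DestKey)) return false;
    DestKey other = (DestKey) o;
    return type.equals(other.type) && windowStartMillis == other.windowStartMillis;
  }

  @Override public int hashCode() {
    return Objects.hash(type, windowStartMillis);
  }

  public static void main(String[] args) {
    Map<DestKey, Integer> counts = new HashMap<>();
    counts.merge(new DestKey("user_event", 0L), 1, Integer::sum);
    counts.merge(new DestKey("user_event", 0L), 1, Integer::sum);
    // Both increments hit the same entry; without equals/hashCode it would be 2.
    System.out.println(counts.size()); // 1
  }
}
```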
>> On Tue, Jan 30, 2018 at 12:28 PM Carlos Alonso <>
>> wrote:
>>> Sure!:
>>> It would be weird though, as the bigger the buffer, the fewer OOM errors
>>> I see, but that could totally be something I'm misunderstanding.
>>> Thanks again for your help!!
>>> On Tue, Jan 30, 2018 at 7:32 PM Eugene Kirpichov <>
>>> wrote:
>>>> The dumps look fairly consistent. I suspect that the memory is being
>>>> hogged by data you're buffering in BufferMessagesDoFn, can you show its
>>>> code?
>>>> On Tue, Jan 30, 2018, 10:12 AM Carlos Alonso <>
>>>> wrote:
>>>>> Not all dumps are 1GB, here you can see a couple more of them with
>>>>> bigger heaps
>>>>> A couple of job ids: 2018-01-30_02_57_28-751284895952373783
>>>>> and 2018-01-29_03_09_28-5756483832988685011
>>>>> About the relevant parts of my code, here: you can see the most
>>>>> relevant bits with comments. I hope that is easy to understand; let me
>>>>> know otherwise.
>>>>> Thanks for your help!!
>>>>> [image: Screen Shot 2018-01-30 at 18.27.11.png][image: Screen Shot
>>>>> 2018-01-30 at 18.26.07.png]
>>>>> On Mon, Jan 29, 2018 at 10:53 PM Eugene Kirpichov <> wrote:
>>>>>> Regarding how dynamic writes work: it's considerably more complex
>>>>>> than just using the destination as the key; it depends also on how you
>>>>>> configure your sharding, how many destinations there are, etc. - see
>>>>>> WriteFiles (it is probably the second most complex transform in all of
>>>>>> Beam, second only to BigQueryIO.write()...).
>>>>>> On Mon, Jan 29, 2018 at 1:50 PM Eugene Kirpichov <> wrote:
>>>>>>> Hmm it's weird, this heap dump seems to be of size just 1GB, which
>>>>>>> is way below the limit your workers should have. Are the dumps all
>>>>>>> small like that?
>>>>>>> Can you share a Dataflow job ID and some relevant part of your code?
>>>>>>> On Mon, Jan 29, 2018 at 12:02 PM Carlos Alonso <>
>>>>>>> wrote:
>>>>>>>> We're using n1-standard-4 (4 vCPUs, 15 GB memory) machines (the
>>>>>>>> default ones). And please, find the dominator tree view of one of
>>>>>>>> our heap dumps.
>>>>>>>> About the code for dynamic writes... Could you quickly summarise
>>>>>>>> what it does? From what I've seen diving into the code, I think I
>>>>>>>> saw a reduce-by-key operation that I guessed uses the file's path as
>>>>>>>> the key. Is that correct? Does that mean that the more files there
>>>>>>>> are, the more the work can be parallelised?
>>>>>>>> Thanks!
>>>>>>>> [image: Screen Shot 2018-01-29 at 20.57.15.png]
>>>>>>>> On Mon, Jan 29, 2018 at 8:04 PM Eugene Kirpichov <> wrote:
>>>>>>>>> The code for doing dynamic writes tries to limit its memory usage,
>>>>>>>>> so it wouldn't be my first place to look (but I wouldn't rule it
>>>>>>>>> out). Are you using workers with a large number of cores or threads
>>>>>>>>> per worker?
>>>>>>>>> In MAT or YourKit, the most useful tool is the Dominator Tree. Can
>>>>>>>>> you paste a screenshot of the dominator tree expanded to some
>>>>>>>>> reasonable depth?
>>>>>>>>> On Mon, Jan 29, 2018, 10:45 AM Carlos Alonso <>
>>>>>>>>> wrote:
>>>>>>>>>> Hi Eugene!
>>>>>>>>>> Thanks for your comments. Yes, we've downloaded a couple of dumps
>>>>>>>>>> but, TBH, I couldn't understand anything (using the Eclipse MAT).
>>>>>>>>>> I was wondering if the trace, and the fact that "the smaller the
>>>>>>>>>> buffer, the more OOM errors", could give any of you a hint, as I
>>>>>>>>>> think it may be on the writing part...
>>>>>>>>>> Do you know how the dynamic writes are distributed on workers?
>>>>>>>>>> Based on the full path?
>>>>>>>>>> Regards
>>>>>>>>>> On Mon, Jan 29, 2018 at 7:38 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>> Hi, I assume you're using the Dataflow runner. Have you tried
>>>>>>>>>>> using the OOM debugging flags at ?
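For reference, the debugging flags being referred to are, to the best of my knowledge, the Dataflow runner's heap-dump pipeline options; exact availability depends on the SDK version, so treat this fragment as an assumption rather than verified against the link that was in the original message:

```text
--dumpHeapOnOOM=true
--saveHeapDumpsToGcsPath=gs://<your-bucket>/heap-dumps
```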
>>>>>>>>>>> On Mon, Jan 29, 2018 at 2:05 AM Carlos Alonso <> wrote:
>>>>>>>>>>>> Hi everyone!!
>>>>>>>>>>>> On our pipeline we're experiencing OOM errors. It's been a while
>>>>>>>>>>>> since we've been trying to nail them down, but without any luck.
>>>>>>>>>>>> Our pipeline:
>>>>>>>>>>>> 1. Reads messages from PubSub of different types (about 50
>>>>>>>>>>>> different types).
>>>>>>>>>>>> 2. Outputs KV elements, the key being the type and the value the
>>>>>>>>>>>> message itself (JSON).
>>>>>>>>>>>> 3. Applies windowing (fixed windows of one hour, with early and
>>>>>>>>>>>> late firings one minute after processing the first element in
>>>>>>>>>>>> the pane, two days of allowed lateness, and discarding fired
>>>>>>>>>>>> panes).
>>>>>>>>>>>> 4. Buffers the elements (using stateful and timely processing).
>>>>>>>>>>>> We buffer the elements for 15 minutes or until they reach a
>>>>>>>>>>>> maximum size of 16Mb. This step's objective is to keep windows'
>>>>>>>>>>>> panes from growing too big.
>>>>>>>>>>>> 5. Writes the outputted elements to files in GCS using dynamic
>>>>>>>>>>>> routing, the route being: type/window/buffer_index-paneInfo.
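The windowing in step 3 corresponds roughly to this Beam snippet; it is a sketch reconstructed from the description, with the element types and the exact late-firing trigger being my guesses, not the author's actual code:

```java
// Fixed 1-hour windows, early/late firings 1 minute after the first
// element in the pane, 2 days allowed lateness, discarding fired panes.
input.apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1)))
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.standardDays(2))
    .discardingFiredPanes());
```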
>>>>>>>>>>>> Our first step was without buffering, just windows with early
>>>>>>>>>>>> and late firings on 15 minutes but, guessing that the OOMs were
>>>>>>>>>>>> because of panes growing too big, we built that buffering step
>>>>>>>>>>>> to trigger on size as well.
>>>>>>>>>>>> The full trace we're seeing can be found
>>>>>>>>>>>> There's an annoying thing we've experienced: the smaller the
>>>>>>>>>>>> buffer size, the more OOM errors we see, which was a bit
>>>>>>>>>>>> disappointing...
>>>>>>>>>>>> Can anyone please give us any hint?
>>>>>>>>>>>> Thanks in advance!
