beam-user mailing list archives

From Carlos Alonso <car...@mrcalonso.com>
Subject Re: Chasing OOM errors
Date Wed, 07 Feb 2018 10:26:19 GMT
Thanks Eugene!

I'll try those options and will let you know, but I'd also like to understand
their implications:
- What could happen if I reduce the number of threads? The job will lose
"parallelisation capacity" and will thus be more likely to need more workers, right?
- What could happen if I reduce the GCS upload buffer size bytes? It will
need to send more "packets" to complete an upload, right?
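A back-of-the-envelope sketch of that second point (assuming each resumable-upload request carries roughly one buffer's worth of data; the 64 MiB file size is made up for illustration):

```java
public class UploadChunks {
    public static void main(String[] args) {
        // Assumption: each GCS upload request sends roughly one buffer's
        // worth of data, so a smaller buffer means more requests per file.
        long fileBytes = 64L * 1024 * 1024;     // hypothetical 64 MiB output file
        long defaultBuffer = 8L * 1024 * 1024;  // default 8 MiB buffer
        long smallBuffer = 1024 * 1024;         // proposed 1 MiB buffer

        System.out.println(fileBytes / defaultBuffer); // 8 requests
        System.out.println(fileBytes / smallBuffer);   // 64 requests
    }
}
```

So the trade is more (smaller) requests per upload in exchange for a much smaller per-writer memory footprint.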

Aside from those I have a couple of outstanding questions:
- The .by() comment says it specifies how to partition elements
into groups (destinations). How exactly does it work? By using the
hashCode of the object returned?
- Were the previous versions of the job (using 2.2.0) likely to be losing
data due to the OOM errors?

Thanks again for all your help. Really appreciate it!

On Wed, Feb 7, 2018 at 1:37 AM Eugene Kirpichov <kirpichov@google.com>
wrote:

> Ugh. The logging in 2.4.0-SNAPSHOT helped me identify the issue, and it is
> more mundane than expected.
>
> TL;DR Your job seems to be running fine, and it's not losing any data. You
> can simply do nothing. There was only a handful of OOM errors, they were
> transient, and were later successfully retried.
>
> However, if you want to not worry about any OOMs, your easy workarounds
> are any of the following:
> - Specify --numberOfWorkerHarnessThreads=(something not larger than 100 or
> so - by default in streaming worker it is limited to a few hundred)
> - Specify --gcsUploadBufferSizeBytes=1048576 or something like that (as
> opposed to the default value of 8x that)
> - Use a worker with more memory (I think any machine type larger than the
> one you're using will do)
>
> The issue is that the Dataflow streaming worker by default (currently - do
> NOT rely on these values to stay fixed) uses up to 300 threads, and about
> 5.7GB of memory (out of the 15GB of n1-standard-4 - the rest is reserved
> for various purposes); your code (e.g. storing the stateful DoFn state)
> uses some part of that. When the worker receives data for many destinations
> at the same time, it processes them all at the same time with all 300
> threads, each of which uses one 8 MB GCS upload buffer - 300 x 8 MB = 2.4 GB -
> which is not that much but pushes the worker over the limit.
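The arithmetic above can be checked directly - a quick sketch (the 300-thread and 8 MiB figures are the defaults quoted in this thread, not guaranteed constants):

```java
public class WorkerBufferMath {
    public static void main(String[] args) {
        // Defaults quoted above (do NOT rely on them staying fixed):
        // up to 300 harness threads, one 8 MiB GCS upload buffer each.
        long threads = 300;
        long bufferBytes = 8L * 1024 * 1024;

        // Worst case: every thread has an open GCS writer at once.
        long worstCase = threads * bufferBytes;
        System.out.println(worstCase / (1024 * 1024) + " MB"); // 2400 MB, i.e. ~2.4 GB
    }
}
```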
>
> The "proper" fixes for that on our side could be e.g. throttling the number of
> streaming worker threads based on memory pressure (which is surprisingly
> difficult to do given Java's GC) - but I'm not sure it's worth it right now
> given that your code is effectively running successfully.
>
> On Tue, Feb 6, 2018 at 2:14 PM Carlos Alonso <carlos@mrcalonso.com> wrote:
>
>> Hi Eugene.
>>
>> I've updated to 2.4.0-SNAPSHOT because I think 2.3.0-SNAPSHOT's
>> FileNaming is unusable as this commit:
>> https://github.com/apache/beam/commit/ece8709526cfb484aaf90c1508b0c1ea99186235 is
>> missing.
>>
>> This is the new code I've developed to use FileIO.writeDynamic API:
>> https://gist.github.com/calonso/9a708df6f2cbf48dd1a1fb67f2f571a3
>>
>> Unfortunately OOM errors are still around :( Here you can see a full
>> stack trace: https://pastebin.com/YndmFjCb. It is weird, though, that I
>> cannot find the errors in the Stackdriver logs (I got that stack trace from
>> the Dataflow UI. Job ID: 2018-02-06_10_11_02-10621326282488809928)
>>
>> Please, find the Dominator tree of the heap dump below:
>>
>> There's one thing I'd like to know. The .by() comment says it specifies
>> how to partition elements into groups (destinations). How exactly does it
>> work? By using the hashCode of the object returned?
>>
>> Thanks!
>>
>>
>>
>> [image: Screen Shot 2018-02-06 at 23.08.42.png]
>>
>>
>> On Mon, Feb 5, 2018 at 10:36 PM Eugene Kirpichov <kirpichov@google.com>
>> wrote:
>>
>>> Strongly recommend to move to FileIO.writeDynamic; it's a better API in
>>> many ways (though underlying implementation is currently the same).
>>> I think you can use either 2.3.0-SNAPSHOT or 2.4.0-SNAPSHOT - right now
>>> (in the middle of 2.3.0 release being created) they should be almost the
>>> same anyway (the former is the last snapshot built from master before 2.3.0
>>> tag was created, the latter is the last snapshot built after that tag was
>>> created).
>>>
>>> On Mon, Feb 5, 2018 at 1:27 PM Carlos Alonso <carlos@mrcalonso.com>
>>> wrote:
>>>
>>>> Does it make sense to try my current code `TextIO.writeCustomType` with
>>>> 2.3 or 2.4 versions or shall I move to `FileIO.writeDynamic` straight away?
>>>> Which transform on which version shall I try first?
>>>>
>>>> Regards
>>>>
>>>> On Mon, 5 Feb 2018 at 19:30, Eugene Kirpichov <kirpichov@google.com>
>>>> wrote:
>>>>
>>>>> All Beam artifacts have to be at the same version.
>>>>> Do you have the Apache Maven snapshot repository configured? Its URL
>>>>> is http://repository.apache.org/snapshots/
>>>>> If that doesn't work out, I think you should be able to clone the git
>>>>> repo and do "mvn clean install".
>>>>>
>>>>> Your new code looks good!
>>>>>
>>>>> On Mon, Feb 5, 2018 at 9:59 AM Carlos Alonso <carlos@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Sorry Eugene, I've tried to update my dependencies to both 2.3.0-RC1
>>>>>> and 2.4.0-SNAPSHOT but none of them were able to fully build as other
>>>>>> dependencies such as beam-runners-direct-java and
>>>>>> beam-runners-google-cloud-dataflow-java are not available on those
>>>>>> versions. Can I point them to 2.2.0 safely? Otherwise, how can I build them?
>>>>>>
>>>>>> In the meantime I've changed the DynamicDestinations implementation
>>>>>> to use FileParts and a variant for empty windows that you can see here:
>>>>>> https://gist.github.com/calonso/15eae12ecaa3a7dfd9f55c6dede09a42
>>>>>> and I'm running a test with it to see if that fixes the OOMs issue.
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>> On Sat, Feb 3, 2018 at 12:01 AM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> Thanks. Yeah, please try with 2.4.0-SNAPSHOT!
>>>>>>> In your code, I'd recommend changing the DynamicDestinations to not
>>>>>>> use DefaultFilenamePolicy.Params as the destination type. Generally, the
>>>>>>> destination type should contain just enough information to be able to
>>>>>>> *construct* a filename policy - in your case, using FileParts as your
>>>>>>> destination type would be a much better choice.
>>>>>>>
>>>>>>> If you'll be trying 2.4.0-SNAPSHOT, I recommend also switching to
>>>>>>> FileIO.writeDynamic() while you're at it.
>>>>>>> It would be something like:
>>>>>>>
>>>>>>> FileIO.writeDynamic()
>>>>>>> .by(kv -> FileParts.partsFor(kv.getValue()))
>>>>>>> .via(kv -> kv.getValue().contents(), TextIO.sink())
>>>>>>> .to(baseDir)
>>>>>>> .withNaming(parts -> (window, pane, n, i, compression) ->
>>>>>>> ...construct filename...)
>>>>>>>
>>>>>>> On Fri, Feb 2, 2018 at 8:32 AM Carlos Alonso <carlos@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> That's great!! I'll do everything I can to get to the bottom of
>>>>>>>> this as well.
>>>>>>>>
>>>>>>>> Here:
>>>>>>>> https://gist.github.com/calonso/674990e9c57ac364b6eea3f2e6ca799d
>>>>>>>> you have the whole transform that is responsible for writing the
>>>>>>>> windowed messages into GCS buckets.
>>>>>>>>
>>>>>>>> I can definitely run any Beam version required. You let me know.
>>>>>>>>
>>>>>>>> On Fri, Feb 2, 2018 at 5:25 PM Eugene Kirpichov <
>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>
>>>>>>>>> The fact that it persisted after switching to DynamicDestinations
>>>>>>>>> is very concerning and I very much would like to get to the bottom of it.
>>>>>>>>>
>>>>>>>>> - Can you share your new code? I'm interested only in the code of
>>>>>>>>> your DynamicDestinations subclass.
>>>>>>>>> - Is it an option for you to use Beam 2.4.0-SNAPSHOT, which
>>>>>>>>> contains more logging that will help debug this?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 2, 2018 at 8:03 AM Carlos Alonso <carlos@mrcalonso.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Great!! That cast worked and I managed to get a pipeline working
>>>>>>>>>> with that overload of the TypedWrite#to() method. Thanks Eugene!!
>>>>>>>>>>
>>>>>>>>>> The bad news is that I'm still getting OOMs with very similar
>>>>>>>>>> stack traces and dominator trees... After a few hours running I got a
>>>>>>>>>> couple more OOMs with these stack traces:
>>>>>>>>>> https://pastebin.com/4MH2xFjH and https://pastebin.com/VfTRNVnk
>>>>>>>>>> and the dominator trees you can see below... This is the job id if it
>>>>>>>>>> helps: 2018-02-02_03_07_20-1859206457474601634
>>>>>>>>>>
>>>>>>>>>> Thanks!!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 2, 2018 at 12:32 AM Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Sorry, my bad - previous message was wrong. You *do* need to use
>>>>>>>>>>> TextIO.writeCustomType(). It does *not* force the destination type to be
>>>>>>>>>>> void - it is indeed void on the return value of writeCustomType(), but
>>>>>>>>>>> .to(DynamicDestinations) changes the destination type (if you include
>>>>>>>>>>> https://github.com/apache/beam/pull/4319/files that is -
>>>>>>>>>>> otherwise you need a raw cast: e.g. TextIO.TypedWrite<MyType,
>>>>>>>>>>> MyDestinationType> write = (TextIO.TypedWrite)
>>>>>>>>>>> TextIO.writeCustomType().to(dynamicDestinations) or something like that)
>>>>>>>>>>>
>>>>>>>>>>> The OOM errors were, I think, caused by having too many GCS
>>>>>>>>>>> upload buffers active at the same time, because too many GCS writers were
>>>>>>>>>>> open at the same time here
>>>>>>>>>>> https://github.com/apache/beam/blob/v2.2.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L522 -
>>>>>>>>>>> the map of writers is supposed to typically have only 1 element, modulo
>>>>>>>>>>> hash collisions, but for some reason that I haven't figured out it probably
>>>>>>>>>>> had a lot more. If you update to a newer version of Beam, it'll also print
>>>>>>>>>>> more logging to debug this.
>>>>>>>>>>>
>>>>>>>>>>> I'm surprised that this doesn't appear in your OOM dumps, I
>>>>>>>>>>> suppose that's because they all became unreachable and got GCd after the
>>>>>>>>>>> exception was thrown. It seems that the Dataflow worker dumps only live
>>>>>>>>>>> objects in the heap - this investigation indicates we should change that on
>>>>>>>>>>> the Dataflow side.
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Feb 1, 2018 at 10:41 AM Carlos Alonso <
>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Eugene!!
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you very much for your help!! I'm really willing to
>>>>>>>>>>>> rewrite that bit of code but, TBH, I don't know how to do that. Using
>>>>>>>>>>>> TextIO.write() forces my DynamicDestinations to be of type <String, ?,
>>>>>>>>>>>> String>, which means, if I didn't misunderstand anything, that the input
>>>>>>>>>>>> type has to be a String already... Could you please provide a quick example
>>>>>>>>>>>> of how I could write it to have a custom type as an input?
>>>>>>>>>>>>
>>>>>>>>>>>> The only way I can think of around this is to encode the route
>>>>>>>>>>>> and the contents within the same string and then split them in the
>>>>>>>>>>>> DynamicDestinations methods... but I think that's not what you were
>>>>>>>>>>>> suggesting...
>>>>>>>>>>>>
>>>>>>>>>>>> Aside from that, just out of curiosity, could I know what was
>>>>>>>>>>>> causing the OOM errors that you saw?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks again for your help, really appreciate it!!
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Feb 1, 2018 at 4:17 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi! The full stack trace confirms what I saw in Dataflow-side
>>>>>>>>>>>>> logs, thanks.
>>>>>>>>>>>>> To use to(DynamicDestinations) you need to use
>>>>>>>>>>>>> TextIO.write().to(DynamicDestinations), not writeCustomType(). Note PR
>>>>>>>>>>>>> https://github.com/apache/beam/pull/4319 that fixes a typo in
>>>>>>>>>>>>> TextIO.write().to() - you can circumvent that issue with a raw type cast if
>>>>>>>>>>>>> you really want to stay on officially released Beam 2.2.0.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 7:01 AM Carlos Alonso <
>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi again Eugene.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Many thanks for your comments!
>>>>>>>>>>>>>> I've pasted the full stack trace in this paste:
>>>>>>>>>>>>>> https://pastebin.com/HYJvPppY
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> About using the suggested overload of .to() I've been trying
>>>>>>>>>>>>>> to do it unsuccessfully, as I don't think I understand the way it should be
>>>>>>>>>>>>>> used.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Basically TextIO.writeCustomType() forces the
>>>>>>>>>>>>>> DynamicDestinations implementation to be of <UserT, Void, OutputT> which,
>>>>>>>>>>>>>> in my case would be <KV[String, WindowedDoc], Void, String>, but I don't
>>>>>>>>>>>>>> know how to provide a destination (path) for each KV[String, WindowedDoc]
>>>>>>>>>>>>>> through Void, as the overridden method getDestination will get one of my
>>>>>>>>>>>>>> KV[String, WindowedDoc] and output Void, and getFilenamePolicy receives Void
>>>>>>>>>>>>>> and outputs a FilenamePolicy.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My idea is that each WindowedDoc has information on the
>>>>>>>>>>>>>> window it is contained in and then all of the elements within the same window
>>>>>>>>>>>>>> are saved on the same file, named with the start datetime of the window.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you please provide an example of how to do it with
>>>>>>>>>>>>>> TextIO.writeCustomType and a DynamicDestinations implementation? I've been
>>>>>>>>>>>>>> looking through the examples and only the overload of .to() that I'm
>>>>>>>>>>>>>> originally using is shown:
>>>>>>>>>>>>>> https://github.com/apache/beam/blob/29859eb54d05b96a9db477e7bb04537510273bd2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java#L280
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again for your help!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 12:45 AM Eugene Kirpichov <
>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Actually hold on, I'm not sure that what I said is correct.
>>>>>>>>>>>>>>> This overload of .to() is not my favorite :-| Can you try using the more
>>>>>>>>>>>>>>> explicit one, with DynamicDestinations - or still better (if you can use
>>>>>>>>>>>>>>> Beam 2.3), use FileIO.writeDynamic()?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 3:32 PM Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hmm the out of memory errors are actually happening in
>>>>>>>>>>>>>>>> WriteFiles, not in your code.
>>>>>>>>>>>>>>>> When you said that https://pastebin.com/0vfE6pUg is the
>>>>>>>>>>>>>>>> full trace, did you mean that this is all you see in the UI? It seems that
>>>>>>>>>>>>>>>> this is just the top-level exception but it is omitting the nested chain of
>>>>>>>>>>>>>>>> causes ("Caused by: ..."), and the root cause is the OOM in
>>>>>>>>>>>>>>>> WriteFiles/WriteShardedBundles.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I have a hypothesis as to why they might be happening.
>>>>>>>>>>>>>>>> You're using a type called WindowedDoc as your destination type - does it
>>>>>>>>>>>>>>>> have hashCode() and equals() properly defined on it? If no, that could lead
>>>>>>>>>>>>>>>> to this issue (and also to simply incorrect behavior), because it's used as
>>>>>>>>>>>>>>>> a key in hashmaps inside that transform. And what is the coder used for
>>>>>>>>>>>>>>>> that type?
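To illustrate why the equals()/hashCode() point matters, here is a generic Java sketch (the Destination class below is hypothetical - WindowedDoc's real fields are not shown in this thread). Without those two overrides, HashMap falls back to object identity, so two logically identical destinations become two distinct map entries - which in a writers map would mean one open writer (and one upload buffer) per element rather than per destination:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class DestinationKeyDemo {
    // Hypothetical destination type, analogous to FileParts/WindowedDoc.
    static final class Destination {
        final String type;
        final String windowStart;

        Destination(String type, String windowStart) {
            this.type = type;
            this.windowStart = windowStart;
        }

        // Without these two overrides, every instance would be a
        // distinct HashMap key even when the fields are equal.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Destination)) return false;
            Destination d = (Destination) o;
            return type.equals(d.type) && windowStart.equals(d.windowStart);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, windowStart);
        }
    }

    public static void main(String[] args) {
        Map<Destination, Integer> writers = new HashMap<>();
        // Two logically equal destinations...
        writers.merge(new Destination("eventA", "2018-02-06T10:00"), 1, Integer::sum);
        writers.merge(new Destination("eventA", "2018-02-06T10:00"), 1, Integer::sum);
        // ...collapse into a single entry because equals/hashCode are defined.
        System.out.println(writers.size()); // prints 1
    }
}
```

Delete the two overrides and the map ends up with two entries instead of one.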
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 12:28 PM Carlos Alonso <
>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Sure!:
>>>>>>>>>>>>>>>>> https://gist.github.com/calonso/14b06f4f58faf286cc79181cb46a9b85
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It would be weird though, as the bigger the buffer, the
>>>>>>>>>>>>>>>>> fewer OOM errors I see, but that could totally be something I'm
>>>>>>>>>>>>>>>>> misunderstanding.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks again for your help!!
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 7:32 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The dumps look fairly consistent. I suspect that the
>>>>>>>>>>>>>>>>>> memory is being hogged by data you're buffering in BufferMessagesDoFn, can
>>>>>>>>>>>>>>>>>> you show its code?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018, 10:12 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Not all dumps are 1GB, here you can see a couple more of
>>>>>>>>>>>>>>>>>>> them with bigger heaps.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> A couple of job ids: 2018-01-30_02_57_28-751284895952373783
>>>>>>>>>>>>>>>>>>> and 2018-01-29_03_09_28-5756483832988685011
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> About relevant parts of my code, here:
>>>>>>>>>>>>>>>>>>> https://gist.github.com/calonso/eec2e1f2b8dd034af429732939eed6ec
>>>>>>>>>>>>>>>>>>> you can see the most relevant bits with comments, I hope that is easy to
>>>>>>>>>>>>>>>>>>> understand, let me know otherwise.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for your help!!
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-30 at 18.27.11.png][image:
>>>>>>>>>>>>>>>>>>> Screen Shot 2018-01-30 at 18.26.07.png]
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:53 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Regarding how dynamic writes work: it's considerably
>>>>>>>>>>>>>>>>>>>> more complex than just using destination as the key; it depends also on how
>>>>>>>>>>>>>>>>>>>> you configure your sharding, how many destinations there are etc. - see
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
>>>>>>>>>>>>>>>>>>>> (it is probably the second most complex transform in all of Beam, second only
>>>>>>>>>>>>>>>>>>>> to BigQueryIO.write()...).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 1:50 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hmm it's weird, this heap dump seems to be of size
>>>>>>>>>>>>>>>>>>>>> just 1GB which is way below the limit your workers should have. Are the
>>>>>>>>>>>>>>>>>>>>> dumps all small like that?
>>>>>>>>>>>>>>>>>>>>> Can you share a Dataflow job ID and some relevant part
>>>>>>>>>>>>>>>>>>>>> of your code?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 12:02 PM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> We're using n1-standard-4 (4 vCPUs, 15 GB memory)
>>>>>>>>>>>>>>>>>>>>>> machines (the default ones). And please, find the dominator tree view of
>>>>>>>>>>>>>>>>>>>>>> one of our heap dumps.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> About the code for dynamic writes... Could you
>>>>>>>>>>>>>>>>>>>>>> quickly summarise what it does? From what I've seen diving into the code I
>>>>>>>>>>>>>>>>>>>>>> think I saw a reduce-by-key operation that I guessed uses the file's path
>>>>>>>>>>>>>>>>>>>>>> as the key. Is that correct? Does that mean that the more files, the more
>>>>>>>>>>>>>>>>>>>>>> the work can be parallelised?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-29 at 20.57.15.png]
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 8:04 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> The code for doing dynamic writes tries to limit its
>>>>>>>>>>>>>>>>>>>>>>> memory usage, so it wouldn't be my first place to look (but I wouldn't rule
>>>>>>>>>>>>>>>>>>>>>>> it out). Are you using workers with a large number of cores or threads per
>>>>>>>>>>>>>>>>>>>>>>> worker?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> In MAT or YourKit, the most useful tool is the
>>>>>>>>>>>>>>>>>>>>>>> Dominator Tree. Can you paste a screenshot of the dominator tree expanded
>>>>>>>>>>>>>>>>>>>>>>> to some reasonable depth?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018, 10:45 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Eugene!
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments. Yes, we've downloaded a
>>>>>>>>>>>>>>>>>>>>>>>> couple of dumps, but TBH, couldn't understand anything (using the Eclipse
>>>>>>>>>>>>>>>>>>>>>>>> MAT), I was wondering if the trace and the fact that "the smaller the
>>>>>>>>>>>>>>>>>>>>>>>> buffer, the more OOM errors" could give any of you a hint as I think it may
>>>>>>>>>>>>>>>>>>>>>>>> be on the writing part...
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Do you know how the dynamic writes are distributed
>>>>>>>>>>>>>>>>>>>>>>>> on workers? Based on the full path?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 7:38 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hi, I assume you're using the Dataflow runner.
>>>>>>>>>>>>>>>>>>>>>>>>> Have you tried using the OOM debugging flags at
>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java#L169-L193 ?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 2:05 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone!!
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On our pipeline we're experiencing OOM errors.
>>>>>>>>>>>>>>>>>>>>>>>>>> It's been a while since we've been trying to nail them down but without any
>>>>>>>>>>>>>>>>>>>>>>>>>> luck.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Our pipeline:
>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Reads messages from PubSub of different types
>>>>>>>>>>>>>>>>>>>>>>>>>> (about 50 different types).
>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Outputs KV elements, being the key the type and
>>>>>>>>>>>>>>>>>>>>>>>>>> the value the element message itself (JSON).
>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Applies windowing (fixed windows of one hour,
>>>>>>>>>>>>>>>>>>>>>>>>>> with early and late firings one minute after processing the first
>>>>>>>>>>>>>>>>>>>>>>>>>> element in the pane), two days allowed lateness and discarding fired panes.
>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Buffers the elements (using stateful and
>>>>>>>>>>>>>>>>>>>>>>>>>> timely processing). We buffer the elements for 15 minutes or until they
>>>>>>>>>>>>>>>>>>>>>>>>>> reach a maximum size of 16 MB. This step's objective is to avoid windows'
>>>>>>>>>>>>>>>>>>>>>>>>>> panes growing too big.
>>>>>>>>>>>>>>>>>>>>>>>>>> 5. Writes the outputted elements to files in GCS
>>>>>>>>>>>>>>>>>>>>>>>>>> using dynamic routing, the route being: type/window/buffer_index-paneInfo.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Our first step was without buffering, just
>>>>>>>>>>>>>>>>>>>>>>>>>> windows with early and late firings on 15 minutes but, guessing that OOMs
>>>>>>>>>>>>>>>>>>>>>>>>>> were because of panes growing too big, we built that buffering step to
>>>>>>>>>>>>>>>>>>>>>>>>>> trigger on size as well.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> The full trace we're seeing can be found here:
>>>>>>>>>>>>>>>>>>>>>>>>>> https://pastebin.com/0vfE6pUg
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> There's an annoying thing we've experienced: the
>>>>>>>>>>>>>>>>>>>>>>>>>> smaller the buffer size, the more OOM errors we see, which was
>>>>>>>>>>>>>>>>>>>>>>>>>> a bit disappointing...
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Can anyone please give us any hint?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
