beam-user mailing list archives

From Carlos Alonso <>
Subject Re: Chasing OOM errors
Date Wed, 07 Feb 2018 22:58:17 GMT
BTW, the approach I took was to reduce the GCS upload buffer size to the
1048576 bytes you suggested.

On Wed, Feb 7, 2018 at 11:41 PM Carlos Alonso <> wrote:

> That's great!! The job is running smoothly without a single OOM so far...
> I'm now moving on to increasing the load (rate of incoming messages) on the
> job and got into a situation where the job got completely stuck. It started
> with one worker and tried to scale to 8 (maxNumWorkers), but could only get
> to 7 due to a quota issue. At that point the job stopped moving, as if it
> couldn't distribute work across workers. It eventually had an OOM whose
> trace you can see here: and the job ID
> is: 2018-02-07_08_59_35-14489372822523694308. I don't think it is relevant,
> though, but just in case.
> The important bit I'd like to understand is why the job gets stuck.
> After cancelling that job and restarting it without the 'Metrics.counter'
> calls (because I was submitting too many of them uselessly), with
> numWorkers=4 and maxNumWorkers=7 (to avoid the quota issue), the job managed
> to run successfully (it is actually still running with this
> id: 2018-02-07_09_49_57-11129391338943683710). It first scaled to 7 workers
> and, after clearing the piled-up unacknowledged messages the previous job
> left, it scaled down again to 2. There are a couple of OOMs, but I don't
> think they are that important as they are very rare.
> Could you please help me understand why the scaling did not work in that
> first scenario? I wonder if it was due to the Metrics, the jump from 1 to 7
> workers, the fact that it tried to get to 8 but could only get 7, a
> combination of those, or a different issue...
> Thanks again for all your help!
> On Wed, Feb 7, 2018 at 5:46 PM Eugene Kirpichov <>
> wrote:
>> Yes, the reason is the same as for 2.2.0, so you can stay on 2.2.0 and do
>> nothing. This actually would be my recommendation.
>> Errors never cause data loss in Beam (unless something is implemented
>> incorrectly, of course), but if your job was stuck repeatedly throwing OOMs
>> then it would be able to write the data only once it stopped doing that.
>> However it turned out that it threw only a couple OOMs and went on its way.
>> I suggest saving those workarounds for a rainy day when you have
>> something that gets stuck throwing OOMs.
>> - Yes, reducing the maximum number of threads will yield less parallelism
>> and you'll likely need more workers.
>> - Decreasing GCS upload buffer will reduce the performance of writing
>> large files (that are much larger than the buffer).
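To put rough numbers on that trade-off: the cost of a smaller buffer is more upload requests per large file. A back-of-the-envelope sketch (plain Java, no Beam dependency; the one-request-per-buffer-full model is a deliberate simplification of GCS resumable uploads):

```java
public class UploadChunks {
    // Approximate number of upload requests needed to write a file of the
    // given size with a given buffer: one request per buffer-full.
    static long uploadRequests(long fileBytes, long bufferBytes) {
        return (fileBytes + bufferBytes - 1) / bufferBytes; // ceiling division
    }

    public static void main(String[] args) {
        long file = 256L * 1024 * 1024; // a 256 MB output file
        // 32 requests with the 8 MB default buffer...
        System.out.println(uploadRequests(file, 8L * 1024 * 1024));
        // ...versus 256 requests with the 1048576-byte buffer from the thread.
        System.out.println(uploadRequests(file, 1_048_576L));
    }
}
```

So files much larger than the buffer pay an 8x request overhead with the reduced setting, while small files are unaffected.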
>> Grouping by destinations uses a GroupByKey by a hash of encoded bytes of
>> the destination (using the destination coder), and also uses a local hash
>> map on the worker in case the hashes collide. So the coder has to be
>> deterministic (the transform enforces this) and consistent with
>> equals()/hashCode() (I don't think the transform enforces this). In other
>> words, it groups by "structural value". That's effectively the same
>> requirements as for any other operation that groups by a key.
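A minimal, Beam-free sketch of the "group by structural value" idea described above: the destination's encoded bytes act as the grouping key, with a wrapper giving them value-based equals()/hashCode(). The class and the "destination:payload" element format are illustrative, not Beam's internals:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StructuralValueGrouping {
    // Wrapper giving a byte[] value-based equals()/hashCode(), the way a
    // deterministic coder's encoded bytes can serve as a grouping key.
    static final class EncodedKey {
        private final byte[] bytes;
        EncodedKey(byte[] bytes) { this.bytes = bytes; }
        @Override public boolean equals(Object o) {
            return o instanceof EncodedKey && Arrays.equals(bytes, ((EncodedKey) o).bytes);
        }
        @Override public int hashCode() { return Arrays.hashCode(bytes); }
    }

    // Group elements by the encoded form of their destination; hash collisions
    // are resolved by equals(), mirroring the local hash map on the worker.
    static Map<EncodedKey, List<String>> groupByDestination(List<String> elements) {
        Map<EncodedKey, List<String>> groups = new HashMap<>();
        for (String e : elements) {
            String destination = e.split(":", 2)[0]; // toy "destination:payload" format
            EncodedKey key = new EncodedKey(destination.getBytes(StandardCharsets.UTF_8));
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(e);
        }
        return groups;
    }

    public static void main(String[] args) {
        Map<EncodedKey, List<String>> g =
            groupByDestination(List.of("logs:a", "metrics:b", "logs:c"));
        System.out.println(g.size()); // two distinct destinations
    }
}
```

This is why the coder must be deterministic: two logically equal destinations that encode to different bytes would silently land in different groups.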
>> On Wed, Feb 7, 2018 at 6:49 AM Carlos Alonso <>
>> wrote:
>>> And also...
>>> Was the OOM error caused by the same issue on the old 2.2.0 version of
>>> the job? If that was the case I could take any of the workarounds and stick
>>> with the "official" 2.2.0. What do you think about it?
>>> Thanks!
>>> On Wed, Feb 7, 2018 at 11:26 AM Carlos Alonso <>
>>> wrote:
>>>> Thanks Eugene!
>>>> I'll try those options and will let you know, but I'd also like to know
>>>> about their implications:
>>>> - What could happen if I reduce the number of threads? It will lose
>>>> "parallelisation capacity" and thus be more likely to need more workers, right?
>>>> - What could happen if I reduce the GCS upload buffer size bytes? It
>>>> will need to send more "packets" to complete an upload, right?
>>>> Aside from those I have a couple of outstanding questions:
>>>> - The .by() comments say that it specifies how to partition
>>>> elements into groups (destinations). How exactly does it work? By using
>>>> the hashCode of the returned object?
>>>> - Were the previous versions of the job (using 2.2.0) likely to be
>>>> losing data due to the OOM errors?
>>>> Thanks again for all your help. Really appreciate it!
>>>> On Wed, Feb 7, 2018 at 1:37 AM Eugene Kirpichov <>
>>>> wrote:
>>>>> Ugh. The logging in 2.4.0-SNAPSHOT helped me identify the issue, and
>>>>> it is more mundane than expected.
>>>>> TL;DR Your job seems to be running fine, and it's not losing any data.
>>>>> You can simply do nothing. There were only a handful of OOM errors; they
>>>>> were transient and were later successfully retried.
>>>>> However, if you want to not worry about any OOMs, your easy
>>>>> workarounds are any of the following:
>>>>> - Specify --numberOfWorkerHarnessThreads=(something not larger than
>>>>> 100 or so - by default in streaming worker it is limited to a few hundred)
>>>>> - Specify --gcsUploadBufferSizeBytes=1048576 or something like that
>>>>> (as opposed to the default value of 8x that)
>>>>> - Use a worker with more memory (I think any machine type larger than
>>>>> the one you're using will do)
>>>>> The issue is that the Dataflow streaming worker by default (currently
>>>>> - do NOT rely on these values to stay fixed) uses up to 300 threads, and
>>>>> about 5.7GB of memory (out of the 15GB of n1-standard-4 - the rest is
>>>>> reserved for various purposes); your code (e.g. storing the stateful DoFn
>>>>> state) uses some part of that. When the worker receives data for many
>>>>> destinations at the same time, it processes them all at the same time with
>>>>> all 300 threads, each of which uses one 8MB GCS upload buffer: 300 x 8MB =
>>>>> 2.4GB, which is not that much but pushes the worker over the limit.
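The arithmetic in that paragraph can be sketched as follows (plain Java; the 300-thread, 8 MB, and ~5.7 GB figures are the thread's illustrative numbers and, as noted above, should not be relied on to stay fixed):

```java
public class WorkerMemoryBudget {
    // Illustrative defaults from the thread; do NOT rely on them staying fixed.
    static final int MAX_THREADS = 300;
    static final long DEFAULT_GCS_BUFFER_BYTES = 8L * 1024 * 1024; // 8 MB default
    static final long REDUCED_GCS_BUFFER_BYTES = 1_048_576L; // --gcsUploadBufferSizeBytes=1048576
    static final long USABLE_HEAP_BYTES = (long) (5.7e9);    // ~5.7 GB of the n1-standard-4's 15 GB

    // Worst-case footprint of GCS upload buffers when every thread has one open.
    static long uploadBufferFootprint(int threads, long bufferBytes) {
        return threads * bufferBytes;
    }

    public static void main(String[] args) {
        long worst = uploadBufferFootprint(MAX_THREADS, DEFAULT_GCS_BUFFER_BYTES);   // ~2.4 GB
        long reduced = uploadBufferFootprint(MAX_THREADS, REDUCED_GCS_BUFFER_BYTES); // ~0.3 GB
        System.out.printf("default: %.2f GB, reduced: %.2f GB, usable heap: %.2f GB%n",
            worst / 1e9, reduced / 1e9, USABLE_HEAP_BYTES / 1e9);
    }
}
```

With buffers alone eating ~2.4 GB of a ~5.7 GB budget, user state (e.g. the stateful DoFn buffers) only needs to occupy the rest for the worker to tip over, which matches the "not that much but over the limit" diagnosis.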
>>>>> The "proper" fixes for that on our side could be e.g. throttling
>>>>> number of streaming worker threads based on memory pressure (which is
>>>>> surprisingly difficult to do given Java's GC) - but I'm not sure it's worth
>>>>> it right now given that your code is effectively running successfully.
>>>>> On Tue, Feb 6, 2018 at 2:14 PM Carlos Alonso <>
>>>>> wrote:
>>>>>> Hi Eugene.
>>>>>> I've updated to 2.4.0-SNAPSHOT because I think 2.3.0-SNAPSHOT's
>>>>>> FileNaming is unusable as this commit:
>>>>>> is
>>>>>> missing.
>>>>>> This is the new code I've developed to use FileIO.writeDynamic API:
>>>>>> Unfortunately OOM errors are still around :( Here you can see a full
>>>>>> stack trace: Although, it is weird that I
>>>>>> cannot find the errors in the Stackdriver logs (I got that stack trace from
>>>>>> the Dataflow UI. Job ID: 2018-02-06_10_11_02-10621326282488809928)
>>>>>> Please, find the Dominator tree of the heap dump below:
>>>>>> There's one thing I'd like to know. The .by() comments say that it
>>>>>> specifies how to partition elements into groups (destinations). How
>>>>>> exactly does it work? By using the hashCode of the returned object?
>>>>>> Thanks!
>>>>>> [image: Screen Shot 2018-02-06 at 23.08.42.png]
>>>>>> On Mon, Feb 5, 2018 at 10:36 PM Eugene Kirpichov <
>>>>>>> wrote:
>>>>>>> I strongly recommend moving to FileIO.writeDynamic; it's a better API
>>>>>>> in many ways (though the underlying implementation is currently the same).
>>>>>>> I think you can use either 2.3.0-SNAPSHOT or 2.4.0-SNAPSHOT - right
>>>>>>> now (in the middle of 2.3.0 release being created) they should be almost
>>>>>>> the same anyway (the former is the last snapshot built from master before
>>>>>>> 2.3.0 tag was created, the latter is the last snapshot built after that tag
>>>>>>> was created).
>>>>>>> On Mon, Feb 5, 2018 at 1:27 PM Carlos Alonso <>
>>>>>>> wrote:
>>>>>>>> Does it make sense to try my current code `TextIO.writeCustomType`
>>>>>>>> with 2.3 or 2.4 versions or shall I move to `FileIO.writeDynamic` straight
>>>>>>>> away? Which transform on which version shall I try first?
>>>>>>>> Regards
>>>>>>>> On Mon, 5 Feb 2018 at 19:30, Eugene Kirpichov <>
>>>>>>>> wrote:
>>>>>>>>> All Beam artifacts have to be at the same version.
>>>>>>>>> Do you have the Apache Maven snapshot repository configured? Its
>>>>>>>>> URL is
>>>>>>>>> If that doesn't work out, I think you should be able to clone the
>>>>>>>>> git repo and do "mvn clean install".
>>>>>>>>> Your new code looks good!
>>>>>>>>> On Mon, Feb 5, 2018 at 9:59 AM Carlos Alonso <>
>>>>>>>>> wrote:
>>>>>>>>>> Sorry Eugene, I've tried to update my dependencies to both
>>>>>>>>>> 2.3.0-RC1 and 2.4.0-SNAPSHOT but neither of them was able to fully build,
>>>>>>>>>> as other dependencies such as beam-runners-direct-java and
>>>>>>>>>> beam-runners-google-cloud-dataflow-java are not available in those
>>>>>>>>>> versions. Can I point them to 2.2.0 safely? Otherwise, how can I build them?
>>>>>>>>>> In the meantime I've changed the DynamicDestinations
>>>>>>>>>> implementation to use FileParts and a variant for empty windows that you
>>>>>>>>>> can see here:
>>>>>>>>>> and
>>>>>>>>>> I'm running a test with it to see if that fixes the OOMs issue.
>>>>>>>>>> Thanks!
>>>>>>>>>> On Sat, Feb 3, 2018 at 12:01 AM Eugene Kirpichov <
>>>>>>>>>>> wrote:
>>>>>>>>>>> Thanks. Yeah, please try with 2.4.0-SNAPSHOT!
>>>>>>>>>>> In your code, I'd recommend changing the DynamicDestinations to
>>>>>>>>>>> not use DefaultFilenamePolicy.Params as the destination type. Generally,
>>>>>>>>>>> the destination type should contain just enough information to be able to
>>>>>>>>>>> *construct* a filename policy - in your case, using FileParts as your
>>>>>>>>>>> destination type would be a much better choice.
>>>>>>>>>>> If you'll be trying 2.4.0-SNAPSHOT, I recommend also switching
>>>>>>>>>>> to FileIO.writeDynamic() while you're at it.
>>>>>>>>>>> It would be something like:
>>>>>>>>>>> FileIO.writeDynamic()
>>>>>>>>>>>     .by(kv -> FileParts.partsFor(kv.getValue()))
>>>>>>>>>>>     .via(kv -> kv.getValue().contents(), TextIO.sink())
>>>>>>>>>>>     .to(baseDir)
>>>>>>>>>>>     .withNaming(parts -> (window, pane, n, i, compression) ->
>>>>>>>>>>>         ...construct filename...)
>>>>>>>>>>> On Fri, Feb 2, 2018 at 8:32 AM Carlos Alonso <
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> That's great!! I'll do everything I can to get to the bottom of
>>>>>>>>>>>> this as well.
>>>>>>>>>>>> Here:
>>>>>>>>>>>> you
>>>>>>>>>>>> have the whole transform that is responsible for writing the windowed
>>>>>>>>>>>> messages into GCS buckets.
>>>>>>>>>>>> I can definitely run any Beam version required. You let me know.
>>>>>>>>>>>> On Fri, Feb 2, 2018 at 5:25 PM Eugene Kirpichov <
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> The fact that it persisted after switching to
>>>>>>>>>>>>> DynamicDestinations is very concerning and I very much would like to get to
>>>>>>>>>>>>> the bottom of it.
>>>>>>>>>>>>> - Can you share your new code? I'm interested only in the code
>>>>>>>>>>>>> of your DynamicDestinations subclass.
>>>>>>>>>>>>> - Is it an option for you to use Beam 2.4.0-SNAPSHOT, which
>>>>>>>>>>>>> contains more logging that will help debug this?
>>>>>>>>>>>>> On Fri, Feb 2, 2018 at 8:03 AM Carlos Alonso <
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Great!! That cast worked and I managed to get a pipeline
>>>>>>>>>>>>>> working with that overload of the TypedWrite#to() method. Thanks Eugene!!
>>>>>>>>>>>>>> The bad news is that I'm still getting OOMs with very similar
>>>>>>>>>>>>>> stack traces and dominator trees... After a few hours running I got a
>>>>>>>>>>>>>> couple more OOMs with these stack traces:
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>> and the dominator trees you
>>>>>>>>>>>>>> can see below... This is the job id if it
>>>>>>>>>>>>>> helps: 2018-02-02_03_07_20-1859206457474601634
>>>>>>>>>>>>>> Thanks!!
>>>>>>>>>>>>>> On Fri, Feb 2, 2018 at 12:32 AM Eugene Kirpichov <
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Sorry, my bad - previous message was wrong. You *do* need to
>>>>>>>>>>>>>>> use TextIO.writeCustomType(). It does *not* force the destination type to
>>>>>>>>>>>>>>> be void - it is indeed void on the return value of writeCustomType(), but
>>>>>>>>>>>>>>> .to(DynamicDestinations) changes the destination type (if you include
>>>>>>>>>>>>>>> that is -
>>>>>>>>>>>>>>> otherwise you need a raw cast: e.g. TextIO.TypedWrite<MyType,
>>>>>>>>>>>>>>> MyDestinationType> write = (TextIO.TypedWrite)
>>>>>>>>>>>>>>> TextIO.writeCustomType().to(dynamicDestinations) or something like that)
>>>>>>>>>>>>>>> The OOM errors were, I think, caused by having too many GCS
>>>>>>>>>>>>>>> upload buffers active at the same time, because too many GCS writers were
>>>>>>>>>>>>>>> open at the same time here
>>>>>>>>>>>>>>> -
>>>>>>>>>>>>>>> the map of writers is supposed to typically have only 1 element, modulo
>>>>>>>>>>>>>>> hash collisions, but for some reason that I haven't figured out it probably
>>>>>>>>>>>>>>> had a lot more. If you update to a newer version of Beam, it'll also print
>>>>>>>>>>>>>>> more logging to debug this.
>>>>>>>>>>>>>>> I'm surprised that this doesn't appear in your OOM dumps, I
>>>>>>>>>>>>>>> suppose that's because they all became unreachable and got GCd after the
>>>>>>>>>>>>>>> exception was thrown. It seems that the Dataflow worker dumps only live
>>>>>>>>>>>>>>> objects in the heap; this investigation indicates we should change that on
>>>>>>>>>>>>>>> the Dataflow side.
>>>>>>>>>>>>>>> On Thu, Feb 1, 2018 at 10:41 AM Carlos Alonso <
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>> Hi Eugene!!
>>>>>>>>>>>>>>>> Thank you very much for your help!! I'm really willing to
>>>>>>>>>>>>>>>> rewrite that bit of code but, TBH, I don't know how to do that. Using
>>>>>>>>>>>>>>>> TextIO.write() forces my DynamicDestinations to be of type <String, ?,
>>>>>>>>>>>>>>>> String>, which means, if I didn't misunderstand anything, that the input
>>>>>>>>>>>>>>>> type has to be a String already... Could you please provide a quick example
>>>>>>>>>>>>>>>> of how I could write it to have a custom type as an input?
>>>>>>>>>>>>>>>> The only way I can think of around this is to encode the
>>>>>>>>>>>>>>>> route and the contents within the same string and then split them in the
>>>>>>>>>>>>>>>> DynamicDestinations methods... but I think that's not what you were
>>>>>>>>>>>>>>>> suggesting...
>>>>>>>>>>>>>>>> Aside from that, just out of curiosity, could I know what
>>>>>>>>>>>>>>>> was causing the OOM errors that you saw?
>>>>>>>>>>>>>>>> Thanks again for your help, really appreciate it!!
>>>>>>>>>>>>>>>> On Thu, Feb 1, 2018 at 4:17 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>> Hi! The full stack trace confirms what I saw in
>>>>>>>>>>>>>>>>> Dataflow-side logs, thanks.
>>>>>>>>>>>>>>>>> To use to(DynamicDestinations) you need to use
>>>>>>>>>>>>>>>>> TextIO.write().to(DynamicDestinations), not writeCustomType(). Note PR
>>>>>>>>>>>>>>>>> that fixes a
>>>>>>>>>>>>>>>>> typo in TextIO.write().to() - you can circumvent that issue with a raw type
>>>>>>>>>>>>>>>>> cast if you really want to stay on officially released Beam 2.2.0.
>>>>>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 7:01 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> Hi again Eugene.
>>>>>>>>>>>>>>>>>> Many thanks for your comments!
>>>>>>>>>>>>>>>>>> I've pasted the full stack trace in this paste:
>>>>>>>>>>>>>>>>>> About using the suggested overload of .to() I've been
>>>>>>>>>>>>>>>>>> trying to do it unsuccessfully as I don't think I understand the way it
>>>>>>>>>>>>>>>>>> should be used.
>>>>>>>>>>>>>>>>>> Basically TextIO.writeCustomType() forces the
>>>>>>>>>>>>>>>>>> DynamicDestinations implementation to be of <UserT, Void, OutputT> which,
>>>>>>>>>>>>>>>>>> in my case would be <KV[String, WindowedDoc], Void, String>, but I don't
>>>>>>>>>>>>>>>>>> know how to provide a destination(path) for each KV[String, WindowedDoc]
>>>>>>>>>>>>>>>>>> through Void, as the overridden method getDestination will get one of my
>>>>>>>>>>>>>>>>>> KV[String, WindowedDoc] and output Void, and getFilenamePolicy receives
>>>>>>>>>>>>>>>>>> Void and outputs a FilenamePolicy.
>>>>>>>>>>>>>>>>>> My idea is that each windowedDoc has information on the
>>>>>>>>>>>>>>>>>> window it is contained in, and then all of the elements within the same
>>>>>>>>>>>>>>>>>> window are saved to the same file, named with the start datetime of the
>>>>>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>>>>> Can you please provide an example of how to do it with
>>>>>>>>>>>>>>>>>> TextIO.writeCustomType and a DynamicDestination implementation? I've been
>>>>>>>>>>>>>>>>>> looking through the examples and only the overload of .to() that I'm
>>>>>>>>>>>>>>>>>> originally using is shown:
>>>>>>>>>>>>>>>>>> Thanks again for your help!
>>>>>>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 12:45 AM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> Actually hold on, I'm not sure that what I said is
>>>>>>>>>>>>>>>>>>> correct. This overload of .to() is not my favorite :-| Can you try using
>>>>>>>>>>>>>>>>>>> the more explicit one, with DynamicDestinations - or better still (if you
>>>>>>>>>>>>>>>>>>> can use Beam 2.3), use FileIO.writeDynamic()?
>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 3:32 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>> Hmm the out of memory errors are actually happening in
>>>>>>>>>>>>>>>>>>>> WriteFiles, not in your code.
>>>>>>>>>>>>>>>>>>>> When you said that is
>>>>>>>>>>>>>>>>>>>> the full trace, did you mean that this is all you see in the UI? It seems
>>>>>>>>>>>>>>>>>>>> that this is just the top-level exception but it is omitting the nested
>>>>>>>>>>>>>>>>>>>> chain of causes ("Caused by: ..."), and the root cause is the OOM in
>>>>>>>>>>>>>>>>>>>> WriteFiles/WriteShardedBundles.
>>>>>>>>>>>>>>>>>>>> I have a hypothesis as to why they might be happening.
>>>>>>>>>>>>>>>>>>>> You're using a type called WindowedDoc as your destination type - does it
>>>>>>>>>>>>>>>>>>>> have hashCode() and equals() properly defined on it? If not, that could lead
>>>>>>>>>>>>>>>>>>>> to this issue (and also to simply incorrect behavior), because it's used as
>>>>>>>>>>>>>>>>>>>> a key in hashmaps inside that transform. And what is the coder used for
>>>>>>>>>>>>>>>>>>>> that type?
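A small, Beam-free illustration of why a destination type used as a hashmap key needs equals()/hashCode(): without them, every logically equal destination becomes a distinct map entry (and, in a WriteFiles-style transform, would mean a distinct open writer and upload buffer). The class names are made up for the demo:

```java
import java.util.HashMap;
import java.util.Map;

public class DestinationKeyDemo {
    // A destination type WITHOUT equals()/hashCode(): identity semantics.
    static final class BadDest {
        final String path;
        BadDest(String path) { this.path = path; }
    }

    // The same type WITH value semantics, as key-based grouping requires.
    static final class GoodDest {
        final String path;
        GoodDest(String path) { this.path = path; }
        @Override public boolean equals(Object o) {
            return o instanceof GoodDest && path.equals(((GoodDest) o).path);
        }
        @Override public int hashCode() { return path.hashCode(); }
    }

    public static void main(String[] args) {
        Map<BadDest, Integer> bad = new HashMap<>();
        bad.put(new BadDest("2018-02-06/part"), 1);
        bad.put(new BadDest("2018-02-06/part"), 1); // "same" destination, new entry!
        System.out.println(bad.size()); // 2: one writer (and buffer) per entry

        Map<GoodDest, Integer> good = new HashMap<>();
        good.put(new GoodDest("2018-02-06/part"), 1);
        good.put(new GoodDest("2018-02-06/part"), 1); // deduplicated
        System.out.println(good.size()); // 1
    }
}
```

With identity keys, the writers map would grow with every element instead of holding one writer per destination, which is exactly the many-open-buffers failure mode hypothesized here.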
>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 12:28 PM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>> Sure!:
>>>>>>>>>>>>>>>>>>>>> It would be weird though, as the bigger the buffer,
>>>>>>>>>>>>>>>>>>>>> the fewer OOM errors I see, but that could totally be something I'm
>>>>>>>>>>>>>>>>>>>>> misunderstanding.
>>>>>>>>>>>>>>>>>>>>> Thanks again for your help!!
>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 7:32 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>> The dumps look fairly consistent. I suspect that the
>>>>>>>>>>>>>>>>>>>>>> memory is being hogged by data you're buffering in BufferMessagesDoFn; can
>>>>>>>>>>>>>>>>>>>>>> you show its code?
>>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018, 10:12 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Not all dumps are 1GB, here you can see a couple
>>>>>>>>>>>>>>>>>>>>>>> more of them with bigger heaps
>>>>>>>>>>>>>>>>>>>>>>> A couple of job
>>>>>>>>>>>>>>>>>>>>>>> ids: 2018-01-30_02_57_28-751284895952373783
>>>>>>>>>>>>>>>>>>>>>>> and 2018-01-29_03_09_28-5756483832988685011
>>>>>>>>>>>>>>>>>>>>>>> About relevant parts of my code, here:
>>>>>>>>>>>>>>>>>>>>>>> you can see the most relevant bits with comments, I hope that is easy to
>>>>>>>>>>>>>>>>>>>>>>> understand, let me know otherwise.
>>>>>>>>>>>>>>>>>>>>>>> Thanks for your help!!
>>>>>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-30 at 18.27.11.png][image:
>>>>>>>>>>>>>>>>>>>>>>> Screen Shot 2018-01-30 at 18.26.07.png]
>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:53 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Regarding how dynamic writes work: it's
>>>>>>>>>>>>>>>>>>>>>>>> considerably more complex than just using destination as the key; it
>>>>>>>>>>>>>>>>>>>>>>>> depends also on how you configure your sharding, how many destinations
>>>>>>>>>>>>>>>>>>>>>>>> there are etc. - see
>>>>>>>>>>>>>>>>>>>>>>>> (it
>>>>>>>>>>>>>>>>>>>>>>>> is probably the second most complex transform in all of Beam, second only
>>>>>>>>>>>>>>>>>>>>>>>> to BigQueryIO.write()...).
>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 1:50 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> Hmm it's weird, this heap dump seems to be of size
>>>>>>>>>>>>>>>>>>>>>>>>> just 1GB which is way below the limit your workers should have. Are the
>>>>>>>>>>>>>>>>>>>>>>>>> dumps all small like that?
>>>>>>>>>>>>>>>>>>>>>>>>> Can you share a Dataflow job ID and some relevant
>>>>>>>>>>>>>>>>>>>>>>>>> part of your code?
>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 12:02 PM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> We're using n1-standard-4 (4 vCPUs, 15 GB memory)
>>>>>>>>>>>>>>>>>>>>>>>>>> machines (the default ones). And please, find the dominator tree view of
>>>>>>>>>>>>>>>>>>>>>>>>>> one of our heap dumps.
>>>>>>>>>>>>>>>>>>>>>>>>>> About the code for dynamic writes... Could you
>>>>>>>>>>>>>>>>>>>>>>>>>> quickly summarise what it does? From what I've seen diving into the code I
>>>>>>>>>>>>>>>>>>>>>>>>>> think I saw a reduce-by-key operation that I guessed uses the file's path
>>>>>>>>>>>>>>>>>>>>>>>>>> as the key. Is that correct? Does that mean that the more files there are,
>>>>>>>>>>>>>>>>>>>>>>>>>> the more the work can be parallelised?
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-29 at 20.57.15.png]
>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 8:04 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> The code for doing dynamic writes tries to limit
>>>>>>>>>>>>>>>>>>>>>>>>>>> its memory usage, so it wouldn't be my first place to look (but I wouldn't
>>>>>>>>>>>>>>>>>>>>>>>>>>> rule it out). Are you using workers with a large number of cores or threads
>>>>>>>>>>>>>>>>>>>>>>>>>>> per worker?
>>>>>>>>>>>>>>>>>>>>>>>>>>> In MAT or YourKit, the most useful tool is the
>>>>>>>>>>>>>>>>>>>>>>>>>>> Dominator Tree. Can you paste a screenshot of the dominator tree expanded
>>>>>>>>>>>>>>>>>>>>>>>>>>> to some reasonable depth?
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018, 10:45 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Eugene!
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments. Yes, we've downloaded
>>>>>>>>>>>>>>>>>>>>>>>>>>>> a couple of dumps but, TBH, couldn't understand anything (using Eclipse
>>>>>>>>>>>>>>>>>>>>>>>>>>>> MAT). I was wondering if the trace and the fact that "the smaller the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer, the more OOM errors" could give any of you a hint, as I think it
>>>>>>>>>>>>>>>>>>>>>>>>>>>> may be on the writing part...
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do you know how the dynamic writes are
>>>>>>>>>>>>>>>>>>>>>>>>>>>> distributed on workers? Based on the full path?
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 7:38 PM Eugene
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kirpichov <> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, I assume you're using the Dataflow runner.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Have you tried using the OOM debugging flags at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>  ?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 2:05 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone!!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On our pipeline we're experiencing OOM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> errors. We've been trying to nail them down for a while, but
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without any luck.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Our pipeline:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Reads messages from PubSub of different
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> types (about 50 different types).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Outputs KV elements being the key the type
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and the value the element message itself (JSON)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Applies windowing (fixed windows of one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> hour, with early and late firings one minute after processing the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> first element in the pane). Two days of allowed lateness and discarding
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fired panes.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Buffers the elements (using stateful and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timely processing). We buffer the elements for 15 minutes or until it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reaches a maximum size of 16Mb. This step's objective is to avoid window's
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> panes grow too big.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5. Writes the outputted elements to files in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> GCS using dynamic routing. Being the route:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type/window/buffer_index-paneInfo.
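A rough, Beam-free sketch of the size-capped buffering in step 4 (the 16MB cap is scaled down to a few bytes for the demo; in the real pipeline this state lives in a stateful DoFn whose 15-minute timer provides the time-based flush):

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SizeCappedBuffer {
    private final long maxBytes;
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    private final List<List<String>> flushed = new ArrayList<>();

    SizeCappedBuffer(long maxBytes) { this.maxBytes = maxBytes; }

    // Accumulate an element; flush the batch as soon as the cap is reached,
    // so no single pane's worth of buffered data can grow unboundedly.
    void add(String element) {
        buffer.add(element);
        bufferedBytes += element.getBytes(StandardCharsets.UTF_8).length;
        if (bufferedBytes >= maxBytes) {
            flush(); // size-triggered flush; a timer would drive the time-based one
        }
    }

    void flush() {
        if (!buffer.isEmpty()) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
            bufferedBytes = 0;
        }
    }

    List<List<String>> flushedBatches() { return flushed; }

    public static void main(String[] args) {
        SizeCappedBuffer b = new SizeCappedBuffer(10); // 10-byte cap for the demo
        b.add("aaaa"); b.add("bbbb"); b.add("cccc");   // 12 bytes: flush on the 3rd add
        b.flush(); // simulate the timer firing for any remainder
        System.out.println(b.flushedBatches().size());
    }
}
```

Each flushed batch then maps to one file under type/window/buffer_index, which is what keeps individual pane writes bounded in size.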
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Our first step was without buffering, just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windows with early and late firings on 15 minutes but, guessing that OOMs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> were because of panes growing too big, we built that buffering step to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger on size as well.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The full trace we're seeing can be found
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> There's an annoying thing we've experienced:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the smaller the buffer size, the more OOM errors we see, which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was a bit disappointing...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Can anyone please give us any hint?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks in advance!
