beam-user mailing list archives

From Carlos Alonso <>
Subject Re: Chasing OOM errors
Date Wed, 07 Feb 2018 22:41:19 GMT
That's great!! The job is running smoothly without a single OOM so far...

I've now moved on to increasing the load (the rate of incoming messages) on
the job, and got into a situation where the job got completely stuck. It
started with one worker and tried to scale to 8 (maxNumWorkers), but could
only get to 7 due to a quota issue. At that point the job stopped making
progress, as if it couldn't distribute work across workers. Eventually it had
an OOM whose trace you can see here: and the job ID
is: 2018-02-07_08_59_35-14489372822523694308. I don't think the OOM is
relevant, but I mention it just in case.

The important bit I'd like to understand is why the job gets stuck.

After cancelling that job and restarting it with all 'Metrics.counter' calls
removed (because I was submitting too many of those needlessly), numWorkers=4
and maxNumWorkers=7 (to avoid the quota issue), the job managed to run
successfully (it is actually still running with this
id: 2018-02-07_09_49_57-11129391338943683710). It first scaled to 7 workers
and, after clearing the backlog of unacknowledged messages the previous job
left, it scaled down again to 2. There are a couple of OOMs, but I don't
think they are that important as they seem very rare.

Could you please help me understand why the scaling did not work in that
first scenario? I wonder if it was due to the Metrics, the jump from 1 to 7
workers, the fact that it tried to get to 8 but could only get 7, a
combination of those, or a different issue...

Thanks again for all your help!

On Wed, Feb 7, 2018 at 5:46 PM Eugene Kirpichov <> wrote:

> Yes, the reason is the same as for 2.2.0, so you can stay on 2.2.0 and do
> nothing. This actually would be my recommendation.
> Errors never cause data loss in Beam (unless something is implemented
> incorrectly, of course), but if your job was stuck repeatedly throwing OOMs
> then it would be able to write the data only once it stopped doing that.
> However it turned out that it threw only a couple OOMs and went on its way.
> I suggest saving those workarounds for a rainy day when you have
> something that gets stuck throwing OOMs.
> - Yes, reducing the maximum number of threads will yield less parallelism
> and you'll be likely to need more workers.
> - Decreasing the GCS upload buffer will reduce the performance of writing
> large files (that are much larger than the buffer).
> Grouping by destinations uses a GroupByKey by a hash of encoded bytes of
> the destination (using the destination coder), and also uses a local hash
> map on the worker in case the hashes collide. So the coder has to be
> deterministic (the transform enforces this) and consistent with
> equals()/hashCode() (I don't think the transform enforces this). In other
> words, it groups by "structural value". That's effectively the same
> requirements as for any other operation that groups by a key.
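As an illustration of grouping by "structural value", here is a minimal plain-Java sketch with no Beam dependency. The Destination class, its fields, and the sample values are hypothetical and are not taken from the job discussed in this thread. It only shows the property described above: two destinations that are equal should produce the same encoded bytes, the same hashCode(), and land in the same hash-map entry.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical destination type; not part of Beam.
final class Destination {
    final String type;
    final String window;

    Destination(String type, String window) {
        this.type = type;
        this.window = window;
    }

    // Deterministic encoding: equal destinations always yield equal bytes.
    byte[] encode() {
        return (type + "/" + window).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Destination)) {
            return false;
        }
        Destination other = (Destination) o;
        return type.equals(other.type) && window.equals(other.window);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, window);
    }
}

final class StructuralGrouping {
    // Counts elements per destination using a local hash map keyed by destination.
    static Map<Destination, Integer> countByDestination(Destination... destinations) {
        Map<Destination, Integer> counts = new HashMap<>();
        for (Destination d : destinations) {
            counts.merge(d, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Destination a = new Destination("orders", "2018-02-07T10:00");
        Destination b = new Destination("orders", "2018-02-07T10:00");
        // Two distinct objects with the same field values behave as one key.
        System.out.println(Arrays.equals(a.encode(), b.encode()));
        System.out.println(countByDestination(a, b).size());
    }
}
```

Without the equals()/hashCode() overrides, a and b would be treated as two different map keys even though they encode to the same bytes, which is the kind of inconsistency the message above warns about.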
> On Wed, Feb 7, 2018 at 6:49 AM Carlos Alonso <> wrote:
>> And also...
>> Was the OOM error caused by the same thing on the old 2.2.0 version of
>> the job? If that was the case I could take any of the workarounds and stick
>> with the "official" 2.2.0. What do you think?
>> Thanks!
>> On Wed, Feb 7, 2018 at 11:26 AM Carlos Alonso <>
>> wrote:
>>> Thanks Eugene!
>>> I'll try those options and will let you know but I'd also like to know
>>> about the implications of them:
>>> - What could happen if I reduce the number of threads? The job will lose
>>> "parallelisation capacity" and thus be more likely to need more workers, right?
>>> - What could happen if I reduce the GCS upload buffer size bytes? It
>>> will need to send more "packets" to complete an upload, right?
>>> Aside from those I have a couple of outstanding questions:
>>> - The .by() comments say that it specifies how to partition
>>> elements into groups (destinations). How exactly does it work? By using
>>> the hashCode of the object returned?
>>> - Were the previous versions of the job (using 2.2.0) likely to be
>>> losing data due to the OOM errors?
>>> Thanks again for all your help. Really appreciate it!
>>> On Wed, Feb 7, 2018 at 1:37 AM Eugene Kirpichov <>
>>> wrote:
>>>> Ugh. The logging in 2.4.0-SNAPSHOT helped me identify the issue, and it
>>>> is more mundane than expected.
>>>> TL;DR Your job seems to be running fine, and it's not losing any data.
>>>> You can simply do nothing. There was only a handful of OOM errors, they
>>>> were transient, and were later successfully retried.
>>>> However, if you want to not worry about any OOMs, your easy workarounds
>>>> are any of the following:
>>>> - Specify --numberOfWorkerHarnessThreads=(something not larger than 100
>>>> or so - by default in streaming worker it is limited to a few hundred)
>>>> - Specify --gcsUploadBufferSizeBytes=1048576 or something like that (as
>>>> opposed to the default value of 8x that)
>>>> - Use a worker with more memory (I think any machine type larger than
>>>> the one you're using will do)
>>>> The issue is that the Dataflow streaming worker by default (currently -
>>>> do NOT rely on these values to stay fixed) uses up to 300 threads, and
>>>> about 5.7GB of memory (out of the 15GB of n1-standard-4 - the rest is
>>>> reserved for various purposes); your code (e.g. storing the stateful DoFn
>>>> state) uses some part of that. When the worker receives data for many
>>>> destinations at the same time, it processes them all at the same time with
>>>> all 300 threads, each of which uses one 8 MB upload buffer for GCS. That
>>>> adds up to 2.4 GB, which is not that much but pushes the worker over the limit.
>>>> The "proper" fixes for that on our side could be e.g. throttling number
>>>> of streaming worker threads based on memory pressure (which is surprisingly
>>>> difficult to do given Java's GC) - but I'm not sure it's worth it right now
>>>> given that your code is effectively running successfully.
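The arithmetic in the message above can be reproduced with a small plain-Java sketch. The class and method names are made up for illustration, and it assumes that "8 MB" means 8 MiB (8 x 1048576 bytes, consistent with the 1048576 figure being one eighth of the default) and that, in the worst case, every thread holds exactly one upload buffer at the same time.

```java
// Back-of-the-envelope estimate of GCS upload buffer memory on one worker.
// Names are hypothetical; the figures come from the thread above.
final class UploadBufferEstimate {
    static final long MIB = 1024L * 1024L;

    // Worst case: every harness thread holds one upload buffer at the same time.
    static long totalBufferBytes(int threads, long bufferSizeBytes) {
        return threads * bufferSizeBytes;
    }

    static long toMiB(long bytes) {
        return bytes / MIB;
    }

    public static void main(String[] args) {
        // Defaults quoted in the thread: 300 threads, 8 MiB buffers.
        System.out.println(toMiB(totalBufferBytes(300, 8 * MIB)) + " MiB"); // prints "2400 MiB"
        // With both suggested workarounds: 100 threads, 1048576-byte buffers.
        System.out.println(toMiB(totalBufferBytes(100, 1048576L)) + " MiB"); // prints "100 MiB"
    }
}
```

2400 MiB is the "2.4GB" mentioned above, a large share of the roughly 5.7 GB quoted as available to the worker. Either workaround alone cuts it substantially: 100 threads with 8 MiB buffers gives 800 MiB, and 300 threads with 1 MiB buffers gives 300 MiB.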
>>>> On Tue, Feb 6, 2018 at 2:14 PM Carlos Alonso <>
>>>> wrote:
>>>>> Hi Eugene.
>>>>> I've updated to 2.4.0-SNAPSHOT because I think 2.3.0-SNAPSHOT's
>>>>> FileNaming is unusable as this commit:
>>>>> is
>>>>> missing.
>>>>> This is the new code I've developed to use FileIO.writeDynamic API:
>>>>> Unfortunately OOM errors are still around :( Here you can see a full
>>>>> stack trace: It is odd, though, that I
>>>>> cannot find the errors in the Stackdriver logs (I got that stack trace from the
>>>>> Dataflow UI. Job ID: 2018-02-06_10_11_02-10621326282488809928)
>>>>> Please find the dominator tree of the heap dump below:
>>>>> There's one thing I'd like to know. The .by() comments say
>>>>> that it specifies how to partition elements into groups (destinations). How
>>>>> exactly does it work? By using the hashCode of the object returned?
>>>>> Thanks!
>>>>> [image: Screen Shot 2018-02-06 at 23.08.42.png]
>>>>> On Mon, Feb 5, 2018 at 10:36 PM Eugene Kirpichov <>
>>>>> wrote:
>>>>>> I strongly recommend moving to FileIO.writeDynamic; it's a better API
>>>>>> in many ways (though the underlying implementation is currently the same).
>>>>>> I think you can use either 2.3.0-SNAPSHOT or 2.4.0-SNAPSHOT - right
>>>>>> now (in the middle of 2.3.0 release being created) they should be almost
>>>>>> the same anyway (the former is the last snapshot built from master before
>>>>>> 2.3.0 tag was created, the latter is the last snapshot built after that tag
>>>>>> was created).
>>>>>> On Mon, Feb 5, 2018 at 1:27 PM Carlos Alonso <>
>>>>>> wrote:
>>>>>>> Does it make sense to try my current code `TextIO.writeCustomType`
>>>>>>> with 2.3 or 2.4 versions or shall I move to `FileIO.writeDynamic` straight
>>>>>>> away? Which transform on which version shall I try first?
>>>>>>> Regards
>>>>>>> On Mon, 5 Feb 2018 at 19:30, Eugene Kirpichov <>
>>>>>>> wrote:
>>>>>>>> All Beam artifacts have to be at the same version.
>>>>>>>> Do you have the Apache Maven snapshot repository configured? Its
>>>>>>>> URL is
>>>>>>>> If that doesn't work out, I think you should be able to clone the
>>>>>>>> git repo and do "mvn clean install".
>>>>>>>> Your new code looks good!
>>>>>>>> On Mon, Feb 5, 2018 at 9:59 AM Carlos Alonso <>
>>>>>>>> wrote:
>>>>>>>>> Sorry Eugene, I've tried to update my dependencies to both
>>>>>>>>> 2.3.0-RC1 and 2.4.0-SNAPSHOT but neither of them could fully build, as
>>>>>>>>> other dependencies such as beam-runners-direct-java and
>>>>>>>>> beam-runners-google-cloud-dataflow-java are not available at those
>>>>>>>>> versions. Can I point them to 2.2.0 safely? Otherwise, how can I build them?
>>>>>>>>> In the meantime I've changed the DynamicDestinations
>>>>>>>>> implementation to use FileParts and a variant for empty windows that you
>>>>>>>>> can see here:
>>>>>>>>> and
>>>>>>>>> I'm running a test with it to see if that fixes the OOMs issue.
>>>>>>>>> Thanks!
>>>>>>>>> On Sat, Feb 3, 2018 at 12:01 AM Eugene Kirpichov <> wrote:
>>>>>>>>>> Thanks. Yeah, please try with 2.4.0-SNAPSHOT!
>>>>>>>>>> In your code, I'd recommend changing the DynamicDestinations to
>>>>>>>>>> not use DefaultFilenamePolicy.Params as the destination type. Generally,
>>>>>>>>>> the destination type should contain just enough information to be able to
>>>>>>>>>> *construct* a filename policy - in your case, using FileParts as your
>>>>>>>>>> destination type would be a much better choice.
>>>>>>>>>> If you'll be trying 2.4.0-SNAPSHOT, I recommend also switching to
>>>>>>>>>> FileIO.writeDynamic() while you're at it.
>>>>>>>>>> It would be something like:
>>>>>>>>>> FileIO.writeDynamic()
>>>>>>>>>>     .by(kv -> FileParts.partsFor(kv.getValue()))
>>>>>>>>>>     .via(kv -> kv.getValue().contents(), TextIO.sink())
>>>>>>>>>>     .to(baseDir)
>>>>>>>>>>     .withNaming(parts -> (window, pane, n, i, compression) ->
>>>>>>>>>>         ...construct filename...)
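The filename construction left open in the snippet above is not shown anywhere in this thread. As one possible shape, here is a plain-Java sketch (no Beam dependency) that builds a relative path following the type/window/buffer_index-paneInfo route described in the first message of the thread. The FilePartsSketch class, its fields, and the pane label string are all hypothetical and are not the poster's actual FileParts class.

```java
// Hypothetical stand-in for a destination type that carries just enough
// information to construct a filename.
final class FilePartsSketch {
    final String type;        // message type, e.g. "orders"
    final String windowStart; // start of the window, already formatted
    final int bufferIndex;    // index of the buffer within the window

    FilePartsSketch(String type, String windowStart, int bufferIndex) {
        this.type = type;
        this.windowStart = windowStart;
        this.bufferIndex = bufferIndex;
    }

    // Builds a relative path of the form type/window/bufferIndex-paneLabel.
    String fileName(String paneLabel) {
        return type + "/" + windowStart + "/" + bufferIndex + "-" + paneLabel;
    }

    public static void main(String[] args) {
        FilePartsSketch parts = new FilePartsSketch("orders", "2018-02-07T10:00", 3);
        System.out.println(parts.fileName("early")); // prints "orders/2018-02-07T10:00/3-early"
    }
}
```

The point of the design is the one made above: the destination type holds only the small pieces needed to construct a name, rather than a full filename policy.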
>>>>>>>>>> On Fri, Feb 2, 2018 at 8:32 AM Carlos Alonso <> wrote:
>>>>>>>>>>> That's great!! I'll do everything I can to get to the bottom of
>>>>>>>>>>> this as well.
>>>>>>>>>>> Here:
>>>>>>>>>>> you
>>>>>>>>>>> have the whole transform that is responsible for writing the windowed
>>>>>>>>>>> messages into GCS buckets.
>>>>>>>>>>> I can definitely run any Beam version required. You let me know.
>>>>>>>>>>> On Fri, Feb 2, 2018 at 5:25 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>>> The fact that it persisted after switching to
>>>>>>>>>>>> DynamicDestinations is very concerning and I very much would like to get to
>>>>>>>>>>>> the bottom of it.
>>>>>>>>>>>> - Can you share your new code? I'm interested only in the code
>>>>>>>>>>>> of your DynamicDestinations subclass.
>>>>>>>>>>>> - Is it an option for you to use Beam 2.4.0-SNAPSHOT, which
>>>>>>>>>>>> contains more logging that will help debug this?
>>>>>>>>>>>> On Fri, Feb 2, 2018 at 8:03 AM Carlos Alonso <> wrote:
>>>>>>>>>>>>> Great!! That cast worked and I managed to get a pipeline
>>>>>>>>>>>>> working with that overload of the TypedWrite#to() method. Thanks Eugene!!
>>>>>>>>>>>>> The bad news is that I'm still getting OOMs with very similar
>>>>>>>>>>>>> stack traces and dominator trees... After a few hours running I got a
>>>>>>>>>>>>> couple more OOMs with these stack traces:
>>>>>>>>>>>>> and
>>>>>>>>>>>>> and the dominator trees you can
>>>>>>>>>>>>> see below... This is the job id if it
>>>>>>>>>>>>> helps: 2018-02-02_03_07_20-1859206457474601634
>>>>>>>>>>>>> Thanks!!
>>>>>>>>>>>>> On Fri, Feb 2, 2018 at 12:32 AM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>> Sorry, my bad - previous message was wrong. You *do* need to
>>>>>>>>>>>>>> use TextIO.writeCustomType(). It does *not* force the destination type to
>>>>>>>>>>>>>> be void - it is indeed void on the return value of writeCustomType(), but
>>>>>>>>>>>>>> .to(DynamicDestinations) changes the destination type (if you include
>>>>>>>>>>>>>> that is -
>>>>>>>>>>>>>> otherwise you need a raw cast: e.g. TextIO.TypedWrite<MyType,
>>>>>>>>>>>>>> MyDestinationType> write = (TextIO.TypedWrite)
>>>>>>>>>>>>>> TextIO.writeCustomType().to(dynamicDestinations) or something like that)
>>>>>>>>>>>>>> The OOM errors were, I think, caused by having too many GCS
>>>>>>>>>>>>>> upload buffers active at the same time, because too many GCS writers were
>>>>>>>>>>>>>> open at the same time here
>>>>>>>>>>>>>> -
>>>>>>>>>>>>>> the map of writers is supposed to typically have only 1 element, modulo
>>>>>>>>>>>>>> hash collisions, but for some reason that I haven't figured out it probably
>>>>>>>>>>>>>> had a lot more. If you update to a newer version of Beam, it'll also print
>>>>>>>>>>>>>> more logging to debug this.
>>>>>>>>>>>>>> I'm surprised that this doesn't appear in your OOM dumps, I
>>>>>>>>>>>>>> suppose that's because they all became unreachable and got GCd after the
>>>>>>>>>>>>>> exception was thrown. It seems that the Dataflow worker dumps only live
>>>>>>>>>>>>>> objects in the heap - this investigation indicates we should change that on
>>>>>>>>>>>>>> Dataflow side.
>>>>>>>>>>>>>> On Thu, Feb 1, 2018 at 10:41 AM Carlos Alonso <> wrote:
>>>>>>>>>>>>>>> Hi Eugene!!
>>>>>>>>>>>>>>> Thank you very much for your help!! I'm really willing to
>>>>>>>>>>>>>>> rewrite that bit of code but, TBH, I don't know how to do that. Using
>>>>>>>>>>>>>>> TextIO.write() forces my DynamicDestinations to be of type <String, ?,
>>>>>>>>>>>>>>> String>, which means, if I haven't misunderstood anything, that the input
>>>>>>>>>>>>>>> type has to be a String already... Could you please provide a quick example
>>>>>>>>>>>>>>> of how I could write it to have a custom type as an input?
>>>>>>>>>>>>>>> The only way I can think of around this is to encode the
>>>>>>>>>>>>>>> route and the contents within the same string and then split them in the
>>>>>>>>>>>>>>> DynamicDestinations methods... but I think that's not what you were
>>>>>>>>>>>>>>> suggesting...
>>>>>>>>>>>>>>> Aside from that, just out of curiosity, could I know what
>>>>>>>>>>>>>>> was causing the OOM errors that you saw?
>>>>>>>>>>>>>>> Thanks again for your help, really appreciate it!!
>>>>>>>>>>>>>>> On Thu, Feb 1, 2018 at 4:17 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>>>> Hi! The full stack trace confirms what I saw in
>>>>>>>>>>>>>>>> Dataflow-side logs, thanks.
>>>>>>>>>>>>>>>> To use to(DynamicDestinations) you need to use
>>>>>>>>>>>>>>>> TextIO.write().to(DynamicDestinations), not writeCustomType(). Note PR
>>>>>>>>>>>>>>>> that fixes a typo
>>>>>>>>>>>>>>>> in TextIO.write().to() - you can circumvent that issue with a raw type cast
>>>>>>>>>>>>>>>> if you really want to stay on officially released Beam 2.2.0.
>>>>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 7:01 AM Carlos Alonso <> wrote:
>>>>>>>>>>>>>>>>> Hi again Eugene.
>>>>>>>>>>>>>>>>> Many thanks for your comments!
>>>>>>>>>>>>>>>>> I've pasted the full stack trace in this paste:
>>>>>>>>>>>>>>>>> About using the suggested overload of .to() I've been
>>>>>>>>>>>>>>>>> trying to do it unsuccessfully as I don't think I understand the way it
>>>>>>>>>>>>>>>>> should be used.
>>>>>>>>>>>>>>>>> Basically TextIO.writeCustomType() forces the
>>>>>>>>>>>>>>>>> DynamicDestinations implementation to be of <UserT, Void, OutputT> which,
>>>>>>>>>>>>>>>>> in my case would be <KV[String, WindowedDoc], Void, String>, but I don't
>>>>>>>>>>>>>>>>> know how to provide a destination(path) for each KV[String, WindowedDoc]
>>>>>>>>>>>>>>>>> through Void, as the override method getDestination will get one of my
>>>>>>>>>>>>>>>>> KV[String, WindowedDoc] and output Void and getFilenamePolicy receives Void
>>>>>>>>>>>>>>>>> and outputs a FilenamePolicy.
>>>>>>>>>>>>>>>>> My idea is that each WindowedDoc has information on the
>>>>>>>>>>>>>>>>> window it is contained in, so that all of the elements within the same
>>>>>>>>>>>>>>>>> window are saved to the same file, named with the start datetime of the window.
>>>>>>>>>>>>>>>>> Can you please provide an example of how to do it with
>>>>>>>>>>>>>>>>> TextIO.writeCustomType and a DynamicDestination implementation? I've been
>>>>>>>>>>>>>>>>> looking through the examples and only the overload of .to() that I'm
>>>>>>>>>>>>>>>>> originally using is shown:
>>>>>>>>>>>>>>>>> Thanks again for your help!
>>>>>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 12:45 AM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>>>>>> Actually hold on, I'm not sure that what I said is
>>>>>>>>>>>>>>>>>> correct. This overload of .to() is not my favorite :-| Can you try using
>>>>>>>>>>>>>>>>>> the more explicit one, with DynamicDestinations - or still better (if you
>>>>>>>>>>>>>>>>>> can use Beam 2.3), use FileIO.writeDynamic()?
>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 3:32 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>>>>>>> Hmm the out of memory errors are actually happening in
>>>>>>>>>>>>>>>>>>> WriteFiles, not in your code.
>>>>>>>>>>>>>>>>>>> When you said that is the
>>>>>>>>>>>>>>>>>>> full trace, did you mean that this is all you see in the UI? It seems that
>>>>>>>>>>>>>>>>>>> this is just the top-level exception but it is omitting the nested chain of
>>>>>>>>>>>>>>>>>>> causes ("Caused by: ..."), and the root cause is the OOM in
>>>>>>>>>>>>>>>>>>> WriteFiles/WriteShardedBundles.
>>>>>>>>>>>>>>>>>>> I have a hypothesis as to why they might be happening.
>>>>>>>>>>>>>>>>>>> You're using a type called WindowedDoc as your destination type - does it
>>>>>>>>>>>>>>>>>>> have hashCode() and equals() properly defined on it? If no, that could lead
>>>>>>>>>>>>>>>>>>> to this issue (and also to simply incorrect behavior), because it's used as
>>>>>>>>>>>>>>>>>>> a key in hashmaps inside that transform. And what is the coder used for
>>>>>>>>>>>>>>>>>>> that type?
>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 12:28 PM Carlos Alonso <> wrote:
>>>>>>>>>>>>>>>>>>>> Sure!:
>>>>>>>>>>>>>>>>>>>> It would be weird though, as the bigger the buffer, the
>>>>>>>>>>>>>>>>>>>> fewer OOM errors I see, but that could totally be something I'm
>>>>>>>>>>>>>>>>>>>> misunderstanding.
>>>>>>>>>>>>>>>>>>>> Thanks again for your help!!
>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 7:32 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>>>>>>>>> The dumps look fairly consistent. I suspect that the
>>>>>>>>>>>>>>>>>>>>> memory is being hogged by data you're buffering in BufferMessagesDoFn, can
>>>>>>>>>>>>>>>>>>>>> you show its code?
>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018, 10:12 AM Carlos Alonso <> wrote:
>>>>>>>>>>>>>>>>>>>>>> Not all dumps are 1GB, here you can see a couple more
>>>>>>>>>>>>>>>>>>>>>> of them with bigger heaps
>>>>>>>>>>>>>>>>>>>>>> A couple of job
>>>>>>>>>>>>>>>>>>>>>> ids: 2018-01-30_02_57_28-751284895952373783
>>>>>>>>>>>>>>>>>>>>>> and 2018-01-29_03_09_28-5756483832988685011
>>>>>>>>>>>>>>>>>>>>>> About relevant parts of my code, here:
>>>>>>>>>>>>>>>>>>>>>> you can see the most relevant bits with comments, I hope that is easy to
>>>>>>>>>>>>>>>>>>>>>> understand, let me know otherwise.
>>>>>>>>>>>>>>>>>>>>>> Thanks for your help!!
>>>>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-30 at 18.27.11.png][image:
>>>>>>>>>>>>>>>>>>>>>> Screen Shot 2018-01-30 at 18.26.07.png]
>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:53 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Regarding how dynamic writes work: it's considerably
>>>>>>>>>>>>>>>>>>>>>>> more complex than just using destination as the key; it depends also on how
>>>>>>>>>>>>>>>>>>>>>>> you configure your sharding, how many destinations there are etc. - see
>>>>>>>>>>>>>>>>>>>>>>> (it
>>>>>>>>>>>>>>>>>>>>>>> is probably the second most complex transform in all of Beam, second only
>>>>>>>>>>>>>>>>>>>>>>> to BigQueryIO.write()...).
>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 1:50 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> Hmm it's weird, this heap dump seems to be of size
>>>>>>>>>>>>>>>>>>>>>>>> just 1GB which is way below the limit your workers should have. Are the
>>>>>>>>>>>>>>>>>>>>>>>> dumps all small like that?
>>>>>>>>>>>>>>>>>>>>>>>> Can you share a Dataflow job ID and some relevant
>>>>>>>>>>>>>>>>>>>>>>>> part of your code?
>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 12:02 PM Carlos Alonso <> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> We're using n1-standard-4 (4 vCPUs, 15 GB memory)
>>>>>>>>>>>>>>>>>>>>>>>>> machines (the default ones). And please, find the dominator tree view of
>>>>>>>>>>>>>>>>>>>>>>>>> one of our heap dumps.
>>>>>>>>>>>>>>>>>>>>>>>>> About the code for dynamic writes... Could you
>>>>>>>>>>>>>>>>>>>>>>>>> quickly summarise what it does? From what I've seen diving into the code, I
>>>>>>>>>>>>>>>>>>>>>>>>> think there is a reduce-by-key operation that I guessed uses the file's path
>>>>>>>>>>>>>>>>>>>>>>>>> as the key. Is that correct? Does that mean that the more files there are,
>>>>>>>>>>>>>>>>>>>>>>>>> the more the work can be parallelised?
>>>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-29 at 20.57.15.png]
>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 8:04 PM Eugene Kirpichov <> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> The code for doing dynamic writes tries to limit
>>>>>>>>>>>>>>>>>>>>>>>>>> its memory usage, so it wouldn't be my first place to look (but I wouldn't
>>>>>>>>>>>>>>>>>>>>>>>>>> rule it out). Are you using workers with a large number of cores or threads
>>>>>>>>>>>>>>>>>>>>>>>>>> per worker?
>>>>>>>>>>>>>>>>>>>>>>>>>> In MAT or YourKit, the most useful tool is the
>>>>>>>>>>>>>>>>>>>>>>>>>> Dominator Tree. Can you paste a screenshot of the dominator tree expanded
>>>>>>>>>>>>>>>>>>>>>>>>>> to some reasonable depth?
>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018, 10:45 AM Carlos Alonso <> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Eugene!
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments. Yes, we've downloaded
>>>>>>>>>>>>>>>>>>>>>>>>>>> a couple of dumps but, TBH, couldn't understand anything (using the Eclipse
>>>>>>>>>>>>>>>>>>>>>>>>>>> MAT). I was wondering if the trace and the fact that "the smaller the
>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer, the more OOM errors" could give any of you a hint, as I think the
>>>>>>>>>>>>>>>>>>>>>>>>>>> problem may be in the writing part...
>>>>>>>>>>>>>>>>>>>>>>>>>>> Do you know how the dynamic writes are
>>>>>>>>>>>>>>>>>>>>>>>>>>> distributed on workers? Based on the full path?
>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 7:38 PM Eugene Kirpichov
>>>>>>>>>>>>>>>>>>>>>>>>>>> <> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, I assume you're using the Dataflow runner.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Have you tried using the OOM debugging flags at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>  ?
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 2:05 AM Carlos Alonso <> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone!!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In our pipeline we're experiencing OOM errors.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We've been trying to nail them down for a while, but without any
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> luck.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Our pipeline:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Reads messages of different types (about 50
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> different types) from PubSub.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Outputs KV elements where the key is the type
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and the value is the message itself (JSON).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Applies windowing (fixed windows of one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> hour, with early and late firings one minute after processing the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> first element in the pane, two days of allowed lateness, and discarding
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fired panes).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Buffers the elements (using stateful and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timely processing). We buffer the elements for 15 minutes or until the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer reaches a maximum size of 16 MB. This step's objective is to keep
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the windows' panes from growing too big.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5. Writes the output elements to files in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> GCS using dynamic routing, the route being
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> type/window/buffer_index-paneInfo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Our first attempt had no buffering, just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windows with early and late firings at 15 minutes, but, guessing that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> OOMs were caused by panes growing too big, we built that buffering step to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger on size as well.
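The "flush after 15 minutes or at 16 MB" rule described above can be sketched in plain Java as follows. This is not the poster's BufferMessagesDoFn and uses no Beam state or timer APIs. The class name, method names, and the decision to check the age only when a new element arrives are assumptions made to keep the example self-contained.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of a buffer that flushes on size or on age.
// A real stateful DoFn would use a timer so that the age limit fires even
// when no new element arrives; here age is only checked inside add().
final class SizeOrAgeBuffer {
    private final long maxBytes;
    private final long maxAgeMillis;
    private final List<String> elements = new ArrayList<>();
    private long bufferedBytes = 0;
    private long firstElementMillis = -1;

    SizeOrAgeBuffer(long maxBytes, long maxAgeMillis) {
        this.maxBytes = maxBytes;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Adds an element; returns the flushed batch if a limit was reached,
    // otherwise an empty list.
    List<String> add(String element, long nowMillis) {
        if (elements.isEmpty()) {
            firstElementMillis = nowMillis;
        }
        elements.add(element);
        bufferedBytes += element.getBytes(StandardCharsets.UTF_8).length;
        boolean tooBig = bufferedBytes >= maxBytes;
        boolean tooOld = nowMillis - firstElementMillis >= maxAgeMillis;
        return (tooBig || tooOld) ? flush() : new ArrayList<>();
    }

    // Returns everything buffered so far and resets the buffer.
    List<String> flush() {
        List<String> batch = new ArrayList<>(elements);
        elements.clear();
        bufferedBytes = 0;
        firstElementMillis = -1;
        return batch;
    }

    public static void main(String[] args) {
        // Limits from the message above: 16 MB or 15 minutes.
        SizeOrAgeBuffer buffer = new SizeOrAgeBuffer(16L * 1024 * 1024, 15L * 60 * 1000);
        System.out.println(buffer.add("{\"id\": 1}", 0L).size());              // prints 0
        System.out.println(buffer.add("{\"id\": 2}", 15L * 60 * 1000).size()); // prints 2
    }
}
```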
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The full trace we're seeing can be found here:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One annoying thing we've noticed is that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> smaller the buffer size, the more OOM errors we see, which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was a bit disappointing...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Can anyone please give us any hint?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks in advance!