beam-user mailing list archives

From Raghu Angadi <rang...@google.com>
Subject Re: Chasing OOM errors
Date Thu, 08 Feb 2018 00:17:27 GMT
Thanks for the help with the issue Eugene.

On Wed, Feb 7, 2018 at 2:41 PM, Carlos Alonso <carlos@mrcalonso.com> wrote:

> That's great!! The job is running smoothly without a single OOM so far...
>
> I'm moving now to increasing the load (rate of incoming messages) on the
> job and got into a situation where the job got completely stuck. It started
> with one worker and tried to scale to 8 (maxNumWorkers), but could only get
> to 7 due to a quota issue. At that point the job stopped moving, as if it
> couldn't distribute work across workers. Eventually had an OOM whose trace
> you can see here: https://pastebin.com/iVVXaiL2 and the job ID
> is: 2018-02-07_08_59_35-14489372822523694308. I don't think it is
> relevant though, but just in case.
>
> The important bit I'd like to understand is why the job gets stuck.
>
> After cancelling that job and restarting it without the 'Metrics.counter' calls
> (because I was submitting too many of them uselessly), with numWorkers=4 and
> maxNumWorkers=7 (to avoid the quota issue), the job managed to run
> successfully (it is actually still running with this
> id: 2018-02-07_09_49_57-11129391338943683710). It first scaled to 7
> workers and after clearing the piled unacknowledged messages the previous
> job left, it scaled down again to 2. There are a couple OOMs though, but I
> don't think they are that important as they seem very scarce.
>
> Could you please help me understand why the scaling did not work in that
> first scenario? I wonder if it was due to the Metrics, the jump from 1 to 7
> workers, the fact that it tried to get 8 workers but could only get 7, a
> combination of those, or a different issue...
>

In your case, the first job never went above 1 worker. It looks like it
requested 8 workers, but that request never succeeded, so it continued to
run with a single worker. CPU was fairly busy; maybe it was OOMing and
retrying processing.

If instead 7 workers had come up but the 8th worker had an issue and could
not start, then the pipeline would stall. When it tries to upscale from, say,
4 workers to 8 workers, Dataflow redistributes some of the persistent disks
from the old workers to the new workers (#PDs == maxNumWorkers). Workers
store the state required for streaming pipeline execution on these PDs, so
if one of the workers cannot start, 1/8th of the job's state is not
accessible and the pipeline does not make progress as a result.

You could reduce '--numberOfWorkerHarnessThreads' even further if that is
holding up buffers (say to something like 20).
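
For reference, both knobs can be passed as --numberOfWorkerHarnessThreads=20 and
--gcsUploadBufferSizeBytes=1048576, or set programmatically. A minimal sketch,
assuming the Dataflow runner's option interfaces in Beam 2.x (import paths may
differ slightly by version):

    import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Inside main(String[] args):
    DataflowPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
    // Cap the number of work-item processing threads per streaming worker.
    options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(20);
    // Shrink each GCS upload buffer from the default 8MB down to 1MB.
    options.as(GcsOptions.class).setGcsUploadBufferSizeBytes(1024 * 1024);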

Raghu.



>
> Thanks again for all your help!
>
> On Wed, Feb 7, 2018 at 5:46 PM Eugene Kirpichov <kirpichov@google.com>
> wrote:
>
>> Yes, the reason is the same as for 2.2.0, so you can stay on 2.2.0 and do
>> nothing. This actually would be my recommendation.
>> Errors never cause data loss in Beam (unless something is implemented
>> incorrectly, of course), but if your job was stuck repeatedly throwing OOMs
>> then it would be able to write the data only once it stopped doing that.
>> However it turned out that it threw only a couple OOMs and went on its way.
>>
>> I suggest to save those workarounds for a rainy day when you have
>> something that gets stuck in throwing OOMs.
>> - Yes, reducing the maximum number of threads will yield less parallelism
>> and you'll be likely to need more workers.
>> - Decreasing GCS upload buffer will reduce the performance of writing
>> large files (that are much larger than the buffer).
>>
>> Grouping by destinations uses a GroupByKey by a hash of encoded bytes of
>> the destination (using the destination coder), and also uses a local hash
>> map on the worker in case the hashes collide. So the coder has to be
>> deterministic (the transform enforces this) and consistent with
>> equals()/hashCode() (I don't think the transform enforces this). In other
>> words, it groups by "structural value". That's effectively the same
>> requirements as for any other operation that groups by a key.
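
For illustration, a destination type that satisfies these requirements could look
roughly like the following. This is only a sketch: FileParts and its fields are
assumptions based on the gists in this thread, not the actual code.

    import java.io.Serializable;
    import java.util.Objects;
    import org.apache.beam.sdk.coders.AvroCoder;
    import org.apache.beam.sdk.coders.DefaultCoder;

    // The coder must be deterministic and equals()/hashCode() must agree with it,
    // since the transform groups by the destination's structural value.
    @DefaultCoder(AvroCoder.class)  // AvroCoder is deterministic for simple POJOs like this
    public class FileParts implements Serializable {
      public String type;         // e.g. the message type
      public String windowStart;  // e.g. the formatted window start timestamp

      public FileParts() {}  // no-arg constructor required by AvroCoder

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof FileParts)) {
          return false;
        }
        FileParts other = (FileParts) o;
        return Objects.equals(type, other.type)
            && Objects.equals(windowStart, other.windowStart);
      }

      @Override
      public int hashCode() {
        return Objects.hash(type, windowStart);
      }
    }

If the destination coder cannot be inferred, FileIO.writeDynamic() also accepts it
explicitly via .withDestinationCoder(AvroCoder.of(FileParts.class)).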
>>
>> On Wed, Feb 7, 2018 at 6:49 AM Carlos Alonso <carlos@mrcalonso.com>
>> wrote:
>>
>>> And also...
>>>
>>> Was the OOM error caused by the same issue on the old 2.2.0 version of
>>> the job? If that was the case I could take any of the workarounds and stick
>>> with the "official" 2.2.0. What do you think?
>>>
>>> Thanks!
>>>
>>> On Wed, Feb 7, 2018 at 11:26 AM Carlos Alonso <carlos@mrcalonso.com>
>>> wrote:
>>>
>>>> Thanks Eugene!
>>>>
>>>> I'll try those options and will let you know but I'd also like to know
>>>> about the implications of them:
>>>> - What could happen if I reduce the number of threads? It will lose
>>>> "parallelisation capacity" and thus be more likely to need more workers, right?
>>>> - What could happen if I reduce the GCS upload buffer size bytes? It
>>>> will need to send more "packets" to complete an upload, right?
>>>>
>>>> Aside from those I have a couple of outstanding questions:
>>>> - From the .by() comments it says that it specifies how to partition
>>>> elements into groups (destinations). How exactly does it work? By using
>>>> the hashCode of the object returned?
>>>> - Were the previous versions of the job (using 2.2.0) likely to be
>>>> losing data due to the OOM errors?
>>>>
>>>> Thanks again for all your help. Really appreciate it!
>>>>
>>>> On Wed, Feb 7, 2018 at 1:37 AM Eugene Kirpichov <kirpichov@google.com>
>>>> wrote:
>>>>
>>>>> Ugh. The logging in 2.4.0-SNAPSHOT helped me identify the issue, and
>>>>> it is more mundane than expected.
>>>>>
>>>>> TL;DR Your job seems to be running fine, and it's not losing any data.
>>>>> You can simply do nothing. There was only a handful of OOM errors, they
>>>>> were transient, and were later successfully retried.
>>>>>
>>>>> However, if you want to not worry about any OOMs, your easy
>>>>> workarounds are any of the following:
>>>>> - Specify --numberOfWorkerHarnessThreads=(something not larger than
>>>>> 100 or so - by default in streaming worker it is limited to a few hundred)
>>>>> - Specify --gcsUploadBufferSizeBytes=1048576 or something like that
>>>>> (as opposed to the default value of 8x that)
>>>>> - Use a worker with more memory (I think any machine type larger than
>>>>> the one you're using will do)
>>>>>
>>>>> The issue is that the Dataflow streaming worker by default (currently
>>>>> - do NOT rely on these values to stay fixed) uses up to 300 threads, and
>>>>> about 5.7GB of memory (out of the 15GB of n1-standard-4 - the rest is
>>>>> reserved for various purposes); your code (e.g. storing the stateful DoFn
>>>>> state) uses some part of that. When the worker receives data for many
>>>>> destinations at the same time, it processes them all at the same time with
>>>>> all 300 threads, each of which uses one 8MB GCS upload buffer (300 x 8MB = 2.4GB),
>>>>> which is not that much but pushes the worker over the limit.
>>>>>
>>>>> The "proper" fixes for that on our side could be e.g. throttling
>>>>> number of streaming worker threads based on memory pressure (which is
>>>>> surprisingly difficult to do given Java's GC) - but I'm not sure it's worth
>>>>> it right now given that your code is effectively running successfully.
>>>>>
>>>>> On Tue, Feb 6, 2018 at 2:14 PM Carlos Alonso <carlos@mrcalonso.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Eugene.
>>>>>>
>>>>>> I've updated to 2.4.0-SNAPSHOT because I think 2.3.0-SNAPSHOT's
>>>>>> FileNaming is unusable as this commit is missing:
>>>>>> https://github.com/apache/beam/commit/ece8709526cfb484aaf90c1508b0c1ea99186235
>>>>>>
>>>>>> This is the new code I've developed to use FileIO.writeDynamic API:
>>>>>> https://gist.github.com/calonso/9a708df6f2cbf48dd1a1fb67f2f571a3
>>>>>>
>>>>>> Unfortunately OOM errors are still around :( Here you can see a full
>>>>>> stack trace: https://pastebin.com/YndmFjCb. Although, it is weird I
>>>>>> cannot find the errors on StackDriver logs (I got that stack trace from the
>>>>>> Dataflow UI. Job ID: 2018-02-06_10_11_02-10621326282488809928)
>>>>>>
>>>>>> Please, find the Dominator tree of the heap dump below:
>>>>>>
>>>>>> There's one thing I'd like to know. From the .by() comments it says
>>>>>> that it specifies how to partition elements into groups (destinations). How
>>>>>> exactly does it work? By using the hashCode of the object returned?
>>>>>>
>>>>>> Thanks!
>>>>>>
>>>>>>
>>>>>>
>>>>>> [image: Screen Shot 2018-02-06 at 23.08.42.png]
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 5, 2018 at 10:36 PM Eugene Kirpichov <
>>>>>> kirpichov@google.com> wrote:
>>>>>>
>>>>>>> Strongly recommend to move to FileIO.writeDynamic; it's a better API
>>>>>>> in many ways (though underlying implementation is currently the same).
>>>>>>> I think you can use either 2.3.0-SNAPSHOT or 2.4.0-SNAPSHOT - right
>>>>>>> now (in the middle of 2.3.0 release being created) they should be almost
>>>>>>> the same anyway (the former is the last snapshot built from master before
>>>>>>> 2.3.0 tag was created, the latter is the last snapshot built after that tag
>>>>>>> was created).
>>>>>>>
>>>>>>> On Mon, Feb 5, 2018 at 1:27 PM Carlos Alonso <carlos@mrcalonso.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Does it make sense to try my current code `TextIO.writeCustomType`
>>>>>>>> with 2.3 or 2.4 versions or shall I move to `FileIO.writeDynamic` straight
>>>>>>>> away? Which transform on which version shall I try first?
>>>>>>>>
>>>>>>>> Regards
>>>>>>>>
>>>>>>>> On Mon, 5 Feb 2018 at 19:30, Eugene Kirpichov <kirpichov@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> All Beam artifacts have to be at the same version.
>>>>>>>>> Do you have the Apache Maven snapshot repository configured? Its
>>>>>>>>> URL is http://repository.apache.org/snapshots/
>>>>>>>>> If that doesn't work out, I think you should be able to clone the
>>>>>>>>> git repo and do "mvn clean install".
>>>>>>>>>
>>>>>>>>> Your new code looks good!
>>>>>>>>>
>>>>>>>>> On Mon, Feb 5, 2018 at 9:59 AM Carlos Alonso <carlos@mrcalonso.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Sorry Eugene, I've tried to update my dependencies to both
>>>>>>>>>> 2.3.0-RC1 and 2.4.0-SNAPSHOT but none of them were able to fully build as
>>>>>>>>>> other dependencies such as beam-runners-direct-java and
>>>>>>>>>> beam-runners-google-cloud-dataflow-java are not available on
>>>>>>>>>> those versions. Can I point them to 2.2.0 safely? Otherwise, how can I
>>>>>>>>>> build them?
>>>>>>>>>>
>>>>>>>>>> In the meantime I've changed the DynamicDestinations
>>>>>>>>>> implementation to use FileParts and a variant for empty windows that you
>>>>>>>>>> can see here: https://gist.github.com/calonso/15eae12ecaa3a7dfd9f55c6dede09a42
>>>>>>>>>> and I'm running a test with it
>>>>>>>>>> to see if that fixes the OOMs issue.
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>> On Sat, Feb 3, 2018 at 12:01 AM Eugene Kirpichov <
>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks. Yeah, please try with 2.4.0-SNAPSHOT!
>>>>>>>>>>> In your code, I'd recommend changing the DynamicDestinations to
>>>>>>>>>>> not use DefaultFilenamePolicy.Params as the destination type. Generally,
>>>>>>>>>>> the destination type should contain just enough information to be able to
>>>>>>>>>>> *construct* a filename policy - in your case, using FileParts as your
>>>>>>>>>>> destination type would be a much better choice.
>>>>>>>>>>>
>>>>>>>>>>> If you'll be trying 2.4.0-SNAPSHOT, I recommend also switching
>>>>>>>>>>> to FileIO.writeDynamic() while you're at it.
>>>>>>>>>>> It would be something like:
>>>>>>>>>>>
>>>>>>>>>>> FileIO.writeDynamic()
>>>>>>>>>>> .by(kv -> FileParts.partsFor(kv.getValue()))
>>>>>>>>>>> .via(kv -> kv.getValue().contents(), TextIO.sink())
>>>>>>>>>>> .to(baseDir)
>>>>>>>>>>> .withNaming(parts -> (window, pane, n, i, compression) ->
>>>>>>>>>>> ...construct filename...)
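
Purely as an illustration of the elided filename construction above (not the actual
code from the job), a window-based FileNaming lambda could look roughly like this;
FileParts and its fields are assumptions, and IntervalWindow comes from
org.apache.beam.sdk.transforms.windowing:

    .withNaming(parts -> (window, pane, numShards, shardIndex, compression) -> {
      // Name files as <type>/<window start>/<pane index>-<shard>-of-<numShards><suffix>.
      IntervalWindow intervalWindow = (IntervalWindow) window;
      return String.format("%s/%s/%d-%d-of-%d%s",
          parts.type,                          // assumed field of FileParts
          intervalWindow.start(),              // window start timestamp
          pane.getIndex(),                     // distinguishes early/on-time/late firings
          shardIndex,
          numShards,
          compression.getSuggestedSuffix());   // e.g. "" or ".gz"
    })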
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Feb 2, 2018 at 8:32 AM Carlos Alonso <
>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> That's great!! I'll do everything I can to get to the bottom of
>>>>>>>>>>>> this as well.
>>>>>>>>>>>>
>>>>>>>>>>>> Here: https://gist.github.com/calonso/674990e9c57ac364b6eea3f2e6ca799d
>>>>>>>>>>>> you have the whole transform
>>>>>>>>>>>> that is responsible for writing the windowed messages into GCS buckets.
>>>>>>>>>>>>
>>>>>>>>>>>> I can definitely run any Beam version required. You let me know.
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Feb 2, 2018 at 5:25 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The fact that it persisted after switching to
>>>>>>>>>>>>> DynamicDestinations is very concerning and I very much would like to get to
>>>>>>>>>>>>> the bottom of it.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Can you share your new code? I'm interested only in the code
>>>>>>>>>>>>> of your DynamicDestinations subclass.
>>>>>>>>>>>>> - Is it an option for you to use Beam 2.4.0-SNAPSHOT, which
>>>>>>>>>>>>> contains more logging that will help debug this?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Feb 2, 2018 at 8:03 AM Carlos Alonso <
>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Great!! That cast worked and I managed to get a pipeline
>>>>>>>>>>>>>> working with that overload of the TypedWrite#to() method. Thanks Eugene!!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The bad news is that I'm still getting OOMs with very similar
>>>>>>>>>>>>>> Stack traces and dominator trees... After a few hours running I got a
>>>>>>>>>>>>>> couple more OOMs with these stack traces:
>>>>>>>>>>>>>> https://pastebin.com/4MH2xFjH and https://pastebin.com/VfTRNVnk
>>>>>>>>>>>>>> and the dominator trees you can see below...
>>>>>>>>>>>>>> This is the job id if it helps: 2018-02-02_03_07_20-1859206457474601634
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks!!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Feb 2, 2018 at 12:32 AM Eugene Kirpichov <
>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sorry, my bad - previous message was wrong. You *do* need to
>>>>>>>>>>>>>>> use TextIO.writeCustomType(). It does *not* force the destination type to
>>>>>>>>>>>>>>> be void - it is indeed void on the return value of writeCustomType(), but
>>>>>>>>>>>>>>> .to(DynamicDestinations) changes the destination type (if you include
>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/4319/files that is -
>>>>>>>>>>>>>>> otherwise you need a raw cast: e.g. TextIO.TypedWrite<MyType,
>>>>>>>>>>>>>>> MyDestinationType> write = (TextIO.TypedWrite) TextIO.writeCustomType().to(dynamicDestinations)
>>>>>>>>>>>>>>> or something like that)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The OOM errors were, I think, caused by having too many GCS
>>>>>>>>>>>>>>> upload buffers active at the same time, because too many GCS writers were
>>>>>>>>>>>>>>> open at the same time here
>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/v2.2.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java#L522
>>>>>>>>>>>>>>> - the map of writers is
>>>>>>>>>>>>>>> supposed to typically have only 1 element, modulo hash collisions, but for
>>>>>>>>>>>>>>> some reason that I haven't figured out it probably had a lot more. If you
>>>>>>>>>>>>>>> update to a newer version of Beam, it'll also print more logging to debug
>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I'm surprised that this doesn't appear in your OOM dumps, I
>>>>>>>>>>>>>>> suppose that's because they all became unreachable and got GCd after the
>>>>>>>>>>>>>>> exception was thrown. It seems that the Dataflow worker dumps only live
>>>>>>>>>>>>>>> objects in the heap - this investigation indicates we should change that on
>>>>>>>>>>>>>>> Dataflow side.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Feb 1, 2018 at 10:41 AM Carlos Alonso <
>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Eugene!!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thank you very much for your help!! I'm really willing to
>>>>>>>>>>>>>>>> rewrite that bit of code but, TBH, I don't know how to do that. Using
>>>>>>>>>>>>>>>> TextIO.write() forces my DynamicDestinations to be of type <String, ?,
>>>>>>>>>>>>>>>> String>, which means, if I didn't misunderstand anything, that the input
>>>>>>>>>>>>>>>> type has to be a String already... Could you please provide a quick example
>>>>>>>>>>>>>>>> of how I could write it to take a custom type as input?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The only way I can think of around this is to encode the
>>>>>>>>>>>>>>>> route and the contents within the same string and then split them in the
>>>>>>>>>>>>>>>> DynamicDestinations methods... but I think that's not what you were
>>>>>>>>>>>>>>>> suggesting...
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Aside from that, just out of curiosity, could I know what
>>>>>>>>>>>>>>>> was causing the OOM errors that you saw?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks again for your help, really appreciate it!!
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Feb 1, 2018 at 4:17 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi! The full stack trace confirms what I saw in
>>>>>>>>>>>>>>>>> Dataflow-side logs, thanks.
>>>>>>>>>>>>>>>>> To use to(DynamicDestinations) you need to use
>>>>>>>>>>>>>>>>> TextIO.write().to(DynamicDestinations), not
>>>>>>>>>>>>>>>>> writeCustomType(). Note PR https://github.com/apache/beam/pull/4319
>>>>>>>>>>>>>>>>> that fixes a typo in TextIO.write().to() -
>>>>>>>>>>>>>>>>> you can circumvent that issue with a raw type cast if you really want to
>>>>>>>>>>>>>>>>> stay on officially released Beam 2.2.0.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 7:01 AM Carlos Alonso <
>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi again Eugene.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Many thanks for your comments!
>>>>>>>>>>>>>>>>>> I've pasted the full stack trace in this paste:
>>>>>>>>>>>>>>>>>> https://pastebin.com/HYJvPppY
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> About using the suggested overload of .to() I've been
>>>>>>>>>>>>>>>>>> trying to do it unsuccessfully as I don't think I understand the way it
>>>>>>>>>>>>>>>>>> should be used.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Basically TextIO.writeCustomType() forces the
>>>>>>>>>>>>>>>>>> DynamicDestinations implementation to be of <UserT, Void, OutputT> which,
>>>>>>>>>>>>>>>>>> in my case would be <KV[String, WindowedDoc], Void, String>, but I don't
>>>>>>>>>>>>>>>>>> know how to provide a destination(path) for each KV[String, WindowedDoc]
>>>>>>>>>>>>>>>>>> through Void, as the override method getDestination will get one of my
>>>>>>>>>>>>>>>>>> KV[String, WindowedDoc] and output Void and getFilenamePolicy receives Void
>>>>>>>>>>>>>>>>>> and outputs a FilenamePolicy.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My idea is that each windowedDoc has information on the
>>>>>>>>>>>>>>>>>> window it is contained in and then, all of the elements within the same window
>>>>>>>>>>>>>>>>>> are saved on the same file, named with the start datetime of the window.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Can you please provide an example of how to do it with
>>>>>>>>>>>>>>>>>> TextIO.writeCustomType and a DynamicDestination implementation? I've been
>>>>>>>>>>>>>>>>>> looking through the examples and only the overload of .to() that I'm
>>>>>>>>>>>>>>>>>> originally using is shown:
>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/29859eb54d05b96a9db477e7bb04537510273bd2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java#L280
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks again for your help!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Jan 31, 2018 at 12:45 AM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Actually hold on, I'm not sure that what I said is
>>>>>>>>>>>>>>>>>>> correct. This overload of .to() is not my favorite :-| Can you try using
>>>>>>>>>>>>>>>>>>> the more explicit one, with DynamicDestinations - or still better (if you
>>>>>>>>>>>>>>>>>>> can use Beam 2.3), use FileIO.writeDynamic()?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 3:32 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hmm the out of memory errors are actually happening in
>>>>>>>>>>>>>>>>>>>> WriteFiles, not in your code.
>>>>>>>>>>>>>>>>>>>> When you said that https://pastebin.com/0vfE6pUg is
>>>>>>>>>>>>>>>>>>>> the full trace, did you mean that this is all you see in the UI? It seems
>>>>>>>>>>>>>>>>>>>> that this is just the top-level exception but it is omitting the nested
>>>>>>>>>>>>>>>>>>>> chain of causes ("Caused by: ..."), and the root cause is the OOM in
>>>>>>>>>>>>>>>>>>>> WriteFiles/WriteShardedBundles.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I have a hypothesis as to why they might be happening.
>>>>>>>>>>>>>>>>>>>> You're using a type called WindowedDoc as your destination type - does it
>>>>>>>>>>>>>>>>>>>> have hashCode() and equals() properly defined on it? If no, that could lead
>>>>>>>>>>>>>>>>>>>> to this issue (and also to simply incorrect behavior), because it's used as
>>>>>>>>>>>>>>>>>>>> a key in hashmaps inside that transform. And what is the coder used for
>>>>>>>>>>>>>>>>>>>> that type?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 12:28 PM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Sure!: https://gist.github.com/calonso/14b06f4f58faf286cc79181cb46a9b85
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It would be weird though, as the bigger the buffer,
>>>>>>>>>>>>>>>>>>>>> the fewer OOM errors I see, but that could totally be something I'm
>>>>>>>>>>>>>>>>>>>>> misunderstanding.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks again for your help!!
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018 at 7:32 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> The dumps look fairly consistent. I suspect that the
>>>>>>>>>>>>>>>>>>>>>> memory is being hogged by data you're buffering in BufferMessagesDoFn, can
>>>>>>>>>>>>>>>>>>>>>> you show its code?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Tue, Jan 30, 2018, 10:12 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Not all dumps are 1GB, here you can see a couple
>>>>>>>>>>>>>>>>>>>>>>> more of them with bigger heaps
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> A couple of job ids: 2018-01-30_02_57_28-751284895952373783
>>>>>>>>>>>>>>>>>>>>>>> and 2018-01-29_03_09_28-5756483832988685011
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> About relevant parts of my code, here:
>>>>>>>>>>>>>>>>>>>>>>> https://gist.github.com/calonso/eec2e1f2b8dd034af429732939eed6ec you can see the
>>>>>>>>>>>>>>>>>>>>>>> most relevant bits with comments, I hope that is easy to understand, let me
>>>>>>>>>>>>>>>>>>>>>>> know otherwise.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks for your help!!
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-30 at 18.27.11.png][image:
>>>>>>>>>>>>>>>>>>>>>>> Screen Shot 2018-01-30 at 18.26.07.png]
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 10:53 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Regarding how dynamic writes work: it's
>>>>>>>>>>>>>>>>>>>>>>>> considerably more complex than just using destination as the key; it
>>>>>>>>>>>>>>>>>>>>>>>> depends also on how you configure your sharding, how many destinations
>>>>>>>>>>>>>>>>>>>>>>>> there are etc. - see
>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
>>>>>>>>>>>>>>>>>>>>>>>> (it is probably the
>>>>>>>>>>>>>>>>>>>>>>>> second most complex transform in all of Beam, second only to
>>>>>>>>>>>>>>>>>>>>>>>> BigQueryIO.write()...).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 1:50 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Hmm it's weird, this heap dump seems to be of size
>>>>>>>>>>>>>>>>>>>>>>>>> just 1GB which is way below the limit your workers should have. Are the
>>>>>>>>>>>>>>>>>>>>>>>>> dumps all small like that?
>>>>>>>>>>>>>>>>>>>>>>>>> Can you share a Dataflow job ID and some relevant
>>>>>>>>>>>>>>>>>>>>>>>>> part of your code?
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 12:02 PM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> We're using n1-standard-4 (4 vCPUs, 15 GB memory)
>>>>>>>>>>>>>>>>>>>>>>>>>> machines (the default ones). And please, find the dominator tree view of
>>>>>>>>>>>>>>>>>>>>>>>>>> one of our heap dumps.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> About the code for dynamic writes... Could you
>>>>>>>>>>>>>>>>>>>>>>>>>> quickly summarise what it does? From what I've seen diving into the code, I
>>>>>>>>>>>>>>>>>>>>>>>>>> think I saw a reduce-by-key operation that I guessed uses the file's path
>>>>>>>>>>>>>>>>>>>>>>>>>> as the key. Is that correct? Does that mean that the more files the more
>>>>>>>>>>>>>>>>>>>>>>>>>> the work can be parallelised?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> [image: Screen Shot 2018-01-29 at 20.57.15.png]
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 8:04 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>>>>>> kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> The code for doing dynamic writes tries to limit
>>>>>>>>>>>>>>>>>>>>>>>>>>> its memory usage, so it wouldn't be my first place to look (but I wouldn't
>>>>>>>>>>>>>>>>>>>>>>>>>>> rule it out). Are you using workers with a large number of cores or threads
>>>>>>>>>>>>>>>>>>>>>>>>>>> per worker?
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> In MAT or YourKit, the most useful tool is the
>>>>>>>>>>>>>>>>>>>>>>>>>>> Dominator Tree. Can you paste a screenshot of the dominator tree expanded
>>>>>>>>>>>>>>>>>>>>>>>>>>> to some reasonable depth?
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018, 10:45 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Eugene!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your comments. Yes, we've downloaded
>>>>>>>>>>>>>>>>>>>>>>>>>>>> a couple of dumps, but TBH, couldn't understand anything (using the Eclipse
>>>>>>>>>>>>>>>>>>>>>>>>>>>> MAT). I was wondering if the trace and the fact that "the smaller the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> buffer, the more OOM errors" could give any of you a hint as I think it may
>>>>>>>>>>>>>>>>>>>>>>>>>>>> be on the writing part...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do you know how the dynamic writes are
>>>>>>>>>>>>>>>>>>>>>>>>>>>> distributed on workers? Based on the full path?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 7:38 PM Eugene
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kirpichov <kirpichov@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi, I assume you're using the Dataflow runner.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Have you tried using the OOM debugging flags at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineDebugOptions.java#L169-L193 ?
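
For reference, the options meant there include dumpHeapOnOOM and saveHeapDumpsToGcsPath
(equivalently on the command line: --dumpHeapOnOOM=true
--saveHeapDumpsToGcsPath=gs://<your-bucket>/dumps). A hedged sketch of setting them,
assuming `options` is the job's Dataflow pipeline options as in the earlier sketch and
the bucket path is a placeholder:

    DataflowPipelineDebugOptions debug = options.as(DataflowPipelineDebugOptions.class);
    debug.setDumpHeapOnOOM(true);                                   // dump the heap when a worker OOMs
    debug.setSaveHeapDumpsToGcsPath("gs://my-bucket/heap-dumps");   // placeholder GCS path for the dumps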
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Jan 29, 2018 at 2:05 AM Carlos Alonso <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> carlos@mrcalonso.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi everyone!!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On our pipeline we're experiencing OOM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> errors. It's been a while since we've been trying to nail them down but
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> without any luck.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Our pipeline:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Reads messages from PubSub of different
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> types (about 50 different types).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. Outputs KV elements, with the key being the type
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and the value being the message itself (JSON).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. Applies windowing: fixed windows of one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> hour, with early and late firings one minute after processing the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> first element in the pane, two days allowed lateness, and discarding fired
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> panes (sketched just after this list).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Buffers the elements (using stateful and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> timely processing). We buffer the elements for 15 minutes or until it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reaches a maximum size of 16MB. This step's objective is to avoid windows'
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> panes growing too big (see the DoFn sketch below).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5. Writes the outputted elements to files in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> GCS using dynamic routing, the route being: type/window/buffer_index-paneInfo.
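
For reference, a rough sketch of the windowing described in step 3 above (fixed
one-hour windows, early and late firings one minute after the first element in the
pane, two days of allowed lateness, discarding fired panes). The element type and
the `keyedMessages` collection are assumptions, not the actual code:

    import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
    import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    PCollection<KV<String, String>> windowed =
        keyedMessages.apply(
            Window.<KV<String, String>>into(FixedWindows.of(Duration.standardHours(1)))
                .triggering(
                    AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(1)))
                        .withLateFirings(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(Duration.standardMinutes(1))))
                .withAllowedLateness(Duration.standardDays(2))
                .discardingFiredPanes());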
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Our first step was without buffering, just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windows with early and late firings at 15 minutes, but, guessing that OOMs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> were because of panes growing too big we built that buffering step to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger on size as well.
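
And a very rough sketch of the buffering step 4: a stateful DoFn that holds elements
in a BagState and flushes either when a 15-minute processing-time timer fires or when
the buffered size passes ~16MB. Names, the size accounting, and the flush logic are
illustrative assumptions, not the actual BufferMessagesDoFn from the job:

    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarLongCoder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.state.ValueState;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.DoFn.OnTimer;
    import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
    import org.apache.beam.sdk.transforms.DoFn.StateId;
    import org.apache.beam.sdk.transforms.DoFn.TimerId;
    import org.apache.beam.sdk.values.KV;
    import org.joda.time.Duration;

    class BufferMessagesDoFn extends DoFn<KV<String, String>, KV<String, String>> {
      private static final long MAX_BUFFER_BYTES = 16L * 1024 * 1024;  // ~16MB, as described above

      @StateId("key")
      private final StateSpec<ValueState<String>> keySpec = StateSpecs.value(StringUtf8Coder.of());
      @StateId("buffer")
      private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag(StringUtf8Coder.of());
      @StateId("bufferedBytes")
      private final StateSpec<ValueState<Long>> bytesSpec = StateSpecs.value(VarLongCoder.of());
      @TimerId("flush")
      private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

      @ProcessElement
      public void process(ProcessContext c,
          @StateId("key") ValueState<String> key,
          @StateId("buffer") BagState<String> buffer,
          @StateId("bufferedBytes") ValueState<Long> bytes,
          @TimerId("flush") Timer flush) {
        Long current = bytes.read();
        if (current == null || current == 0L) {
          // First element buffered for this key since the last flush: arm the 15-minute timer.
          flush.offset(Duration.standardMinutes(15)).setRelative();
          current = 0L;
        }
        key.write(c.element().getKey());
        String value = c.element().getValue();
        buffer.add(value);
        current += value.length();  // crude size estimate; the real job may count bytes differently
        bytes.write(current);
        if (current >= MAX_BUFFER_BYTES) {
          for (String buffered : buffer.read()) {
            c.output(KV.of(c.element().getKey(), buffered));
          }
          buffer.clear();
          bytes.clear();
        }
      }

      @OnTimer("flush")
      public void onFlush(OnTimerContext c,
          @StateId("key") ValueState<String> key,
          @StateId("buffer") BagState<String> buffer,
          @StateId("bufferedBytes") ValueState<Long> bytes) {
        String k = key.read();
        if (k != null) {
          for (String buffered : buffer.read()) {
            c.output(KV.of(k, buffered));
          }
        }
        buffer.clear();
        bytes.clear();
      }
    }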
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The full trace we're seeing can be found
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here: https://pastebin.com/0vfE6pUg
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> There's an annoying thing we've experienced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which is that, the smaller the buffer size, the more OOM errors we see, which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was a bit disappointing...
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Can anyone please give us any hint?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks in advance!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
