flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From KOSTIANTYN Kudriavtsev <kudryavtsev.konstan...@gmail.com>
Subject Re: Debug OutOfMemory
Date Thu, 08 Oct 2015 17:33:09 GMT
Yes it's,
I'm checking number of columns per line to filter out mailformed

*Sent from my ZenFone
On Oct 8, 2015 1:19 PM, "Stephan Ewen" <sewen@apache.org> wrote:

> There is probably a different CSV input format implementation which drops
> invalid lines (too long lines).
>
> Is that actually desired behavior, simply dropping malformatted input?
>
> On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <
> kudryavtsev.konstantin@gmail.com> wrote:
>
>> Hm, you was write
>>
>> I checked all files, one by one and found an issue with a line in one of
>> them... It's really unexpected for me as far as I run spark job on the same
>> dataset and "wrong" rows were filtered out without issues.
>>
>> Thanks for help!
>>
>> Thank you,
>> Konstantin Kudryavtsev
>>
>> On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <sewen@apache.org> wrote:
>>
>>> Ah, that makes sense!
>>>
>>> The problem is not in the core runtime, it is in the delimited input
>>> format. It probably looks for the line split character and never finds it,
>>> so that it starts buffering a super large line (gigabytes) which leads to
>>> the OOM exception.
>>>
>>> Can you check whether the line split character and the encoding are
>>> properly defined?
>>>
>>> Would actually be good to define a max line length (sane default +
>>> configurable value) that reports when lines seem to extend a maximum length
>>> (which is usually a misconfiguration of the split character)
>>>
>>> Greetings,
>>> Stephan
>>>
>>>
>>> On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <
>>> kudryavtsev.konstantin@gmail.com> wrote:
>>>
>>>> 10/08/2015 16:25:48     CHAIN DataSource (at
>>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)
>>>> (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at
>>>> com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap
>>>> (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>         at
>>>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>>         at
>>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>>         at
>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 10/08/2015 16:25:48     Job execution switched to status FAILING.
>>>> 10/08/2015 16:25:48     DataSink
>>>> (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1)
>>>> switched to CANCELED
>>>> 10/08/2015 16:25:48     Job execution switched to status FAILED.
>>>> org.apache.flink.client.program.ProgramInvocationException: The program
>>>> execution failed: Job execution failed.
>>>>         at org.apache.flink.client.program.Client.run(Client.java:413)
>>>>         at org.apache.flink.client.program.Client.run(Client.java:356)
>>>>         at org.apache.flink.client.program.Client.run(Client.java:349)
>>>>         at
>>>> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
>>>>         at
>>>> org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
>>>>         at
>>>> org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
>>>>         at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
>>>>         at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
>>>>         at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
>>>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>         at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>>>         at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>         at java.lang.reflect.Method.invoke(Method.java:606)
>>>>         at
>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
>>>>         at
>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
>>>>         at org.apache.flink.client.program.Client.run(Client.java:315)
>>>>         at
>>>> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
>>>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
>>>>         at
>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
>>>>         at
>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>>         at
>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
>>>>         at
>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>>>>         at
>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>>>>         at
>>>> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>>>>         at
>>>> org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
>>>>         at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>>>>         at
>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
>>>>         at
>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
>>>>         at
>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>>>>         at
>>>> org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
>>>>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>>>>         at
>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
>>>>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>>>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>>>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>>>>         at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>>>>         at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>>>>         at
>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>         at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>         at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>         at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: java.lang.OutOfMemoryError: Java heap space
>>>>         at
>>>> org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
>>>>         at
>>>> org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
>>>>         at
>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> Thank you,
>>>> Konstantin Kudryavtsev
>>>>
>>>> On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>>
>>>>> Can you paste the exception stack trace?
>>>>>
>>>>> On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <
>>>>> kudryavtsev.konstantin@gmail.com> wrote:
>>>>>
>>>>>> It's DataSet program that performs simple filtering, crossjoin and
>>>>>> aggregation.
>>>>>>
>>>>>> I'm using Hadoop S3 FileSystem (not Emr) as far as Flink's s3
>>>>>> connector doesn't work at all.
>>>>>>
>>>>>> Currently I have 3 taskmanagers each 5k MB, but I tried different
>>>>>> configurations and all leads to the same exception
>>>>>>
>>>>>> *Sent from my ZenFone
>>>>>> On Oct 8, 2015 12:05 PM, "Stephan Ewen" <sewen@apache.org>
wrote:
>>>>>>
>>>>>>> Can you give us a bit more background?  What exactly is your
program
>>>>>>> doing?
>>>>>>>
>>>>>>>   - Are you running a DataSet program, or a DataStream program?
>>>>>>>   - Is it one simple source that reads from S3, or are there
>>>>>>> multiple sources?
>>>>>>>   - What operations do you apply on the CSV file?
>>>>>>>   - Are you using Flink's S3 connector, or the Hadoop S3 file
system?
>>>>>>>
>>>>>>> Greetings,
>>>>>>> Stephan
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <
>>>>>>> kudryavtsev.konstantin@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi guys,
>>>>>>>>
>>>>>>>> I'm running FLink on EMR with 2 m3.xlarge (each 16 GB RAM)
and
>>>>>>>> trying to process 3.8 GB CSV data from S3. I'm surprised
the fact that
>>>>>>>> Flink failed with OutOfMemory: Java Heap space
>>>>>>>>
>>>>>>>> I tried to find the reason:
>>>>>>>> 1) to identify TaskManager with a command ps aux | grep TaskManager
>>>>>>>> 2) then build Heap histo:
>>>>>>>> $ jmap -histo:live 19648 | head -n23
>>>>>>>>  num     #instances         #bytes  class name
>>>>>>>> ----------------------------------------------
>>>>>>>>    1:        131018     3763501304  [B
>>>>>>>>    2:         61022        7820352  <methodKlass>
>>>>>>>>    3:         61022        7688456  <constMethodKlass>
>>>>>>>>    4:          4971        5454408  <constantPoolKlass>
>>>>>>>>    5:          4966        4582232  <instanceKlassKlass>
>>>>>>>>    6:          4169        3003104  <constantPoolCacheKlass>
>>>>>>>>    7:         15696        1447168  [C
>>>>>>>>    8:          1291         638824  [Ljava.lang.Object;
>>>>>>>>    9:          5318         506000  java.lang.Class
>>>>>>>>
>>>>>>>>
>>>>>>>> Do you have any ideas what can be the reason and how it can
be
>>>>>>>> fixed?
>>>>>>>> Is Flink uses out-of-heap memory?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thank you,
>>>>>>>> Konstantin Kudryavtsev
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message