I think that's actually very use case specific.

You're code will never see the malformed record because it is dropped by the input format.
Other applications might rely on complete input and would prefer an exception to be notified about invalid input.
Flink's CsvInputFormat has a parameter "lenient" which makes this property configurable.

I agree with Stephan that we should add a record-size parameter to the DelimitedOutputFormat (which is the basis for the CsvInputFormat).

Cheers, Fabain

2015-10-08 19:33 GMT+02:00 KOSTIANTYN Kudriavtsev <kudryavtsev.konstantin@gmail.com>:

Yes it's,
I'm checking number of columns per line to filter out mailformed

*Sent from my ZenFone

On Oct 8, 2015 1:19 PM, "Stephan Ewen" <sewen@apache.org> wrote:
There is probably a different CSV input format implementation which drops invalid lines (too long lines).

Is that actually desired behavior, simply dropping malformatted input?

On Thu, Oct 8, 2015 at 7:12 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstantin@gmail.com> wrote:
Hm, you was write

I checked all files, one by one and found an issue with a line in one of them... It's really unexpected for me as far as I run spark job on the same dataset and "wrong" rows were filtered out without issues. 

Thanks for help!

Thank you,
Konstantin Kudryavtsev

On Thu, Oct 8, 2015 at 12:35 PM, Stephan Ewen <sewen@apache.org> wrote:
Ah, that makes sense!

The problem is not in the core runtime, it is in the delimited input format. It probably looks for the line split character and never finds it, so that it starts buffering a super large line (gigabytes) which leads to the OOM exception.

Can you check whether the line split character and the encoding are properly defined?

Would actually be good to define a max line length (sane default + configurable value) that reports when lines seem to extend a maximum length (which is usually a misconfiguration of the split character)

Greetings,
Stephan


On Thu, Oct 8, 2015 at 6:29 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstantin@gmail.com> wrote:
10/08/2015 16:25:48     CHAIN DataSource (at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31) (org.apache.flink.api.java.io.TextInputFormat)) -> Filter (Filter at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:31)) -> FlatMap (FlatMap at count(DataSet.scala:523))(1/1) switched to FAILED
java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)

10/08/2015 16:25:48     Job execution switched to status FAILING.
10/08/2015 16:25:48     DataSink (org.apache.flink.api.java.io.DiscardingOutputFormat@58dbb8cf)(1/1) switched to CANCELED
10/08/2015 16:25:48     Job execution switched to status FAILED.
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
        at org.apache.flink.client.program.Client.run(Client.java:413)
        at org.apache.flink.client.program.Client.run(Client.java:356)
        at org.apache.flink.client.program.Client.run(Client.java:349)
        at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:63)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:789)
        at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:576)
        at org.apache.flink.api.scala.DataSet.count(DataSet.scala:524)
        at com.epam.AirSetJobExample$.main(AirSetJobExample.scala:35)
        at com.epam.AirSetJobExample.main(AirSetJobExample.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:437)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:353)
        at org.apache.flink.client.program.Client.run(Client.java:315)
        at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:582)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:288)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:878)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:920)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.yarn.ApplicationMasterActor$$anonfun$receiveYarnMessages$1.applyOrElse(ApplicationMasterActor.scala:100)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:543)
        at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:453)
        at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


Thank you,
Konstantin Kudryavtsev

On Thu, Oct 8, 2015 at 12:23 PM, Stephan Ewen <sewen@apache.org> wrote:
Can you paste the exception stack trace?

On Thu, Oct 8, 2015 at 6:15 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstantin@gmail.com> wrote:

It's DataSet program that performs simple filtering, crossjoin and aggregation.

I'm using Hadoop S3 FileSystem (not Emr) as far as Flink's s3 connector doesn't work at all.

Currently I have 3 taskmanagers each 5k MB, but I tried different configurations and all leads to the same exception

*Sent from my ZenFone

On Oct 8, 2015 12:05 PM, "Stephan Ewen" <sewen@apache.org> wrote:
Can you give us a bit more background?  What exactly is your program doing? 

  - Are you running a DataSet program, or a DataStream program?
  - Is it one simple source that reads from S3, or are there multiple sources? 
  - What operations do you apply on the CSV file?
  - Are you using Flink's S3 connector, or the Hadoop S3 file system?

Greetings,
Stephan


On Thu, Oct 8, 2015 at 5:58 PM, KOSTIANTYN Kudriavtsev <kudryavtsev.konstantin@gmail.com> wrote:
Hi guys,

I'm running FLink on EMR with 2 m3.xlarge (each 16 GB RAM) and trying to process 3.8 GB CSV data from S3. I'm surprised the fact that Flink failed with OutOfMemory: Java Heap space

I tried to find the reason:
1) to identify TaskManager with a command ps aux | grep TaskManager
2) then build Heap histo: 
$ jmap -histo:live 19648 | head -n23
 num     #instances         #bytes  class name
----------------------------------------------
   1:        131018     3763501304  [B
   2:         61022        7820352  <methodKlass>
   3:         61022        7688456  <constMethodKlass>
   4:          4971        5454408  <constantPoolKlass>
   5:          4966        4582232  <instanceKlassKlass>
   6:          4169        3003104  <constantPoolCacheKlass>
   7:         15696        1447168  [C
   8:          1291         638824  [Ljava.lang.Object;
   9:          5318         506000  java.lang.Class


Do you have any ideas what can be the reason and how it can be fixed? 
Is Flink uses out-of-heap memory?


Thank you,
Konstantin Kudryavtsev