How about just overriding the readLine() method to call super.readLine() and catching the EOFException?
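For context, the EOFException here comes from java.util.zip when a gzip stream is cut short. A small stdlib-only demo (class and method names are mine, not from the thread) shows that catching the exception around the read loop lets you keep all the lines that were decoded before the corruption point:

```java
import java.io.*;
import java.util.*;
import java.util.zip.*;

public class TruncatedGzipDemo {

    // Gzip some text, then cut the byte stream short to simulate a corrupt file.
    static byte[] truncatedGzip(String text) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(text.getBytes("UTF-8"));
        }
        byte[] full = bos.toByteArray();
        // Drop part of the 8-byte gzip trailer -> EOFException on read
        return Arrays.copyOf(full, full.length - 5);
    }

    // Read lines, swallowing the EOFException so partial data survives.
    static List<String> readLinesIgnoringEof(byte[] gzipBytes) throws IOException {
        List<String> lines = new ArrayList<>();
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipBytes)), "UTF-8"))) {
            String line;
            while ((line = r.readLine()) != null) {
                lines.add(line);
            }
        } catch (EOFException e) {
            // Truncated stream: keep whatever was decoded so far.
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readLinesIgnoringEof(truncatedGzip("a|1\nb|2\nc|3\n")));
    }
}
```

Inside Flink the same catch would live in whichever method of the input format performs the read, as suggested above.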

On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Hi Yassine,

AFAIK, there is no built-in way to ignore corrupted compressed files.
You could try to implement a FileInputFormat that wraps the CsvInputFormat and forwards all calls to the wrapped CsvIF.
The wrapper would also catch and ignore the EOFException.

If you do that, you would not be able to use the env.readCsvFile() shortcut but would need to create an instance of your own InputFormat and add it with env.readFile(yourIF).
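The forwarding-wrapper idea could look roughly like the sketch below. To keep it self-contained and runnable it uses a stripped-down stand-in interface for the two relevant InputFormat methods (the names SimpleInputFormat, EofTolerantFormat, etc. are mine); a real wrapper would extend Flink's FileInputFormat and forward the remaining lifecycle methods (configure, open, close, createInputSplits, ...) to the wrapped CsvInputFormat in the same way:

```java
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class EofTolerantWrapperSketch {

    // Stripped-down stand-in for the relevant slice of an input format API.
    interface SimpleInputFormat<T> {
        boolean reachedEnd() throws IOException;
        T nextRecord(T reuse) throws IOException;
    }

    // Wrapper that forwards calls and treats an EOFException as "this split is done".
    static class EofTolerantFormat<T> implements SimpleInputFormat<T> {
        private final SimpleInputFormat<T> wrapped;
        private boolean hitCorruptEnd = false;

        EofTolerantFormat(SimpleInputFormat<T> wrapped) {
            this.wrapped = wrapped;
        }

        @Override
        public boolean reachedEnd() throws IOException {
            return hitCorruptEnd || wrapped.reachedEnd();
        }

        @Override
        public T nextRecord(T reuse) throws IOException {
            try {
                return wrapped.nextRecord(reuse);
            } catch (EOFException e) {
                hitCorruptEnd = true; // ignore the corrupt tail, end the split cleanly
                return null;
            }
        }
    }

    // Fake source: yields two records, then fails like a truncated gzip stream.
    static class FailingFormat implements SimpleInputFormat<String> {
        private int emitted = 0;

        public boolean reachedEnd() {
            return false;
        }

        public String nextRecord(String reuse) throws IOException {
            if (emitted < 2) {
                return "record-" + (++emitted);
            }
            throw new EOFException("Unexpected end of ZLIB input stream");
        }
    }

    // Drive the wrapper the way a reader loop would.
    static List<String> readAll(SimpleInputFormat<String> format) throws IOException {
        List<String> out = new ArrayList<>();
        while (!format.reachedEnd()) {
            String rec = format.nextRecord(null);
            if (rec != null) {
                out.add(rec);
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readAll(new EofTolerantFormat<>(new FailingFormat())));
    }
}
```

Note the reachedEnd() override: simply returning null from nextRecord() is not enough, because the driving loop would keep calling it; the flag ends the split after the first corrupt read.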

Hope this helps,
Fabian

2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
Hi all,

I am reading a large number of GZip-compressed CSV files, nested in an HDFS directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
                .ignoreFirstLine()
                .fieldDelimiter("|")
                .includeFields("011000")
                .types(String.class, Long.class)
                .withParameters(parameters);

My job is failing with the following exception:
2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
java.io.EOFException: Unexpected end of ZLIB input stream
	at java.util.zip.InflaterInputStream.fill(Unknown Source)
	at java.util.zip.InflaterInputStream.read(Unknown Source)
	at java.util.zip.GZIPInputStream.read(Unknown Source)
	at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
	at java.lang.Thread.run(Unknown Source)
I think it is caused by some improperly compressed files. How can I handle and ignore such exceptions? Thanks.

Best,
Yassine