flink-user mailing list archives

From Yassine MARZOUGUI <y.marzou...@mindlytix.com>
Subject Handling decompression exceptions
Date Tue, 04 Oct 2016 15:43:39 GMT
Hi all,

I am reading a large number of gzip-compressed CSV files, nested in an HDFS
directory:

Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
                .ignoreFirstLine()
                .fieldDelimiter("|")
                .includeFields("011000")
                .types(String.class, Long.class)
                .withParameters(parameters);

My job is failing with the following exception:

2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager                - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
java.io.EOFException: Unexpected end of ZLIB input stream
	at java.util.zip.InflaterInputStream.fill(Unknown Source)
	at java.util.zip.InflaterInputStream.read(Unknown Source)
	at java.util.zip.GZIPInputStream.read(Unknown Source)
	at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
	at java.lang.Thread.run(Unknown Source)

I think it is caused by some improperly compressed (possibly truncated)
files. How can I handle and ignore such exceptions? Thanks.
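
One way to confirm that suspicion outside of Flink would be to decompress each
file to its end and see which ones fail. The following is only a minimal
sketch using plain java.util.zip; the class name GzipCheck and the method
isReadableGzip are hypothetical and not part of Flink. It reads local paths,
so for HDFS the InputStream would have to come from the Hadoop FileSystem
client instead:

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;
import java.util.zip.ZipException;

public class GzipCheck {

    /**
     * Returns true if the stream decompresses cleanly to its end,
     * false if it is truncated or not valid gzip data.
     */
    static boolean isReadableGzip(InputStream raw) throws IOException {
        byte[] buffer = new byte[8192];
        try (GZIPInputStream in = new GZIPInputStream(raw)) {
            // Drain the stream; the decompressed bytes are discarded.
            while (in.read(buffer) != -1) {
                // nothing to do
            }
            return true;
        } catch (EOFException | ZipException e) {
            // EOFException: input ended early (the error in the trace above).
            // ZipException: bad header or corrupt compressed data.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Each argument is assumed to be a path to a local .gz file.
        for (String arg : args) {
            try (InputStream raw = Files.newInputStream(Paths.get(arg))) {
                System.out.println(arg + "\t" + (isReadableGzip(raw) ? "OK" : "CORRUPT"));
            }
        }
    }
}
```

This only identifies the bad files so they can be removed or re-compressed
before the job runs; it does not make the Flink source itself skip them.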


Best,
Yassine
