flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Fabian Hueske <fhue...@gmail.com>
Subject Re: Handling decompression exceptions
Date Tue, 11 Oct 2016 11:56:27 GMT
Hi,

Flink's String parser does not support escaped quotes. You data contains a
double double quote (""). The parser identifies this as the end of the
string field.
As a workaround, you can read the file as a regular text file, line by line
and do the parsing in a MapFunction.

Best, Fabian

2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:

> Forgot to add parseQuotedStrings('"'). After adding it I'm getting the
> same exception with the second code too.
>
> 2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>
>> Hi Fabian,
>>
>> I tried to debug the code, and it turns out a line in my csv data is
>> causing the ArrayIndexOutOfBoundsException, here is the exception
>> stacktrace:
>>
>> java.lang.ArrayIndexOutOfBoundsException: -1
>> at org.apache.flink.types.parser.StringParser.parseField(String
>> Parser.java:49)
>> at org.apache.flink.types.parser.StringParser.parseField(String
>> Parser.java:28)
>> at org.apache.flink.types.parser.FieldParser.resetErrorStateAnd
>> Parse(FieldParser.java:98)
>> at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRe
>> cord(GenericCsvInputFormat.java:395)
>> at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvIn
>> putFormat.java:110)
>> at org.apache.flink.api.common.io.DelimitedInputFormat.nextReco
>> rd(DelimitedInputFormat.java:470)
>> at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvIn
>> putFormat.java:78)
>> at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputF
>> ormat.java:106)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(
>> DataSourceTask.java:162)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> And here is a sample CSV:
>>
>> timestamp,url,id
>> 2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infogr
>> aphie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000
>>
>> Using my code, I get the previous exception when parsing the sample CSV.
>> If I use the following code, I get an incorrect result : (2016-08-31
>> 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31
>> 12:08:11.223, 0000000)
>>
>> DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
>>         .ignoreFirstLine()
>>         .fieldDelimiter(",")
>>         .includeFields("101")
>>         .ignoreInvalidLines()
>>         .types(String.class, String.class);
>> withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>>
>>
>> Is it a bug in Flink or is my data not compliant with the csv standards?
>>
>> Thanks,
>> Yassine
>>
>>
>> 2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>
>>> Hi Yassine,
>>>
>>> I ran your code without problems and got the correct result.
>>> Can you provide the Stacktrace of the Exception?
>>>
>>> Thanks, Fabian
>>>
>>> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>
>>> :
>>>
>>>> Thank you Fabian and Stephan for the suggestions.
>>>> I couldn't override "readLine()" because it's final, so went with
>>>> Fabian's solution, but I'm struggling with csv field masks. Any help is
>>>> appreciated.
>>>> I created an Input Format which is basically TupleCsvInputFormat for
>>>> which I overrode the nextRecord() method to catch the exceptions. But I'm
>>>> having a *java.lang.ArrayIndexOutOfBoundsException*  when I add a
>>>> boolean[]{true, false, true} field mask . If I add a int[]{1,0,1} field
>>>> mask, the job succeeds but outputs the first and second columns. Here is
my
>>>> code:
>>>>
>>>> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>>>> TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>>>> Path histPath = new Path("hdfs:///shared/file.csv");
>>>>
>>>> CsvInputFormat <Tuple2<String, String>> myInputFormt = new
>>>> MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>>>> myInputFormt.enableQuotedStringParsing('"');
>>>> myInputFormt.setSkipFirstLineAsHeader(true);
>>>> myInputFormt.setLenient(true);
>>>>
>>>> DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,t
>>>> ypeInfo).withParameters(parameters);
>>>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>>>>
>>>> and here is the  custom input format:
>>>>
>>>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT>
{
>>>>     private static final long serialVersionUID = 1L;
>>>>     private TupleSerializerBase<OUT> tupleSerializer;
>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>>> tupleTypeInfo) {
>>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>>> tupleTypeInfo);
>>>>     }
>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>>>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>>>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>>>> createDefaultMask(tupleTypeInfo.getArity()));
>>>>     }
>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>>> tupleTypeInfo, int[] includedFieldsMask) {
>>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>>> tupleTypeInfo, includedFieldsMask);
>>>>     }
>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>>>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[]
>>>> includedFieldsMask) {
>>>>         super(filePath);
>>>>         boolean[] mask = (includedFieldsMask == null)
>>>>                 ? createDefaultMask(tupleTypeInfo.getArity())
>>>>                 : toBooleanMask(includedFieldsMask);
>>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>>>>     }
>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>>> tupleTypeInfo, boolean[] includedFieldsMask) {
>>>>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER,
>>>> tupleTypeInfo, includedFieldsMask);
>>>>     }
>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String
>>>> fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[]
>>>> includedFieldsMask) {
>>>>         super(filePath);
>>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo,
>>>> includedFieldsMask);
>>>>     }
>>>>     private void configure(String lineDelimiter, String fieldDelimiter,
>>>>                            TupleTypeInfoBase<OUT> tupleTypeInfo,
>>>> boolean[] includedFieldsMask) {
>>>>         if (tupleTypeInfo.getArity() == 0) {
>>>>             throw new IllegalArgumentException("Tuple size must be
>>>> greater than 0.");
>>>>         }
>>>>         if (includedFieldsMask == null) {
>>>>             includedFieldsMask = createDefaultMask(tupleTypeInf
>>>> o.getArity());
>>>>         }
>>>>         tupleSerializer = (TupleSerializerBase<OUT>)
>>>> tupleTypeInfo.createSerializer(new ExecutionConfig());
>>>>         setDelimiter(lineDelimiter);
>>>>         setFieldDelimiter(fieldDelimiter);
>>>>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>>>>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>>>>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>>>>         }
>>>>         setFieldsGeneric(includedFieldsMask, classes);
>>>>     }
>>>>     @Override
>>>>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>>>>         return tupleSerializer.createOrReuseInstance(parsedValues,
>>>> reuse);
>>>>     }
>>>>
>>>>     @Override
>>>>     public OUT nextRecord(OUT record) {
>>>>         OUT returnRecord = null;
>>>>         do {
>>>>             try {
>>>>                 returnRecord = super.nextRecord(record);
>>>>             } catch (IOException e) {
>>>>                 e.printStackTrace();
>>>>             }
>>>>         } while (returnRecord == null && !reachedEnd());
>>>>         return returnRecord;
>>>>     }
>>>> }
>>>>
>>>> Thanks,
>>>> Yassine
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>>>>
>>>>> How about just overriding the "readLine()" method to call
>>>>> "super.readLine()" and catching EOF exceptions?
>>>>>
>>>>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhueske@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Yassine,
>>>>>>
>>>>>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>>>>>> You could try to implement a FileInputFormat that wraps the
>>>>>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>>>>>> The wrapper would also catch and ignore the EOFException.
>>>>>>
>>>>>> If you do that, you would not be able to use the env.readCsvFile()
>>>>>> shortcut but would need to create an instance of your own InputFormat
and
>>>>>> add it with
>>>>>> env.readFile(yourIF).
>>>>>>
>>>>>> Hope this helps,
>>>>>> Fabian
>>>>>>
>>>>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <
>>>>>> y.marzougui@mindlytix.com>:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> I am reading a large number of GZip compressed csv files, nested
in
>>>>>>> a HDFS directory:
>>>>>>>
>>>>>>> Configuration parameters = new Configuration();
>>>>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>>>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///share
>>>>>>> d/logs/")
>>>>>>>                 .ignoreFirstLine()
>>>>>>>                 .fieldDelimiter("|")
>>>>>>>                 .includeFields("011000")
>>>>>>>                 .types(String.class, Long.class)
>>>>>>>                 .withParameters(parameters);
>>>>>>>
>>>>>>> My job is failing with the following exception:
>>>>>>>
>>>>>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager
               - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>>>>>
>>>>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>>>>
>>>>>>> 	at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>>>>
>>>>>>> 	at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>>>>
>>>>>>> 	at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>>>>
>>>>>>> 	at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>>>>
>>>>>>> 	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>>>>
>>>>>>> 	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>>>>
>>>>>>> 	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>>>>
>>>>>>> 	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>>>>
>>>>>>> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>>>>
>>>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>>>>
>>>>>>> 	at java.lang.Thread.run(Unknown Source)
>>>>>>>
>>>>>>> I think it is due to some unproperly compressed files, how can
I handle and ignore such exceptions? Thanks.
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Yassine
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message