flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Handling decompression exceptions
Date Tue, 11 Oct 2016 12:02:24 GMT
I posted a workaround for that at
https://github.com/okkam-it/flink-examples/blob/master/src/main/java/it/okkam/datalinks/batch/flink/datasourcemanager/importers/Csv2RowExample.java

On 11 Oct 2016 1:57 p.m., "Fabian Hueske" <fhueske@gmail.com> wrote:

> Hi,
>
> Flink's String parser does not support escaped quotes. You data contains a
> double double quote (""). The parser identifies this as the end of the
> string field.
> As a workaround, you can read the file as a regular text file, line by
> line and do the parsing in a MapFunction.
>
> Best, Fabian
>
> 2016-10-11 13:37 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>
>> Forgot to add parseQuotedStrings('"'). After adding it I'm getting the
>> same exception with the second code too.
>>
>> 2016-10-11 13:29 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com>:
>>
>>> Hi Fabian,
>>>
>>> I tried to debug the code, and it turns out a line in my csv data is
>>> causing the ArrayIndexOutOfBoundsException, here is the exception
>>> stacktrace:
>>>
>>> java.lang.ArrayIndexOutOfBoundsException: -1
>>> at org.apache.flink.types.parser.StringParser.parseField(String
>>> Parser.java:49)
>>> at org.apache.flink.types.parser.StringParser.parseField(String
>>> Parser.java:28)
>>> at org.apache.flink.types.parser.FieldParser.resetErrorStateAnd
>>> Parse(FieldParser.java:98)
>>> at org.apache.flink.api.common.io.GenericCsvInputFormat.parseRe
>>> cord(GenericCsvInputFormat.java:395)
>>> at org.apache.flink.api.java.io.CsvInputFormat.readRecord(CsvIn
>>> putFormat.java:110)
>>> at org.apache.flink.api.common.io.DelimitedInputFormat.nextReco
>>> rd(DelimitedInputFormat.java:470)
>>> at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvIn
>>> putFormat.java:78)
>>> at org.myorg.quickstart.MyCsvInputFormat.nextRecord(MyCsvInputF
>>> ormat.java:106)
>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(Dat
>>> aSourceTask.java:162)
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>> And here is a sample CSV:
>>>
>>> timestamp,url,id
>>> 2016-08-31 12:08:11.223,"https://www.toyota.fr/hybrid-innovation/infogr
>>> aphie.jsontcgcc, ce)_13h00 /""=/-3h00 %=) 1",0000000
>>>
>>> Using my code, I get the previous exception when parsing the sample CSV.
>>> If I use the following code, I get an incorrect result : (2016-08-31
>>> 12:08:11.223, ce)_13h00 /""=/-3h00 %=) 1") instead of (2016-08-31
>>> 12:08:11.223, 0000000)
>>>
>>> DataSet<Tuple2<String, String>> withReadCSV = env.readCsvFile("C:\\Users\\yassine\\Desktop\\test.csv")
>>>         .ignoreFirstLine()
>>>         .fieldDelimiter(",")
>>>         .includeFields("101")
>>>         .ignoreInvalidLines()
>>>         .types(String.class, String.class);
>>> withReadCSV.writeAsText("C:\\Users\\yassine\\Desktop\\withreadcsv.txt", FileSystem.WriteMode.OVERWRITE).setParallelism(1);
>>>
>>>
>>> Is it a bug in Flink or is my data not compliant with the csv standards?
>>>
>>> Thanks,
>>> Yassine
>>>
>>>
>>> 2016-10-11 11:21 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:
>>>
>>>> Hi Yassine,
>>>>
>>>> I ran your code without problems and got the correct result.
>>>> Can you provide the Stacktrace of the Exception?
>>>>
>>>> Thanks, Fabian
>>>>
>>>> 2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzougui@mindlytix.com
>>>> >:
>>>>
>>>>> Thank you Fabian and Stephan for the suggestions.
>>>>> I couldn't override "readLine()" because it's final, so went with
>>>>> Fabian's solution, but I'm struggling with csv field masks. Any help
is
>>>>> appreciated.
>>>>> I created an Input Format which is basically TupleCsvInputFormat for
>>>>> which I overrode the nextRecord() method to catch the exceptions. But
I'm
>>>>> having a *java.lang.ArrayIndexOutOfBoundsException*  when I add a
>>>>> boolean[]{true, false, true} field mask . If I add a int[]{1,0,1} field
>>>>> mask, the job succeeds but outputs the first and second columns. Here
is my
>>>>> code:
>>>>>
>>>>> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>>>>> TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
>>>>> Path histPath = new Path("hdfs:///shared/file.csv");
>>>>>
>>>>> CsvInputFormat <Tuple2<String, String>> myInputFormt = new
>>>>> MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false,
true});
>>>>> myInputFormt.enableQuotedStringParsing('"');
>>>>> myInputFormt.setSkipFirstLineAsHeader(true);
>>>>> myInputFormt.setLenient(true);
>>>>>
>>>>> DataSet<Tuple2<String, String>> test = env.createInput(myInputFormt,t
>>>>> ypeInfo).withParameters(parameters);
>>>>> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE
>>>>> );
>>>>>
>>>>> and here is the  custom input format:
>>>>>
>>>>> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT>
{
>>>>>     private static final long serialVersionUID = 1L;
>>>>>     private TupleSerializerBase<OUT> tupleSerializer;
>>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>>>> tupleTypeInfo) {
>>>>>         this(filePath, DEFAULT_LINE_DELIMITER,
>>>>> DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
>>>>>     }
>>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter,
>>>>> String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>>>>>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>>>>> createDefaultMask(tupleTypeInfo.getArity()));
>>>>>     }
>>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>>>> tupleTypeInfo, int[] includedFieldsMask) {
>>>>>         this(filePath, DEFAULT_LINE_DELIMITER,
>>>>> DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
>>>>>     }
>>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter,
>>>>> String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, int[]
>>>>> includedFieldsMask) {
>>>>>         super(filePath);
>>>>>         boolean[] mask = (includedFieldsMask == null)
>>>>>                 ? createDefaultMask(tupleTypeInfo.getArity())
>>>>>                 : toBooleanMask(includedFieldsMask);
>>>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>>>>>     }
>>>>>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT>
>>>>> tupleTypeInfo, boolean[] includedFieldsMask) {
>>>>>         this(filePath, DEFAULT_LINE_DELIMITER,
>>>>> DEFAULT_FIELD_DELIMITER, tupleTypeInfo, includedFieldsMask);
>>>>>     }
>>>>>     public MyCsvInputFormat(Path filePath, String lineDelimiter,
>>>>> String fieldDelimiter, TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[]
>>>>> includedFieldsMask) {
>>>>>         super(filePath);
>>>>>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo,
>>>>> includedFieldsMask);
>>>>>     }
>>>>>     private void configure(String lineDelimiter, String fieldDelimiter,
>>>>>                            TupleTypeInfoBase<OUT> tupleTypeInfo,
>>>>> boolean[] includedFieldsMask) {
>>>>>         if (tupleTypeInfo.getArity() == 0) {
>>>>>             throw new IllegalArgumentException("Tuple size must be
>>>>> greater than 0.");
>>>>>         }
>>>>>         if (includedFieldsMask == null) {
>>>>>             includedFieldsMask = createDefaultMask(tupleTypeInf
>>>>> o.getArity());
>>>>>         }
>>>>>         tupleSerializer = (TupleSerializerBase<OUT>)
>>>>> tupleTypeInfo.createSerializer(new ExecutionConfig());
>>>>>         setDelimiter(lineDelimiter);
>>>>>         setFieldDelimiter(fieldDelimiter);
>>>>>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>>>>>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>>>>>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>>>>>         }
>>>>>         setFieldsGeneric(includedFieldsMask, classes);
>>>>>     }
>>>>>     @Override
>>>>>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>>>>>         return tupleSerializer.createOrReuseInstance(parsedValues,
>>>>> reuse);
>>>>>     }
>>>>>
>>>>>     @Override
>>>>>     public OUT nextRecord(OUT record) {
>>>>>         OUT returnRecord = null;
>>>>>         do {
>>>>>             try {
>>>>>                 returnRecord = super.nextRecord(record);
>>>>>             } catch (IOException e) {
>>>>>                 e.printStackTrace();
>>>>>             }
>>>>>         } while (returnRecord == null && !reachedEnd());
>>>>>         return returnRecord;
>>>>>     }
>>>>> }
>>>>>
>>>>> Thanks,
>>>>> Yassine
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <sewen@apache.org>:
>>>>>
>>>>>> How about just overriding the "readLine()" method to call
>>>>>> "super.readLine()" and catching EOF exceptions?
>>>>>>
>>>>>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhueske@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Yassine,
>>>>>>>
>>>>>>> AFAIK, there is no built-in way to ignore corrupted compressed
files.
>>>>>>> You could try to implement a FileInputFormat that wraps the
>>>>>>> CsvInputFormat and forwards all calls to the wrapped CsvIF.
>>>>>>> The wrapper would also catch and ignore the EOFException.
>>>>>>>
>>>>>>> If you do that, you would not be able to use the env.readCsvFile()
>>>>>>> shortcut but would need to create an instance of your own InputFormat
and
>>>>>>> add it with
>>>>>>> env.readFile(yourIF).
>>>>>>>
>>>>>>> Hope this helps,
>>>>>>> Fabian
>>>>>>>
>>>>>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <
>>>>>>> y.marzougui@mindlytix.com>:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> I am reading a large number of GZip compressed csv files,
nested in
>>>>>>>> a HDFS directory:
>>>>>>>>
>>>>>>>> Configuration parameters = new Configuration();
>>>>>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>>>>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///share
>>>>>>>> d/logs/")
>>>>>>>>                 .ignoreFirstLine()
>>>>>>>>                 .fieldDelimiter("|")
>>>>>>>>                 .includeFields("011000")
>>>>>>>>                 .types(String.class, Long.class)
>>>>>>>>                 .withParameters(parameters);
>>>>>>>>
>>>>>>>> My job is failing with the following exception:
>>>>>>>>
>>>>>>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager
               - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>>>>>>
>>>>>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>>>>>
>>>>>>>> 	at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>>>>>
>>>>>>>> 	at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>>>>>
>>>>>>>> 	at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>>>>>
>>>>>>>> 	at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>>>>>
>>>>>>>> 	at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>>>>>
>>>>>>>> 	at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>>>>>
>>>>>>>> 	at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>>>>>
>>>>>>>> 	at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>>>>>
>>>>>>>> 	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>>>>>
>>>>>>>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>>>>>
>>>>>>>> 	at java.lang.Thread.run(Unknown Source)
>>>>>>>>
>>>>>>>> I think it is due to some unproperly compressed files, how
can I handle and ignore such exceptions? Thanks.
>>>>>>>>
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Yassine
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Mime
View raw message