flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 10:04:58 GMT
You could implement your own InputFormat based on FileInputFormat and
override the createInputSplits method so that it creates just a single split
per file.
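The idea of "one split per file" can be illustrated outside Flink. The following is a simplified, hypothetical model in plain Java (the class `SplitPlanningSketch`, its `Split` type, and both planning methods are invented for illustration and do not reproduce Flink's FileInputFormat logic). A naive size-based planner may cut a small file into several byte ranges, while a one-split-per-file planner hands each whole file to a single reader, which is what a sequential stream of Kryo objects needs.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of split planning; NOT Flink's FileInputFormat code.
public class SplitPlanningSketch {

    // A byte range [start, start + length) of one file, similar in spirit to a file input split.
    static final class Split {
        final String path;
        final long start;
        final long length;

        Split(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }

        @Override
        public String toString() {
            return path + "[" + start + ", " + (start + length) + ")";
        }
    }

    // Naive size-based planning: cut one file into roughly minNumSplits byte ranges.
    static List<Split> bySize(String path, long fileLength, int minNumSplits) {
        List<Split> splits = new ArrayList<>();
        long splitSize = Math.max(1, (fileLength + minNumSplits - 1) / minNumSplits); // ceiling division
        for (long start = 0; start < fileLength; start += splitSize) {
            splits.add(new Split(path, start, Math.min(splitSize, fileLength - start)));
        }
        return splits;
    }

    // "One split per file": every file is read from byte 0 to its end by a single reader,
    // so a sequential stream of serialized objects is never cut in the middle of a record.
    static List<Split> onePerFile(List<String> paths, List<Long> lengths) {
        List<Split> splits = new ArrayList<>();
        for (int i = 0; i < paths.size(); i++) {
            splits.add(new Split(paths.get(i), 0, lengths.get(i)));
        }
        return splits;
    }

    public static void main(String[] args) {
        // A 7-byte file (the size reported in this thread) planned for 8 readers.
        System.out.println(bySize("/tmp/KryoTest.ser", 7, 8));
        System.out.println(onePerFile(List.of("/tmp/KryoTest.ser"), List.of(7L)));
    }
}
```

With the one-per-file plan, parallelism comes from having many files rather than from cutting one file. In Flink itself this corresponds to overriding createInputSplits in a FileInputFormat subclass; the exact signatures vary between Flink versions, so check the javadoc of the version in use.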

2015-08-07 12:02 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> So what should I do?
>
> On Fri, Aug 7, 2015 at 12:01 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Ah, I checked the code.
>>
>> The BinaryInputFormat expects metadata which is written by the
>> BinaryOutputFormat.
>> So you cannot use the BinaryInputFormat to read a file which does not
>> provide the metadata.
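Why a metadata-expecting reader cannot handle such a file can be shown with a toy format in plain Java. This is a hypothetical illustration: the class `FooterFormatSketch`, its footer, and the footer layout are invented here and do not reproduce the metadata Flink's BinaryOutputFormat actually writes. The writer appends a footer holding the record count, and the reader relies on that footer before touching any record, so it has nothing to work with when handed a file written without one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Toy "records + footer" format; hypothetical, NOT the layout Flink's BinaryOutputFormat writes.
public class FooterFormatSketch {

    static final int FOOTER_SIZE = Long.BYTES; // the footer holds the record count as a long

    // Writes length-prefixed records, then a footer with the number of records.
    static byte[] write(List<String> records) {
        List<byte[]> encoded = new ArrayList<>();
        int size = FOOTER_SIZE;
        for (String r : records) {
            byte[] bytes = r.getBytes(StandardCharsets.UTF_8);
            encoded.add(bytes);
            size += Integer.BYTES + bytes.length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (byte[] bytes : encoded) {
            buf.putInt(bytes.length).put(bytes);
        }
        buf.putLong(records.size());
        return buf.array();
    }

    // Reads the footer first; a file written without one cannot be interpreted.
    static List<String> read(byte[] file) {
        if (file.length < FOOTER_SIZE) {
            throw new IllegalArgumentException(
                    "file has " + file.length + " bytes, too short for the " + FOOTER_SIZE + "-byte footer");
        }
        ByteBuffer buf = ByteBuffer.wrap(file);
        long count = buf.getLong(file.length - FOOTER_SIZE); // absolute read; position stays at 0
        List<String> records = new ArrayList<>();
        for (long i = 0; i < count; i++) {
            byte[] bytes = new byte[buf.getInt()];
            buf.get(bytes);
            records.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        System.out.println(read(write(List.of("a", "bc")))); // [a, bc]
        try {
            read(new byte[7]); // 7 bytes with no footer, like a file written directly with Kryo
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

A raw file longer than the footer would not be rejected by this check; its last eight bytes would simply be misread as a record count, which shows how a reader and writer that disagree on metadata can fail in confusing ways rather than with a clear error.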
>>
>> 2015-08-07 11:53 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> The file containing the serialized object is 7 bytes.
>>>
>>> On Fri, Aug 7, 2015 at 11:49 AM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> This might be an issue with the blockSize parameter of the
>>>> BinaryInputFormat.
>>>> How large is the file with the single object?
>>>>
>>>> 2015-08-07 11:37 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>
>>>>> I also tried with
>>>>>
>>>>> DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);
>>>>>
>>>>> but I get the same error :(
>>>>>
>>>>> Moreover, in this example I put exactly one object per file, so it
>>>>> should be able to deserialize it, right?
>>>>>
>>>>> On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> If you create your file by just sequentially writing all objects to
>>>>>> the file using Kryo, you can only read it with a parallelism of 1.
>>>>>> Writing binary files in a way that they can be read in parallel is a
>>>>>> bit tricky (and not specific to Flink).
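One common way to make a binary file readable in parallel is to pack records into fixed-size blocks so that no record crosses a block boundary. The sketch below shows that technique in plain Java under stated assumptions: the class `BlockAlignedSketch`, the `-1` end-of-block marker, and the length-prefixed layout are hypothetical and are not Flink's or Kryo's actual format. Each block ends with a marker, so a reader assigned any block can decode it without reading what came before.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical block-aligned layout (not Flink's or Kryo's format): records never cross
// a block boundary, so any block can be decoded without reading the blocks before it.
public class BlockAlignedSketch {

    static final int END_OF_BLOCK = -1; // length value that marks "no more records in this block"

    // Packs length-prefixed UTF-8 records into fixed-size blocks.
    static byte[] write(List<String> records, int blockSize) {
        if (blockSize < Integer.BYTES) throw new IllegalArgumentException("block too small");
        List<ByteBuffer> blocks = new ArrayList<>();
        ByteBuffer current = ByteBuffer.allocate(blockSize);
        for (String record : records) {
            byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
            int needed = Integer.BYTES + bytes.length;
            // Always keep room for the end-of-block marker.
            if (needed + Integer.BYTES > blockSize) {
                throw new IllegalArgumentException("record does not fit in one block");
            }
            if (current.remaining() < needed + Integer.BYTES) {
                current.putInt(END_OF_BLOCK); // close the block; the rest stays zero padding
                blocks.add(current);
                current = ByteBuffer.allocate(blockSize);
            }
            current.putInt(bytes.length).put(bytes);
        }
        current.putInt(END_OF_BLOCK);
        blocks.add(current);

        byte[] file = new byte[blocks.size() * blockSize];
        for (int i = 0; i < blocks.size(); i++) {
            System.arraycopy(blocks.get(i).array(), 0, file, i * blockSize, blockSize);
        }
        return file;
    }

    // Decodes one block on its own, as one parallel reader per block would.
    static List<String> readBlock(byte[] file, int blockSize, int index) {
        ByteBuffer buf = ByteBuffer.wrap(file, index * blockSize, blockSize);
        List<String> records = new ArrayList<>();
        for (int len = buf.getInt(); len != END_OF_BLOCK; len = buf.getInt()) {
            byte[] bytes = new byte[len];
            buf.get(bytes);
            records.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return records;
    }

    public static void main(String[] args) {
        int blockSize = 32;
        byte[] file = write(List.of("alpha", "beta", "gamma", "delta"), blockSize);
        for (int i = 0; i < file.length / blockSize; i++) {
            System.out.println("block " + i + ": " + readBlock(file, blockSize, i));
        }
    }
}
```

Running `main` prints `block 0: [alpha, beta, gamma]` and `block 1: [delta]`. The cost is padding at the end of each block; in exchange, a reader assigned block i never needs to scan from the start of the file. A file of Kryo objects written back to back has no such boundaries, so a reader starting at an arbitrary byte offset cannot tell where the next object begins.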
>>>>>>
>>>>>> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>>> Hi to all,
>>>>>>> I'm trying to read a file serialized with Kryo, but I get this
>>>>>>> exception (due to the fact that createInputSplits creates 8
>>>>>>> input splits, only one of which is non-empty).
>>>>>>>
>>>>>>> Caused by: java.io.IOException: Invalid argument
>>>>>>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>>>>>>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>>>>>>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>>>>>>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>>>>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>>>>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>>>>>>> at java.lang.Thread.run(Thread.java:745)
>>>>>>>
>>>>>>> -----------------------------------------------
>>>>>>> My program is basically the following:
>>>>>>>
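>>>>>>> import java.io.FileInputStream;
>>>>>>> import java.io.FileNotFoundException;
>>>>>>> import java.io.FileOutputStream;
>>>>>>>
>>>>>>> import com.esotericsoftware.kryo.Kryo;
>>>>>>> import com.esotericsoftware.kryo.Serializer;
>>>>>>> import com.esotericsoftware.kryo.io.Input;
>>>>>>> import com.esotericsoftware.kryo.io.Output;
>>>>>>> import org.apache.flink.api.common.io.BinaryInputFormat;
>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>>>>> import org.apache.flink.api.java.DataSet;
>>>>>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>>>>>> import org.apache.flink.api.java.io.TypeSerializerInputFormat;
>>>>>>> import org.apache.flink.api.java.typeutils.GenericTypeInfo;
>>>>>>> import org.apache.flink.configuration.Configuration;
>>>>>>>
>>>>>>> public static void main(String[] args) throws Exception {
>>>>>>>
>>>>>>>     ...
>>>>>>>     // try-with-resources closes the streams automatically
>>>>>>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>>>>>>         // serialize the object with plain Kryo (no Flink metadata is written)
>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>         kryo.writeClassAndObject(output, myObj);
>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>     }
>>>>>>>
>>>>>>>     // deserialize the object again with plain Kryo
>>>>>>>     myObj = null;
>>>>>>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>>>>>>         Kryo kryo = new Kryo();
>>>>>>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>>>>>>     } catch (FileNotFoundException ex) {
>>>>>>>         LOG.error(ex.getMessage(), ex);
>>>>>>>     }
>>>>>>>
>>>>>>>     // read the same file with Flink
>>>>>>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>>>>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>>>>>>     Configuration configuration = new Configuration();
>>>>>>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>>>>>>>
>>>>>>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>>>>>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>>>>>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>>>>>>     inputFormat.configure(configuration);
>>>>>>>
>>>>>>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>>>>>>     ds.print();
>>>>>>> }
>>>>>>>
>>>>>>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>>>>>>
>>>>>>>     // Caution: this serializer is registered for MyClass, so calling
>>>>>>>     // writeClassAndObject/readClassAndObject on a MyClass value from inside it
>>>>>>>     // re-enters this same serializer (the write side recurses without end).
>>>>>>>     // A working version would write and read MyClass's fields directly.
>>>>>>>     @Override
>>>>>>>     public void write(Kryo kryo, Output output, MyClass object) {
>>>>>>>         kryo.writeClassAndObject(output, object);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>>>>>>         return (MyClass) kryo.readClassAndObject(input);
>>>>>>>     }
>>>>>>> }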
>>>>>>>
>>>>>>> Am I doing something wrong?
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
