flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: Invalid argument reading a file containing a Kryo object
Date Fri, 07 Aug 2015 09:37:32 GMT
I also tried with

DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file so it should be
able to deserialize it, right?

On Fri, Aug 7, 2015 at 11:33 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> If you create your file by just sequentially writing all objects to the
> file using Kryo, you can only read it with a parallelism of 1.
> Writing binary files in a way that they can be read in parallel is a bit
> tricky (and not specific to Flink).
>
> 2015-08-07 11:28 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> Hi to all,
>> I'm trying to read a file serialized with Kryo but I get this exception
>> (because createInputSplits creates 8 input splits, of which only one
>> is non-empty).
>>
>> Caused by: java.io.IOException: Invalid argument
>> at sun.nio.ch.FileChannelImpl.position0(Native Method)
>> at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
>> at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
>> at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> -----------------------------------------------
>> My program is basically the following:
>>
>> public static void main(String[] args) throws Exception {
>>
>>     ...
>>     // try-with-resources used to autoclose resources
>>     try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
>>         // serialise object
>>         Kryo kryo = new Kryo();
>>         kryo.writeClassAndObject(output, myObj);
>>     } catch (FileNotFoundException ex) {
>>         LOG.error(ex.getMessage(), ex);
>>     }
>>
>>     // deserialise object
>>     myObj = null;
>>
>>     try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
>>         Kryo kryo = new Kryo();
>>         myObj = (MyClass) kryo.readClassAndObject(input);
>>     } catch (FileNotFoundException ex) {
>>         LOG.error(ex.getMessage(), ex);
>>     }
>>
>>     final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
>>     Configuration configuration = new Configuration();
>>     configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);
>>
>>     TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
>>     final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);
>>     inputFormat.setFilePath("file:/tmp/KryoTest.ser");
>>     inputFormat.configure(configuration);
>>
>>     DataSet<MyClass> ds = env.createInput(inputFormat);
>>     ds.print();
>> }
>>
>> private static final class MyClassSerializer extends Serializer<MyClass> {
>>
>>     @Override
>>     public void write(Kryo kryo, Output output, MyClass object) {
>>         kryo.writeClassAndObject(output, object);
>>     }
>>
>>     @Override
>>     public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
>>         return (MyClass) kryo.readClassAndObject(input);
>>     }
>> }
>>
>> Am I doing something wrong?
>>
>> Best,
>> Flavio
>>
>>
>>
>
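
Fabian's remark that a sequentially written file can only be read with a parallelism of 1 can be illustrated with a small sketch. It uses only java.io and a simple length-prefixed record layout, not Kryo or Flink; the class SequentialRecords and the methods writeAll and readFrom are hypothetical names chosen for illustration. The point it shows is general: record boundaries in such a file are known only to a reader that starts at the beginning (or at an offset it already knows to be a boundary), so a reader dropped at an arbitrary split offset misinterprets the bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration: records written back to back, with no block structure.
public class SequentialRecords {

    // Write each record as a 4-byte length followed by its UTF-8 payload.
    public static byte[] writeAll(List<String> records) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String record : records) {
                byte[] payload = record.getBytes(StandardCharsets.UTF_8);
                out.writeInt(payload.length);
                out.write(payload);
            }
        }
        return bytes.toByteArray();
    }

    // Read records from the given byte offset until the data ends.
    // This is only meaningful when the offset is a real record boundary.
    public static List<String> readFrom(byte[] data, int offset) throws IOException {
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(
                new ByteArrayInputStream(data, offset, data.length - offset))) {
            while (in.available() > 0) {
                int length = in.readInt();
                byte[] payload = new byte[length];
                in.readFully(payload);
                result.add(new String(payload, StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = writeAll(Arrays.asList("alpha", "beta", "gamma"));

        // Reading from the start recovers every record.
        System.out.println(readFrom(data, 0));

        // Offset 9 = 4 (length prefix) + 5 ("alpha") happens to be a boundary,
        // but a reader can only know that by having read the first record.
        System.out.println(readFrom(data, 9));

        // An arbitrary offset lands mid-record: the bytes there are misread
        // as a length, and the read fails.
        try {
            readFrom(data, 2);
        } catch (IOException e) {
            System.out.println("Reading from offset 2 failed: " + e);
        }
    }
}
```

A format meant for parallel reads needs extra structure, for example fixed-size blocks that each carry their own metadata, so that every split can locate its first record without scanning from the start of the file.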
