I don't know your use case.
The InputFormat interface is very flexible. Directories can be read recursively, and a file can contain one or more objects. You can also write a smarter InputFormat that puts multiple (small) files into one split...
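As an illustration of the "put multiple small files into one split" idea, here is a minimal plain-Java sketch. It is not Flink's API: the class `SmallFileGrouper`, the `FileEntry` record and the greedy packing strategy are all hypothetical. It groups files, in order, until a target number of bytes per group is reached, and each group would then become one input split.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink): greedily packs small files,
// given by name and size in bytes, into groups whose total size stays at
// or below a target. Each group would become one input split.
final class SmallFileGrouper {

    record FileEntry(String name, long size) {}

    static List<List<FileEntry>> group(List<FileEntry> files, long targetBytes) {
        List<List<FileEntry>> groups = new ArrayList<>();
        List<FileEntry> current = new ArrayList<>();
        long currentBytes = 0;
        for (FileEntry f : files) {
            // Close the current group if this file would push it over the target.
            // A file larger than the target still ends up in a group of its own.
            if (!current.isEmpty() && currentBytes + f.size() > targetBytes) {
                groups.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(f);
            currentBytes += f.size();
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
}
```

With files of 7, 10, 5 and 20 bytes and a 20-byte target, this produces three groups: the first two files together, then the 5-byte file, then the 20-byte file.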

It is up to your use case what you need to implement.

2015-08-07 Flavio Pompermaier:
Would this also be the case when just recursively reading an entire directory that contains one object per file?

Fabian Hueske:
You could implement your own InputFormat based on FileInputFormat and override the createInputSplits method so that it creates just a single split per file.
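The split logic of such a "one split per file" format can be sketched in plain Java, independent of Flink. The `OneSplitPerFile` class and its `Split` record are hypothetical stand-ins for what an overridden createInputSplits would return; the point is that every file is enumerated recursively and covered by exactly one split from offset 0 to its full length, so no reader ever starts in the middle of a serialized object.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical sketch in plain Java (not Flink's API): recursively enumerate
// a directory and emit exactly one split per regular file.
final class OneSplitPerFile {

    // A split is just a file plus the byte range a reader should consume.
    record Split(int index, Path file, long start, long length) {}

    static List<Split> createSplits(Path root) throws IOException {
        List<Split> splits = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            // Sort so that split numbering is deterministic.
            List<Path> files = paths.filter(Files::isRegularFile).sorted().toList();
            for (Path file : files) {
                // Never cut a file: start at 0 and cover its whole length.
                splits.add(new Split(splits.size(), file, 0, Files.size(file)));
            }
        }
        return splits;
    }
}
```

Because each split spans a whole file, a reader for one split can simply deserialize the single object that the file contains.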

2015-08-07 Flavio Pompermaier:
So what should I do?

Fabian Hueske:
Ah, I checked the code.

The BinaryInputFormat expects metadata that is written by the BinaryOutputFormat.
So you cannot use the BinaryInputFormat to read a file that does not provide this metadata.
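To illustrate why a reader that relies on writer-side metadata cannot handle arbitrary files, here is a toy format in plain Java. It is hypothetical and does not reproduce Flink's actual on-disk layout: `TrailerFormat` writes length-prefixed records followed by an 8-byte trailer with the record count, and its reader looks for that trailer at the end of the file. A file written directly with Kryo has no such trailer.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// Toy format (hypothetical, not Flink's layout): length-prefixed records
// followed by an 8-byte trailer holding the record count. Only data
// produced by write() carries that trailer.
final class TrailerFormat {

    static final int TRAILER_SIZE = Long.BYTES;

    static byte[] write(byte[][] records) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        for (byte[] r : records) {
            out.writeInt(r.length);
            out.write(r);
        }
        out.writeLong(records.length); // the metadata the reader depends on
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the record count from the trailer. Data not produced by write()
    // either has no room for a trailer (rejected here) or yields garbage.
    static long readRecordCount(byte[] file) {
        long trailerPos = (long) file.length - TRAILER_SIZE;
        if (trailerPos < 0) {
            throw new IllegalArgumentException(
                "file (" + file.length + " bytes) is shorter than the trailer");
        }
        return ByteBuffer.wrap(file, (int) trailerPos, TRAILER_SIZE).getLong();
    }
}
```

For example, a 7-byte file is too short to even hold the 8-byte trailer of this toy format, so the reader has nothing valid to seek to.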

2015-08-07 Flavio Pompermaier:
The file containing the serialized object is 7 bytes long.

Fabian Hueske:
This might be an issue with the blockSize parameter of the BinaryInputFormat.
How large is the file with the single object?
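The interaction between file size, block size and parallelism can be sketched as simple arithmetic. This is a hypothetical model, not Flink's code: `BlockSplits` creates one split per block of `blockSize` bytes and, as an assumption that mirrors the "8 input splits, of which only one is non-empty" observation in this thread, pads the list with empty splits until the requested number of splits is reached.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one split per block of blockSize bytes, padded with
// empty splits until minNumSplits (e.g. the parallelism) is reached.
final class BlockSplits {

    record Split(long start, long length) {}

    static List<Split> create(long fileLength, long blockSize, int minNumSplits) {
        List<Split> splits = new ArrayList<>();
        // Ceiling division: a 7-byte file with a 64 MiB block size is 1 block.
        long numBlocks = (fileLength + blockSize - 1) / blockSize;
        for (long b = 0; b < numBlocks; b++) {
            long start = b * blockSize;
            splits.add(new Split(start, Math.min(blockSize, fileLength - start)));
        }
        // The remaining parallel instances get nothing to read.
        while (splits.size() < minNumSplits) {
            splits.add(new Split(fileLength, 0));
        }
        return splits;
    }
}
```

With a 7-byte file, a 64 MiB block size and 8 requested splits, this model yields eight splits of which only the first (7 bytes) is non-empty.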

2015-08-07 Flavio Pompermaier:
I also tried with

DataSet<RowBundle> ds = env.createInput(inputFormat).setParallelism(1);

but I get the same error :(

Moreover, in this example I put exactly one object per file, so it should be able to deserialize it, right?

Fabian Hueske:
If you create your file by just sequentially writing all objects to the file using Kryo, you can only read it with a parallelism of 1.
Writing binary files in a way that they can be read in parallel is a bit tricky (and not specific to Flink).
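One general technique for making a binary file splittable (not specific to Flink, and not necessarily what BinaryOutputFormat does) is to write fixed-size blocks and never let a record cross a block boundary, padding the unused tail of a block instead. Any reader can then start at a block offset and land on a record boundary. In this hypothetical `BlockAlignedFile` sketch, records are length-prefixed byte arrays standing in for Kryo-serialized objects, and a length of 0 is an assumed "end of block" marker.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class BlockAlignedFile {

    // Writes length-prefixed records into blocks of blockSize bytes. A record
    // that does not fit into the current block starts the next one; the
    // unused tail is zero-filled, so a zero length prefix means "block ends".
    static byte[] write(List<byte[]> records, int blockSize) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int used = 0; // bytes used in the current block
        for (byte[] r : records) {
            int needed = Integer.BYTES + r.length;
            if (r.length == 0 || needed > blockSize) {
                throw new IllegalArgumentException("record must be non-empty and fit in one block");
            }
            if (used + needed > blockSize) {
                out.write(new byte[blockSize - used], 0, blockSize - used); // pad
                used = 0;
            }
            out.write(ByteBuffer.allocate(Integer.BYTES).putInt(r.length).array(), 0, Integer.BYTES);
            out.write(r, 0, r.length);
            used += needed;
        }
        if (used > 0) {
            out.write(new byte[blockSize - used], 0, blockSize - used); // pad last block
        }
        return out.toByteArray();
    }

    // Reads only block number `block`, independently of all other blocks,
    // which is what lets several parallel readers share one file.
    static List<byte[]> readBlock(byte[] file, int blockSize, int block) {
        List<byte[]> records = new ArrayList<>();
        ByteBuffer buf = ByteBuffer.wrap(file, block * blockSize, blockSize);
        while (buf.remaining() >= Integer.BYTES) {
            int len = buf.getInt();
            if (len == 0) {
                break; // padding: no more records in this block
            }
            byte[] r = new byte[len];
            buf.get(r);
            records.add(r);
        }
        return records;
    }
}
```

A file written sequentially without such alignment gives a reader that starts mid-file no way to find the next record boundary, which is why it can only be read with a parallelism of 1.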

2015-08-07 Flavio Pompermaier:
Hi to all,
I'm trying to read a file serialized with Kryo, but I get this exception (because createInputSplits creates 8 input splits, of which only one is non-empty):

Caused by: java.io.IOException: Invalid argument
at sun.nio.ch.FileChannelImpl.position0(Native Method)
at sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:285)
at org.apache.flink.core.fs.local.LocalDataInputStream.seek(LocalDataInputStream.java:57)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:257)
at org.apache.flink.api.common.io.BinaryInputFormat.open(BinaryInputFormat.java:46)
at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:151)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)

My program is basically the following:

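import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.flink.api.common.io.BinaryInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.TypeSerializerInputFormat;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.configuration.Configuration;

// MyClass, myObj and LOG are defined elsewhere in the program
public static void main(String[] args) throws Exception {

    // try-with-resources used to autoclose resources
    try (Output output = new Output(new FileOutputStream("/tmp/KryoTest.ser"))) {
        // serialise object
        Kryo kryo = new Kryo();
        kryo.writeClassAndObject(output, myObj);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    // deserialise object
    try (Input input = new Input(new FileInputStream("/tmp/KryoTest.ser"))) {
        Kryo kryo = new Kryo();
        myObj = (MyClass) kryo.readClassAndObject(input);
    } catch (FileNotFoundException ex) {
        LOG.error(ex.getMessage(), ex);
    }

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.registerTypeWithKryoSerializer(MyClass.class, MyClassSerializer.class);
    Configuration configuration = new Configuration();
    configuration.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64 * 1024 * 1024);

    TypeInformation<MyClass> typeInfo = new GenericTypeInfo<>(MyClass.class);
    final BinaryInputFormat<MyClass> inputFormat = new TypeSerializerInputFormat<>(typeInfo);

    DataSet<MyClass> ds = env.createInput(inputFormat);
}

private static final class MyClassSerializer extends Serializer<MyClass> {

    @Override
    public void write(Kryo kryo, Output output, MyClass object) {
        kryo.writeClassAndObject(output, object);
    }

    @Override
    public MyClass read(Kryo kryo, Input input, Class<MyClass> type) {
        return (MyClass) kryo.readClassAndObject(input);
    }
}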
Am I doing something wrong?