flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: EOF reading file from HDFS
Date Fri, 08 May 2015 15:40:56 GMT
Have you tried explicitly setting the block size parameter when writing and
reading?
The default value might be different for the local FS and for HDFS.
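
To illustrate how a block-size mismatch can end in an EOFException, here is a toy model in plain Java. It is a simplified sketch, not Flink's actual BinaryOutputFormat/BinaryInputFormat code, and BlockFormatDemo, write() and read() are hypothetical names. The writer packs length-prefixed records into fixed-size blocks and stores each block's record count in the block's last 4 bytes; the reader locates those counts using its own block size, so if it assumes a different size than the writer used, it reads a count from the wrong offset and then tries to read bytes that aren't there.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Toy block-based binary format (hypothetical, NOT Flink's implementation):
 * records are length-prefixed and packed into fixed-size blocks; the last
 * 4 bytes of every block hold the number of records stored in that block.
 */
public class BlockFormatDemo {
    static final int TRAILER = 4; // bytes reserved at the end of each block for the record count

    static byte[] write(List<String> records, int blockSize) throws IOException {
        ByteArrayOutputStream file = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(file);
        int used = 0, count = 0; // payload bytes and records in the current block
        for (String r : records) {
            byte[] b = r.getBytes(StandardCharsets.UTF_8);
            int need = 4 + b.length; // int length prefix + payload
            if (need > blockSize - TRAILER) throw new IllegalArgumentException("record too large");
            if (used + need > blockSize - TRAILER) { // record doesn't fit: close the current block
                closeBlock(out, blockSize, used, count);
                used = 0;
                count = 0;
            }
            out.writeInt(b.length);
            out.write(b);
            used += need;
            count++;
        }
        closeBlock(out, blockSize, used, count);
        out.flush();
        return file.toByteArray();
    }

    // Pad the rest of the block with zeros and write the record count into its last 4 bytes.
    static void closeBlock(DataOutputStream out, int blockSize, int used, int count) throws IOException {
        out.write(new byte[blockSize - TRAILER - used]);
        out.writeInt(count);
    }

    static List<String> read(byte[] data, int blockSize) throws IOException {
        List<String> result = new ArrayList<>();
        for (int start = 0; start < data.length; start += blockSize) {
            if (start + blockSize > data.length) throw new EOFException("truncated block at " + start);
            // The trailer is located with the READER's block size, which may not be the writer's.
            DataInputStream trailer = new DataInputStream(
                    new ByteArrayInputStream(data, start + blockSize - TRAILER, TRAILER));
            int count = trailer.readInt();
            DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(data, start, blockSize - TRAILER));
            for (int i = 0; i < count; i++) {
                int len = in.readInt(); // throws EOFException if the block payload is exhausted
                if (len < 0 || len > in.available()) throw new EOFException("corrupt record length " + len);
                byte[] b = new byte[len];
                in.readFully(b);
                result.add(new String(b, StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<String> records = Arrays.asList("alpha", "beta", "gamma", "delta", "epsilon");
        byte[] data = write(records, 32);    // written with block size 32
        System.out.println(read(data, 32));  // same block size: every record comes back
        System.out.println(read(data, 64));  // wrong trailer position: records silently lost
        try {
            read(data, 48);                  // wrong trailer position: garbage lengths
        } catch (EOFException e) {
            System.out.println("EOFException: " + e.getMessage());
        }
    }
}
```

With the sample data written using a block size of 32, reading with 32 returns all five records, reading with 64 silently returns only the first two, and reading with 48 fails with an EOFException. If the block size really is the cause here, the fix would be to pass the same explicit value to both formats before calling configure(), e.g. conf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, size) when writing and conf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, size) when reading (please double-check these keys against your Flink version).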

2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> Hi all,
>
> I created a dataset of Tuple2<String, byte[]> and saved it to my local
> FS (a folder with 8 files, because I ran the program with parallelism 8)
> using the following code:
>
> Configuration configuration = new Configuration();
> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>     new TypeSerializerOutputFormat<>();
> outputFormat.setOutputFilePath(new Path(targetDir));
> outputFormat.setWriteMode(WriteMode.OVERWRITE);
> outputFormat.configure(configuration);
> ds.output(outputFormat);
>
> If I then read that folder from the local FS, everything is fine, but if I
> read it from HDFS I get the following exception:
>
> java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:197)
> at java.io.DataInputStream.readFully(DataInputStream.java:169)
> at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
> at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>
> Could you help me understand what's going on?
>
> The code I use to read the serialized dataset is:
>
> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
>     BasicTypeInfo.STRING_TYPE_INFO,
>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>     new TypeSerializerInputFormat<>(tInfo);
> inputFormat.setFilePath(new Path(inputDir));
> inputFormat.configure(conf);
> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>     env.createInput(inputFormat).flatMap(XXX);
>
>
> Best,
> Flavio
>
