flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: EOF reading file from HDFS
Date Fri, 08 May 2015 16:02:49 GMT
The value of the parameter is not important for correctness but it must be
the same when writing and reading.
Try setting it to 64 MB.
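A minimal sketch of what that could look like. The configuration key names
("output.block_size", "input.block_size") are assumptions based on the
block-size parameters of Flink's BinaryOutputFormat/BinaryInputFormat, which
TypeSerializerOutputFormat/TypeSerializerInputFormat extend; check the
constants in your Flink version before relying on them:

```java
import org.apache.flink.configuration.Configuration;

public class BlockSizeExample {
    public static void main(String[] args) {
        // 64 MB, as suggested above. The exact value is not important for
        // correctness, but it must be identical on the write and read side.
        final long blockSize = 64L * 1024 * 1024; // 67108864 bytes

        // Writing side: pass the block size to outputFormat.configure(...).
        // Key name is an assumption (BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY).
        Configuration writeConf = new Configuration();
        writeConf.setLong("output.block_size", blockSize);

        // Reading side: pass the same value to inputFormat.configure(...).
        // Key name is an assumption (BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY).
        Configuration readConf = new Configuration();
        readConf.setLong("input.block_size", blockSize);
    }
}
```

The point is that BinaryInputFormat splits the file at block boundaries; if
the reader assumes a different block size than the one used when writing, a
record can appear to be cut off at a block boundary, which would explain the
EOFException during deserialization.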


2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:

> How can I retrieve the right one? If I write with a block size different
> from the one of HDFS, can I still read it back afterwards?
>
> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Have you tried to explicitly set the blocksize parameter when writing and
>> reading?
>> The default value might be different when reading from local FS and HDFS.
>>
>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> Hi to all,
>>>
>>> I've created a dataset of Tuple2<String,byte[]> and I saved it on my
>>> local fs (a folder with 8 files because I run the program with parallelism
>>> 8) with the following code:
>>>
>>> Configuration configuration = new Configuration();
>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new TypeSerializerOutputFormat<>();
>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>> outputFormat.configure(configuration);
>>> ds.output(outputFormat);
>>>
>>> Then, if I read such a folder from the local fs everything is fine,
>>> otherwise if I read it from HDFS I get the following exception:
>>>
>>> java.io.EOFException
>>> at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>> at
>>> org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>> at
>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>> at
>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>> at
>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>> at
>>> org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>> at
>>> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>> at
>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>
>>> Could you help in understanding what's going on?
>>>
>>> The code I use to read the serialized ds is:
>>>
>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new
>>> TupleTypeInfo<Tuple2<String, byte[]>>(
>>> BasicTypeInfo.STRING_TYPE_INFO,
>>> PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new
>>> TypeSerializerInputFormat<>(tInfo);
>>> inputFormat.setFilePath(new Path(inputDir));
>>> inputFormat.configure(conf);
>>> DataSet<Tuple6<String, String, String, String, String, String>> ret = env.createInput(inputFormat).flatMap(XXX);
>>>
>>>
>>> Best,
>>> Flavio
>>>
>>
>>
>
>
>
