flink-user mailing list archives

From Flavio Pompermaier <pomperma...@okkam.it>
Subject Re: EOF reading file from HDFS
Date Fri, 08 May 2015 16:07:02 GMT
If I haven't set that parameter in the code of the job, do you think Flink
automatically infers it from somewhere in the Hadoop xxx-site.xml files or
from the Hadoop cluster?

On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> The value of the parameter is not important for correctness but it must be
> the same when writing and reading.
> Try setting it to 64 MB.
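> For illustration, a minimal sketch of what that could look like, assuming the
> public BLOCK_SIZE_PARAMETER_KEY constants on BinaryInputFormat and
> BinaryOutputFormat (the parents of TypeSerializerInputFormat and
> TypeSerializerOutputFormat); please double-check the key names against your
> Flink version:
>
> long blockSize = 64L * 1024 * 1024; // 64 MB; must be identical on write and read
> Configuration conf = new Configuration();
> conf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize); // write side
> conf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);  // read side
> outputFormat.configure(conf); // before ds.output(outputFormat)
> inputFormat.configure(conf);  // before env.createInput(inputFormat)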
>
>
> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>
>> How can I retrieve the right one? If I write with a block size different
>> from the HDFS one, can I still read it back then?
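>> (A hedged sketch of asking HDFS for its default block size through the
>> Hadoop client API, assuming the cluster's *-site.xml files are on the
>> classpath; fully-qualified names are used here to avoid clashing with
>> Flink's own Configuration/Path:
>>
>> org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
>> org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf);
>> long hdfsBlockSize = fs.getDefaultBlockSize(new org.apache.hadoop.fs.Path(inputDir)); // e.g. 128 MB
>> )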
>>
>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Have you tried to explicitly set the blocksize parameter when writing
>>> and reading?
>>> The default value might be different when reading from local FS and HDFS.
>>>
>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>
>>>> Hi to all,
>>>>
>>>> I've created a dataset of Tuple2<String,byte[]> and saved it on my
>>>> local fs (a folder with 8 files, because I ran the program with
>>>> parallelism 8) using the following code:
>>>>
>>>> Configuration configuration = new Configuration();
>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat = new TypeSerializerOutputFormat<>();
>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>> outputFormat.configure(configuration);
>>>> ds.output(outputFormat);
>>>>
>>>> Then, if I read such a folder from the local fs everything is fine,
>>>> whereas if I read it from HDFS I get the following exception:
>>>>
>>>> java.io.EOFException
>>>> at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>> at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>> at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>> at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>
>>>> Could you help me understand what's going on?
>>>>
>>>> The code I use to read the serialized ds is:
>>>>
>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>     BasicTypeInfo.STRING_TYPE_INFO,
>>>>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat = new TypeSerializerInputFormat<>(tInfo);
>>>> inputFormat.setFilePath(new Path(inputDir));
>>>> inputFormat.configure(conf);
>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret = env.createInput(inputFormat).flatMap(XXX);
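>>>>
>>>> (Note: the conf variable above is assumed to be a Flink Configuration
>>>> created earlier; a hedged sketch of carrying the block size discussed in
>>>> this thread through it, with the key constant an assumption to verify:
>>>>
>>>> Configuration conf = new Configuration();
>>>> conf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, 64L * 1024 * 1024); // must match the write side
>>>> )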
>>>>
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>
>>>
>>
>>
>>
>
