flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: EOF reading file from HDFS
Date Mon, 11 May 2015 19:35:28 GMT
Working around dependency mismatches is a bit tricky in such large systems
that have many (transitive) dependencies. You are probably colliding with
the Thrift version packaged by Hadoop (which in turn is packaged by Flink).

You can fix that for now by compiling your own version of Flink after
setting the Thrift version to 0.9.2 in the root POM file. As a quick
workaround, you can try to simply include Thrift 0.9.2 in your user jar
(and that way override the Thrift classes for user code).

If Thrift 0.9.2 is backwards compatible, we can bump the Flink-internal
version of Thrift and fix this for the future. If not, we may have to shade
it away like some other dependencies...
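
To see which Thrift jar actually wins on your task classpath, a quick
(untested) check along these lines can help; it only uses the standard
classloader API and prints where the loaded TUnion class comes from:

// Untested sketch: print which jar org.apache.thrift.TUnion is loaded from.
// If it points to a Flink/Hadoop jar rather than your own, the old 0.6.1
// classes are shadowing the 0.9.2 ones from your user code.
// (getCodeSource() can be null for bootstrap classes, but not for Thrift.)
public class ThriftVersionCheck {
    public static void main(String[] args) throws Exception {
        Class<?> tUnion = Class.forName("org.apache.thrift.TUnion");
        java.net.URL source = tUnion.getProtectionDomain().getCodeSource().getLocation();
        System.out.println("TUnion loaded from: " + source);
    }
}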


On Fri, May 8, 2015 at 7:08 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> I think you are confusing the block sizes of the local FS, HDFS, and the
> BinaryIF/OF. The block size of the local FS is not relevant here. In fact,
> 4 KB is much too small: each block will result in one split, so a 4 KB block
> size will create thousands of splits, causing a lot of scheduling overhead.
>
> The HDFS block size should be a multiple of the BinaryIF/OF block size to
> avoid data access over the network.
>
> I would set the default block size to 32 or 64 MB, given that multiples of
> 64 MB are common values for HDFS block sizes.
> I came up with this solution; do you think it could be OK (taking into
> account that my local fs block size is 4096):
>
> blockSize = new org.apache.hadoop.conf.Configuration()
>     .getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
>
> This way, if the Hadoop config files are found in the resources folder I
> use that blockSize, otherwise 4096.
> It also means that if I run the job locally, I'll have a consistent setting
> whether the Hadoop config files are there or not.
>
> Now I have another problem: the byte[] of the Tuple2 is written using
> Thrift 0.9.2, but the version in the Flink dist is 0.6.1, and I think this
> is the cause of errors like
>
>  java.lang.AbstractMethodError:
> org.apache.thrift.TUnion.readValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
> at org.apache.thrift.TUnion.read(TUnion.java:135)
> at org.apache.jena.riot.thrift.BinRDF.apply(BinRDF.java:187)
> at org.apache.jena.riot.thrift.BinRDF.applyVisitor(BinRDF.java:176)
> at org.apache.jena.riot.thrift.BinRDF.protocolToStream(BinRDF.java:164)
> at org.apache.jena.riot.thrift.BinRDF.inputStreamToStream(BinRDF.java:149)
>
> What is the best way to fix such version mismatching problems?
>
> On Fri, May 8, 2015 at 6:14 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> The point is that you don't want Flink to automatically infer the
>> parameter, because the default depends on the file system.
>> If you write a file to the local FS, it is written with a different default
>> parameter than if you wrote to HDFS.
>> Just set the parameter to the same value, e.g. 64 MB, for both reading and
>> writing.
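>>
>> A minimal sketch of what I mean (untested; I'm assuming the
>> BLOCK_SIZE_PARAMETER_KEY constants that BinaryOutputFormat /
>> BinaryInputFormat read in configure() are the right knobs in your Flink
>> version):
>>
>> long blockSize = 64 * 1024 * 1024; // 64 MB, same value for write and read
>>
>> Configuration writeConf = new Configuration();
>> writeConf.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
>> outputFormat.configure(writeConf);  // your TypeSerializerOutputFormat
>>
>> Configuration readConf = new Configuration();
>> readConf.setLong(BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY, blockSize);
>> inputFormat.configure(readConf);    // your TypeSerializerInputFormat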
>>
>> 2015-05-08 18:07 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>
>>> If I haven't set that param in the code of the job, do you think Flink
>>> automatically infers it from somewhere in the Hadoop xxx-site.xml files
>>> or from the Hadoop cluster?
>>>
>>> On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>
>>>> The value of the parameter is not important for correctness but it must
>>>> be the same when writing and reading.
>>>> Try setting it to 64 MB.
>>>>
>>>>
>>>> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>
>>>>> How can I retrieve the right one? If I write with a block size
>>>>> different from the one of HDFS, can I still read it afterwards?
>>>>>
>>>>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <fhueske@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Have you tried to explicitly set the blocksize parameter when writing
>>>>>> and reading?
>>>>>> The default value might be different when reading from local FS and
>>>>>> HDFS.
>>>>>>
>>>>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <pompermaier@okkam.it>:
>>>>>>
>>>>>>> Hi to all,
>>>>>>>
>>>>>>> I've created a dataset of Tuple2<String,byte[]> and I saved it on my
>>>>>>> local fs (a folder with 8 files, because I run the program with
>>>>>>> parallelism 8) with the following code:
>>>>>>>
>>>>>>> Configuration configuration = new Configuration();
>>>>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>>>>>>>     new TypeSerializerOutputFormat<>();
>>>>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>>>>> outputFormat.configure(configuration);
>>>>>>> ds.output(outputFormat);
>>>>>>>
>>>>>>> Then, if I read such a folder from the local fs everything is fine,
>>>>>>> otherwise if I read it from HDFS I get the following exception:
>>>>>>>
>>>>>>> java.io.EOFException
>>>>>>> at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>>>>> at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>>> at
>>>>>>> org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>>>>> at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>>> at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>> at
>>>>>>> org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>>>>> at
>>>>>>> org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>>>>
>>>>>>> Could you help in understanding what's going on?
>>>>>>>
>>>>>>> The code I use to read the serialized ds is:
>>>>>>>
>>>>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo =
>>>>>>>     new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>>>>         BasicTypeInfo.STRING_TYPE_INFO,
>>>>>>>         PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>>>>>>>     new TypeSerializerInputFormat<>(tInfo);
>>>>>>> inputFormat.setFilePath(new Path(inputDir));
>>>>>>> inputFormat.configure(conf);
>>>>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>>>>>     env.createInput(inputFormat).flatMap(XXX);
>>>>>>>
>>>>>>>
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
