flink-user mailing list archives

From Lukas Kircher <lukas.kirc...@uni-konstanz.de>
Subject Re: Problems reading Parquet input from HDFS
Date Tue, 02 May 2017 06:31:43 GMT
Hi Flavio,

thanks for your help. With Flink 1.2.0 and avro 1.8.1 it works fine for me too as long as
I run it from the IDE. As soon as I submit it as a job to the cluster I get the described
dependency issues.

* If I use the Flink 1.2.0 binary and just add Flink as a Maven dependency to my project, I
get a NoSuchMethodError, as the avro functionality I use is not present in avro 1.7.7, which
Flink 1.2.0 relies on. Since I run against the Flink binary, this is only detected at runtime:
locally (in the IDE) Maven resolves avro 1.8.1, but the binary knows nothing about this.
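To confirm which avro version Maven actually resolves for the project (versus what the Flink binary bundles), the dependency tree can be filtered to avro alone; this is just standard Maven tooling, nothing Flink-specific:

```shell
# List only the avro artifacts in the resolved dependency tree;
# -Dverbose also shows the versions Maven omitted due to conflicts.
mvn dependency:tree -Dincludes=org.apache.avro:avro -Dverbose
```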

* If I instead base my complete project on the Flink source code and build Flink myself (rather
than using the binary), I run into several compilation errors from the start. If I try to bump
the avro dependency in Flink to a newer version, several issues arise, e.g. in flink-core, that I
cannot resolve on my own. This also means that with avro 1.8.1 there are incompatibilities in the
code at runtime, but they only surface once the affected code is accessed.
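One way around rebuilding Flink (a sketch only, untested against this particular setup, and assuming the job is packaged as a fat jar) is to bundle and relocate avro 1.8.1 inside the job jar with the maven-shade-plugin, so the user code never resolves to the avro 1.7.7 classes shipped with the Flink binary:

```xml
<!-- Sketch: shade and relocate avro into the job jar so it cannot
     clash with the avro 1.7.7 bundled with the Flink 1.2.0 binary. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.4.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>org.apache.avro</pattern>
            <shadedPattern>shaded.org.apache.avro</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```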

At the moment I am considering switching to e.g. Protobuf, but I'm not yet sure whether this
solves my issue.
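Before switching serialization frameworks, one avro-free route in the direction Jörn suggested below is the plain ParquetInputFormat combined with the GroupReadSupport that ships in parquet's example package. A sketch (untested; class and package names assumed as of parquet 1.9.0) might look like:

```java
// Sketch: read parquet records without avro by registering parquet's
// bundled GroupReadSupport. Records come back as the generic Group
// type, so fields are accessed by name/index instead of through a
// generated Customer class.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class ParquetWithoutAvro {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Job job = Job.getInstance();
        // Registering a ReadSupport explicitly avoids the
        // NullPointerException from an unconfigured ParquetInputFormat.
        ParquetInputFormat.setReadSupportClass(job, GroupReadSupport.class);
        FileInputFormat.addInputPath(job, new Path("/tmp/tpchinput/01/customer_parquet"));

        HadoopInputFormat<Void, Group> hif = new HadoopInputFormat<>(
            new ParquetInputFormat<Group>(), Void.class, Group.class, job);

        DataSet<Tuple2<Void, Group>> dataset = env.createInput(hif);
        dataset.print();
    }
}
```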

Best,
Lukas


> On 28 Apr 2017, at 18:38, Flavio Pompermaier <pompermaier@okkam.it> wrote:
> 
> Hi Lukas,
> a colleague of mine issued a PR to update the code of that example (https://github.com/FelixNeutatz/parquet-flinktacular/pull/2).
> 
> We updated avro to 1.8.1 and the example worked fine (we didn't test on the cluster yet).
> Let me know if it does help.
> 
> Best,
> Flavio
> 
> On Tue, Apr 25, 2017 at 1:53 PM, Lukas Kircher <lukas.kircher@uni-konstanz.de> wrote:
> Thanks for your suggestions.
> 
> @Flavio
> This is very similar to the code I use and yields basically the same problems. The examples
> are based on flink-1.0-SNAPSHOT and avro-1.7.6, which is more than three years old. Do you
> have a working setup with newer versions of avro and Flink?
> 
> @Jörn
> I tried to do that but I can't see how to get around the AvroParquetInputFormat (see
> below). I can pass a schema for projection as a string, but then I get a NullPointerException
> as there is no ReadSupport class available in ParquetInputFormat. There is a constructor to
> instantiate ParquetInputFormat with a class that extends ReadSupport, but I haven't found a
> suitable one to pass to it. Do you know of a way around this?
> 
> 
>   public static void main(String[] args) throws Exception {
>       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>       Job job = Job.getInstance();
>       HadoopInputFormat<Void, Customer> hif = new HadoopInputFormat<>(
>           new ParquetInputFormat(), Void.class, Customer.class, job);
>       FileInputFormat.addInputPath((JobConf) job.getConfiguration(),
>           new org.apache.hadoop.fs.Path("/tmp/tpchinput/01/customer_parquet"));
>       job.getConfiguration().set("parquet.avro.projection",
>           "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":[{\"name\":\"c_custkey\",\"type\":\"int\"}]}");
>       env.createInput(hif).print();
>   }
> 
> 
> I am pretty sure I am missing something very basic. Let me know if you need any additional
> information.
> 
> Thanks ...
> 
> 
> 
>> On 24 Apr 2017, at 20:51, Flavio Pompermaier <pompermaier@okkam.it> wrote:
>> 
>> I started from this guide:
>> 
>> https://github.com/FelixNeutatz/parquet-flinktacular
>> 
>> Best,
>> Flavio 
>> 
>> On 24 Apr 2017 6:36 pm, "Jörn Franke" <jornfranke@gmail.com> wrote:
>> Why not use a parquet-only format? Not sure why you need an AvroParquetInputFormat.
>> 
>> On 24. Apr 2017, at 18:19, Lukas Kircher <lukas.kircher@uni-konstanz.de> wrote:
>> 
>>> Hello,
>>> 
>>> I am trying to read Parquet files from HDFS and am having problems. I use Avro for the
>>> schema. Here is a basic example:
>>> 
>>> public static void main(String[] args) throws Exception {
>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> 
>>>     Job job = Job.getInstance();
>>>     HadoopInputFormat<Void, Customer> hif = new HadoopInputFormat<>(
>>>         new AvroParquetInputFormat(), Void.class, Customer.class, job);
>>>     FileInputFormat.addInputPath((JobConf) job.getConfiguration(),
>>>         new org.apache.hadoop.fs.Path("/tmp/tpchinput/01/customer_parquet"));
>>>     Schema projection = Schema.createRecord(Customer.class.getSimpleName(), null, null, false);
>>>     List<Schema.Field> fields = Arrays.asList(
>>>         new Schema.Field("c_custkey", Schema.create(Schema.Type.INT), null, (Object) null)
>>>     );
>>>     projection.setFields(fields);
>>>     AvroParquetInputFormat.setRequestedProjection(job, projection);
>>> 
>>>     DataSet<Tuple2<Void, Customer>> dataset = env.createInput(hif);
>>>     dataset.print();
>>> }
>>> If I submit this to the job manager I get the following stack trace:
>>> 
>>> java.lang.NoSuchMethodError: org.apache.avro.Schema$Field.<init>(Ljava/lang/String;Lorg/apache/avro/Schema;Ljava/lang/String;Ljava/lang/Object;)V
>>> 	at misc.Misc.main(Misc.java:29)
>>> 
>>> The problem is that I use the parquet-avro dependency (which provides AvroParquetInputFormat)
>>> in version 1.9.0, which relies on avro 1.8.0. flink-core itself relies on avro 1.7.7. Jfyi,
>>> the dependency tree looks like this:
>>> 
>>> [INFO] --- maven-dependency-plugin:2.8:tree (default-cli) @ flink-experiments ---
>>> [INFO] ...:1.0-SNAPSHOT
>>> [INFO] +- org.apache.flink:flink-java:jar:1.2.0:compile
>>> [INFO] |  +- org.apache.flink:flink-core:jar:1.2.0:compile
>>> [INFO] |  |  \- (org.apache.avro:avro:jar:1.7.7:compile - omitted for conflict with 1.8.0)
>>> [INFO] |  \- org.apache.flink:flink-shaded-hadoop2:jar:1.2.0:compile
>>> [INFO] |     \- (org.apache.avro:avro:jar:1.7.7:compile - omitted for duplicate)
>>> [INFO] \- org.apache.parquet:parquet-avro:jar:1.9.0:compile
>>> [INFO]    \- org.apache.avro:avro:jar:1.8.0:compile
>>> 
>>> Fixing the above NoSuchMethodError just leads to further problems. Downgrading
>>> parquet-avro to an older version creates other conflicts, as there is no version that uses
>>> avro 1.7.7 like Flink does.
>>> 
>>> Is there a way around this, or can you point me to another approach to read Parquet
>>> data from HDFS? How do you normally go about this?
>>> 
>>> Thanks for your help,
>>> Lukas
>>> 
>>> 
>>> 
> 
> 

