flink-user mailing list archives

From: Ufuk Celebi <...@apache.org>
Subject: Re: Flink 1.1.0 : Hadoop 1/2 compatibility mismatch
Date: Tue, 09 Aug 2016 12:11:22 GMT
As noted in the other thread, this is a problem with the Maven
artifacts of 1.1.0 :-( I've added a warning to the release notes and
will start an emergency vote for 1.1.1, which only updates the Maven
artifacts.
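
In the meantime, a quick way to see which Hadoop flavour actually ends up
on your classpath is a small reflection probe like the one below (a minimal
sketch, not part of Flink): in Hadoop 1.x org.apache.hadoop.mapreduce.JobContext
is a class, in Hadoop 2.x it is an interface, which is exactly what the
IncompatibleClassChangeError below complains about.

    // Minimal diagnostic sketch (assumption: run with the same classpath as the
    // failing job). JobContext is a class in Hadoop 1.x and an interface in
    // Hadoop 2.x, so code compiled against one and run against the other fails
    // with IncompatibleClassChangeError.
    public class JobContextCheck {
        public static void main(String[] args) throws ClassNotFoundException {
            Class<?> jobContext = Class.forName("org.apache.hadoop.mapreduce.JobContext");
            System.out.println(jobContext.isInterface()
                ? "Hadoop 2.x JobContext (interface) is on the classpath"
                : "Hadoop 1.x JobContext (class) is on the classpath");
        }
    }

If this reports the Hadoop 1.x class even though only the 2.7.1 clients are
declared, the Hadoop 1 classes are being pulled in by the 1.1.0 artifacts
themselves.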

On Tue, Aug 9, 2016 at 9:45 AM, LINZ, Arnaud <ALINZ@bouyguestelecom.fr> wrote:
> Hello,
>
> I've switched to 1.1.0, but part of my code no longer works.
>
> Even though I have no Hadoop 1 jar in my dependencies (Hadoop 2.7.1
> clients & flink-hadoop-compatibility_2.10 1.1.0), I get a weird JobContext
> version mismatch error that I have not been able to understand.
>
> The code reads a Hive table in a local batch Flink cluster using an M/R
> job (from the correct package, mapreduce, not mapred).
>
> import org.apache.hadoop.mapreduce.InputFormat;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormat;
> (…)
>
>         final Job job = Job.getInstance();
>
>         final InputFormat<NullWritable, DefaultHCatRecord> hCatInputFormat =
>             (InputFormat) HCatInputFormat.setInput(job, table.getDbName(),
>                 table.getTableName(), filter);
>
>         final HadoopInputFormat<NullWritable, DefaultHCatRecord> inputFormat =
>             new HadoopInputFormat<NullWritable, DefaultHCatRecord>(
>                 hCatInputFormat, NullWritable.class, DefaultHCatRecord.class, job);
>
>         final HCatSchema inputSchema =
>             HCatInputFormat.getTableSchema(job.getConfiguration());
>
>         return cluster
>             .createInput(inputFormat)
>             .flatMap(new RichFlatMapFunction<Tuple2<NullWritable, DefaultHCatRecord>, T>() {
>                 @Override
>                 public void flatMap(Tuple2<NullWritable, DefaultHCatRecord> value,
>                         Collector<T> out) throws Exception { // NOPMD
>                     (...)
>                 }
>             }).returns(beanClass);
>
> The exception is:
>
> org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 69dba7e4d79c05d2967dca4d4a27cf38 (Flink Java Job at Tue Aug 09 09:19:41 CEST 2016)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1281)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:478)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:695)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1178)
>     ... 23 more
> Caused by: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:158)
>     at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:56)
>     at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
>     ... 25 more
>
> Any idea what has gone wrong?
>
> Thanks,
> Arnaud
>
> ________________________________
>
> The integrity of this message cannot be guaranteed on the Internet. The
> company that sent this message cannot therefore be held liable for its
> content nor attachments. Any unauthorized use or dissemination is
> prohibited. If you are not the intended recipient of this message, then
> please delete it and notify the sender.
