flink-user mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Classloader issue using AvroParquetInputFormat via HadoopInputFormat
Date Tue, 09 Aug 2016 10:41:01 GMT
Hi Shannon!

It seems that something in the Maven deployment went wrong with this
release.

There should be:
  - *flink-java* (the default, with a transitive dependency on hadoop 2.x
for the hadoop compatibility features)
  - *flink-java-hadoop1* (with a transitive dependency on hadoop 1.x for
the older hadoop compatibility features)
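
Declaring a dependency on the intended artifact would look roughly like
this (a minimal sketch, assuming an sbt build; a plain Maven <dependency>
entry is the equivalent):

    // pick exactly one, depending on the Hadoop line you need
    libraryDependencies += "org.apache.flink" % "flink-java" % "1.1.0"          // hadoop 2.x
    libraryDependencies += "org.apache.flink" % "flink-java-hadoop1" % "1.1.0"  // hadoop 1.x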

Apparently the "flink-java" artifact got overwritten with the
"flink-java-hadoop1" artifact. Damn.

I think we need to release new artifacts that fix these dependency
descriptors.

That needs to be a 1.1.1 release, because maven artifacts cannot be changed
after they were deployed.

Greetings,
Stephan


On Mon, Aug 8, 2016 at 11:08 PM, Shannon Carey <scarey@expedia.com> wrote:

> Correction: I cannot work around the problem. If I exclude hadoop1, I get
> the following exception which appears to be due to flink-java-1.1.0's
> dependency on Hadoop1.
>
> Failed to submit job 4b6366d101877d38ef33454acc6ca500 (com.expedia.www.flink.jobs.DestinationCountsHistoryJob$)
> org.apache.flink.runtime.client.JobExecutionException: Failed to submit job 4b6366d101877d38ef33454acc6ca500 (com.expedia.www.flink.jobs.DestinationCountsHistoryJob$)
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1281)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:478)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:121)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:172)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:695)
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1178)
> ... 19 more
> Caused by: java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:158)
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopInputFormatBase.createInputSplits(HadoopInputFormatBase.java:56)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:156)
> ... 21 more
>
> And if I exclude hadoop2, I get the exception from my previous email with
> AvroParquetInputFormat.
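>
> For reference, the exclusions in the two experiments were declared roughly
> like this (a minimal sketch, assuming an sbt build; a Maven <exclusions>
> block would be the equivalent):
>
>   // excluding the hadoop1 shaded jar (first experiment above):
>   libraryDependencies += ("org.apache.flink" % "flink-java" % "1.1.0")
>     .exclude("org.apache.flink", "flink-shaded-hadoop1_2.10")
>   // excluding the hadoop2 shaded jar instead reproduces the
>   // AvroParquetInputFormat failure from my previous email:
>   //   .exclude("org.apache.flink", "flink-shaded-hadoop2")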
>
>
>
> From: Shannon Carey <scarey@expedia.com>
> Date: Monday, August 8, 2016 at 2:46 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Classloader issue using AvroParquetInputFormat via
> HadoopInputFormat
>
> Hi folks, congrats on 1.1.0!
>
> FYI, after updating to Flink 1.1.0 I get the exception at the bottom when
> attempting to run a job that uses AvroParquetInputFormat wrapped in a Flink
> HadoopInputFormat. ContextUtil.java:71 is trying to execute:
>
> Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl");
>
> I am using Scala 2.11.7. JobContextImpl is coming from
> flink-shaded-hadoop2:1.1.0. However, its parent class (JobContext) is
> actually being loaded (according to output with the JVM param
> "-verbose:class") from the flink-shaded-hadoop1_2.10 jar.
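>
> (A quick way to double-check which jar a class actually comes from, and
> its shape, without -verbose:class; just a hypothetical Scala snippet,
> nothing Flink-specific:
>
>   val cls = Class.forName("org.apache.hadoop.mapreduce.JobContext")
>   // the jar the class was loaded from:
>   println(cls.getProtectionDomain.getCodeSource.getLocation)
>   // hadoop2 declares JobContext as an interface, hadoop1 as a class,
>   // which is exactly what IncompatibleClassChangeError complains about:
>   println(cls.isInterface)
> )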
>
> After adding an exclusion on flink-shaded-hadoop1_2.10, the problem
> appears to be resolved. Is that the right way to fix the problem?
>
> From what I can tell, the problem is that the JARs that are deployed to
> Maven Central were built with different versions of Hadoop (as controlled
> by hadoop.profile):
>
> flink-runtime_2.11 depends on Hadoop 2
> flink-java depends on Hadoop 1 (Scala 2.10)
> flink-core depends on Hadoop 1 (Scala 2.10)
>
> This seems like a problem with Flink's build process.
>
> As an aside: would it be possible to change the interface of
> HadoopInputFormat to take a Configuration instead of a Job? That would
> reduce the dependence on the Hadoop API somewhat. It doesn't look like the
> Job itself is ever actually used for anything. I'm glad to see you already
> have https://issues.apache.org/jira/browse/FLINK-4316 and
> https://issues.apache.org/jira/browse/FLINK-4315
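>
> For illustration, the wiring currently looks something like this (a sketch
> assuming parquet-avro 1.8's generic AvroParquetInputFormat and the Flink
> 1.1 Scala API; the Job appears to contribute nothing beyond its
> Configuration):
>
>   import org.apache.avro.generic.GenericRecord
>   import org.apache.flink.api.scala.hadoop.mapreduce.HadoopInputFormat
>   import org.apache.hadoop.mapreduce.Job
>   import org.apache.parquet.avro.AvroParquetInputFormat
>
>   val job = Job.getInstance() // only job.getConfiguration appears to be used
>   val format = new HadoopInputFormat[Void, GenericRecord](
>     new AvroParquetInputFormat[GenericRecord], classOf[Void],
>     classOf[GenericRecord], job)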
>
> Thanks,
> Shannon
>
> java.lang.IncompatibleClassChangeError: Implementing class
> at java.lang.ClassLoader.defineClass1(Native Method)
> at java.lang.ClassLoader.defineClass(ClassLoader.java:760)
> at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
> at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
> at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at org.apache.parquet.hadoop.util.ContextUtil.<clinit>(ContextUtil.java:71)
> at org.apache.parquet.avro.AvroParquetInputFormat.setRequestedProjection(AvroParquetInputFormat.java:54)
> at com.expedia.www.sdk.flink.HistoricalDataIngestionJob.readHistoricalParquetFile(HistoricalDataIngestionJob.scala:63)
> at com.expedia.www.flink.jobs.DestinationCountsHistoryJob$.main(DestinationCountsHistoryJob.scala:25)
> at com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply$mcV$sp(DestinationCountsHistoryTest.scala:23)
> at com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply(DestinationCountsHistoryTest.scala:20)
> at com.expedia.www.flink.jobs.DestinationCountsHistoryTest$$anonfun$1.apply(DestinationCountsHistoryTest.scala:20)
> at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
> at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> at org.scalatest.Transformer.apply(Transformer.scala:22)
> at org.scalatest.Transformer.apply(Transformer.scala:20)
> at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
> at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
> at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
> at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
> at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
> at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
> at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
> at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
> at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
> at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
> at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
> at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
> at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
> at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
> at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
> at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
> at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
> at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
> at org.scalatest.Suite$class.run(Suite.scala:1424)
> at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
> at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
> at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
> at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
> at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
> at com.expedia.www.flink.jobs.DestinationCountsHistoryTest.org$scalatest$BeforeAndAfterAll$$super$run(DestinationCountsHistoryTest.scala:12)
> at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
> at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
> at com.expedia.www.flink.jobs.DestinationCountsHistoryTest.run(DestinationCountsHistoryTest.scala:12)
> at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
> at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
> at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
> at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
> at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
> at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
> at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
> at org.scalatest.tools.Runner$.run(Runner.scala:883)
> at org.scalatest.tools.Runner.run(Runner.scala)
> at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:138)
> at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
>