flink-dev mailing list archives

From Shivam Sharma <28shivamsha...@gmail.com>
Subject Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
Date Thu, 14 Dec 2017 19:41:38 GMT
Hi Stephan,

Thanks for your help. Reverting the classloading to parent-first *resolved
this issue*. Thanks for that, but I have one question:

I am building a fat jar without marking any dependency as "provided". In my
case I am using protobuf-java version 3.4.0, but I think Flink uses a pretty
old version (2.5.0, I believe). When I submit my jar with parent-first
classloading, which version will it pick: 2.5.0 or 3.4.0?
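For reference, one way to sidestep the version question entirely is to relocate (shade) protobuf inside the fat jar, so the user-code copy can never clash with whatever is on the cluster classpath. A minimal maven-shade-plugin sketch, assuming a standard Maven build (the `com.mycompany.shaded` prefix is just a placeholder):

```xml
<!-- Illustrative only: relocates com.google.protobuf classes inside the fat
     jar so the user-code copy (3.4.0) cannot collide with any copy on the
     cluster classpath. Adjust the shadedPattern to your own namespace. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>com.mycompany.shaded.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```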

Thanks
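For anyone else landing on this thread: the settings Stephan mentions below go into flink-conf.yaml. A sketch — the parent-first pattern list here is illustrative only; keep whatever defaults your Flink version ships with:

```yaml
# Option 1: restore the pre-1.4 behavior entirely
classloader.resolve-order: parent-first

# Option 2: keep child-first loading but force scala.* (and the existing
# defaults) to always resolve from the parent classloader
classloader.parent-first-patterns: java.;scala.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback
```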

On Thu, Dec 14, 2017 at 5:30 PM, Stephan Ewen <sewen@apache.org> wrote:

> @Shivam and @Ryan:
>
> My first feeling would be the following: you have the Scala library in your
> user code jar, and thus, through the reversed class loading, the Scala
> function types get duplicated.
>
> The right way to fix that is to make sure you build a proper jar file
> without any provided dependencies. Make sure you set "-Pbuild-jar" when
> packaging your program.
>
>   - You could also set "classloader.resolve-order: parent-first" in your
> configuration to restore the old class loading style.
>
>   - We should add "scala" to the default value for
> "classloader.parent-first-patterns". You can add it yourself in the
> configuration (make sure you keep all existing parent-first-patterns as
> well).
>
>
>
> On Thu, Dec 14, 2017 at 12:14 PM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
> > Hi,
> >
> > I tried to reproduce the problem given your description, Ryan. I submitted
> > the test job to a vanilla Flink 1.4.0 cluster (the Hadoop-free version
> > downloaded from flink.apache.org, the Hadoop 2.7 version downloaded from
> > flink.apache.org, and a cluster built from sources). However, I was not
> > able to reproduce the problem. Therefore I suspect that it has something
> > to do with your or my setup.
> >
> > In order to further diagnose the problem, it would be tremendously
> > helpful if you could share the logs contained in the logs directory
> > with us.
> >
> > Cheers,
> > Till
> >
> > On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <sewen@apache.org> wrote:
> >
> > > Hi!
> > >
> > > This may be due to the changed classloading semantics.
> > >
> > > Just to verify this, can you check if it gets solved by setting the
> > > following in the Flink configuration: "classloader.resolve-order:
> > > parent-first"
> > >
> > > By default, Flink 1.4 now uses inverted classloading to allow users to
> > > use their own copies of dependencies, irrespective of what the underlying
> > > classpath is spoiled with. You can, for example, use a different Avro
> > > version than the one Hadoop pulls in, even without shading, or different
> > > Akka / Jackson / etc. versions.
> > >
> > > That is a nice improvement, but it may have some impact on tools that
> > > were built before. When you see ClassCastExceptions (like "X cannot be
> > > cast to X"), that is probably caused by the fact that the classloader
> > > duplicates a dependency from the JVM classpath in user space, while
> > > objects/classes move between the two domains.
> > >
> > > Stephan
> > >
> > > On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <28shivamsharma@gmail.com>
> > > wrote:
> > >
> > > > I am facing the same issue:
> > > > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> > > >
> > > > Can anyone explain the exception?
> > > >
> > > > Thanks
> > > >
> > > > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <jira@apache.org>
> > > > wrote:
> > > >
> > > > > Ryan Brideau created FLINK-8256:
> > > > > -----------------------------------
> > > > >
> > > > >              Summary: Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
> > > > >                  Key: FLINK-8256
> > > > >                  URL: https://issues.apache.org/jira/browse/FLINK-8256
> > > > >              Project: Flink
> > > > >           Issue Type: Bug
> > > > >           Components: DataStream API
> > > > >     Affects Versions: 1.4.0
> > > > >          Environment: macOS, Local Flink v1.4.0, Scala 2.11
> > > > >             Reporter: Ryan Brideau
> > > > >
> > > > >
> > > > > I built the newest release locally today, but when I try to filter a
> > > > > stream using an anonymous or named function, I get an error. Here's a
> > > > > simple example:
> > > > >
> > > > >
> > > > > {code:java}
> > > > > import org.apache.flink.api.java.utils.ParameterTool
> > > > > import org.apache.flink.streaming.api.scala._
> > > > >
> > > > > object TestFunction {
> > > > >
> > > > >   def main(args: Array[String]): Unit = {
> > > > >
> > > > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > > >     val params = ParameterTool.fromArgs(args)
> > > > >     env.getConfig.setGlobalJobParameters(params)
> > > > >
> > > > >     val someArray = Array(1,2,3)
> > > > >     val stream = env.fromCollection(someArray).filter(_ => true)
> > > > >     stream.print().setParallelism(1)
> > > > >     env.execute("Testing Function")
> > > > >   }
> > > > > }
> > > > > {code}
> > > > >
> > > > > This results in:
> > > > >
> > > > >
> > > > > {code:java}
> > > > > Job execution switched to status FAILING.
> > > > > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > > > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > > > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > > > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > > > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > > > >         ... 6 more
> > > > > 12/13/2017 15:10:01     Job execution switched to status FAILED.
> > > > >
> > > > > ------------------------------------------------------------
> > > > >  The program finished with the following exception:
> > > > >
> > > > > org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
> > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:492)
> > > > >         at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:456)
> > > > >         at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > > > >         at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:638)
> > > > >         at org.peopleinmotion.TestFunction$.main(TestFunction.scala:20)
> > > > >         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > > > >         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> > > > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> > > > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > > > >         at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
> > > > >         at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
> > > > >         at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:396)
> > > > >         at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:802)
> > > > >         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:282)
> > > > >         at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1054)
> > > > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1101)
> > > > >         at org.apache.flink.client.CliFrontend$1.call(CliFrontend.java:1098)
> > > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> > > > >         at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > > > >         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1098)
> > > > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
> > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > > > >         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
> > > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> > > > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> > > > >         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> > > > >         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > > > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > > > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> > > > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > > > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> > > > > Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
> > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:235)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
> > > > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
> > > > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
> > > > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
> > > > >         at java.lang.Thread.run(Thread.java:748)
> > > > > Caused by: java.lang.ClassCastException: cannot assign instance of org.peopleinmotion.TestFunction$$anonfun$1 to field org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of type scala.Function1 in instance of org.apache.flink.streaming.api.scala.DataStream$$anon$7
> > > > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233)
> > > > >         at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2288)
> > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > >         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2282)
> > > > >         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2206)
> > > > >         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2064)
> > > > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1568)
> > > > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:428)
> > > > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
> > > > >         at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
> > > > >         at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:220)
> > > > >
> > > > > {code}
> > > > >
> > > > > However, replacing the function with the following results in
> > > > > everything working as expected:
> > > > >
> > > > > {code:java}
> > > > > import org.apache.flink.api.common.functions.FilterFunction
> > > > >
> > > > > val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
> > > > >   override def filter(t: Int): Boolean = true
> > > > > })
> > > > > {code}
> > > > >
> > > > > Perhaps something changed in the new build compared to the previous
> > > > > one, as this was working without issue before?
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > This message was sent by Atlassian JIRA
> > > > > (v6.4.14#64029)
> > > > >
> > > >
> > > >
> > > >
> > >
> >
>
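As an illustration of the "X cannot be cast to X" symptom Stephan describes above: once two classloaders each define their own copy of the same class, the JVM treats the copies as unrelated types. This toy sketch (not Flink's actual user-code classloader, just the delegation idea) compiles a tiny class at runtime and loads it through two independent loaders:

```java
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class ChildFirstDemo {
    public static void main(String[] args) throws Exception {
        // Compile a tiny class into a temp directory, standing in for a
        // dependency that exists both on the framework classpath and in a
        // user jar.
        Path dir = Files.createTempDirectory("dup-demo");
        Path src = dir.resolve("Dup.java");
        Files.write(src, "public class Dup {}".getBytes());
        ToolProvider.getSystemJavaCompiler()
                .run(null, null, null, "-d", dir.toString(), src.toString());

        URL[] urls = { dir.toUri().toURL() };
        // Two independent loaders both "see" Dup; neither delegates to the
        // other, mimicking a duplicated dependency under child-first loading.
        ClassLoader a = new URLClassLoader(urls, null);
        ClassLoader b = new URLClassLoader(urls, null);
        Class<?> dupA = a.loadClass("Dup");
        Class<?> dupB = b.loadClass("Dup");

        System.out.println(dupA.getName().equals(dupB.getName())); // true: same name
        System.out.println(dupA == dupB);                          // false: distinct Class objects
        // ...so an instance from one loader is not an instance of the other's
        // class, which is exactly when "Dup cannot be cast to Dup" appears:
        System.out.println(dupB.isInstance(dupA.getDeclaredConstructor().newInstance())); // false
    }
}
```

Parent-first resolution avoids this by always letting the parent's single copy win, which is why reverting the resolve order makes the cast succeed again.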



-- 
Shivam Sharma
Data Engineer @ Goibibo
Indian Institute Of Information Technology, Design and Manufacturing
Jabalpur
Mobile No: (+91) 8882114744
Email: 28shivamsharma@gmail.com
LinkedIn: https://www.linkedin.com/in/28shivamsharma
