flink-dev mailing list archives

From Till Rohrmann <trohrm...@apache.org>
Subject Re: [jira] [Created] (FLINK-8256) Cannot use Scala functions to filter in 1.4 - java.lang.ClassCastException
Date Thu, 14 Dec 2017 11:14:11 GMT
Hi,

I tried to reproduce the problem based on your description, Ryan. I
submitted the test job to a vanilla Flink 1.4.0 cluster (the Hadoop-free
version downloaded from flink.apache.org, the Hadoop 2.7 version downloaded
from flink.apache.org, and a cluster built from source). However, I was not
able to reproduce the problem, so I suspect it has something to do with
your setup or mine.

In order to further diagnose the problem, it would be tremendously helpful
if you could share the logs contained in the logs directory with us.

Cheers,
Till

On Thu, Dec 14, 2017 at 11:19 AM, Stephan Ewen <sewen@apache.org> wrote:

> Hi!
>
> This may be due to the changed classloading semantics.
>
> Just to verify this, can you check whether it gets solved by setting the
> following in the Flink configuration: "classloader.resolve-order:
> parent-first"
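For reference, a sketch of how that setting would look in conf/flink-conf.yaml (the key and value are exactly as named above):

```yaml
# conf/flink-conf.yaml
# Revert to the pre-1.4 resolution order: delegate to the parent
# (JVM/framework) classloader before the user-code classloader.
classloader.resolve-order: parent-first
```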
>
> By default, Flink 1.4 now uses inverted (child-first) classloading to allow
> users to use their own copies of dependencies, irrespective of what the
> underlying classpath is polluted with. You can, for example, use a
> different Avro version than the one Hadoop pulls in, even without shading,
> or different Akka / Jackson / etc. versions.
>
> That is a nice improvement, but it may have some impact on tools that were
> built before. When you see ClassCastExceptions (like "X cannot be cast to
> X"), that is probably caused by the classloader duplicating a dependency
> from the JVM classpath in user space, while objects/classes move between
> the two domains.
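The failure mode Stephan describes can be reproduced outside Flink. A minimal sketch (class and loader names here are hypothetical, not Flink's actual child-first loader): loading the same class bytes through a second, non-delegating classloader yields a distinct Class object, and a cast between the two fails with the confusing "X cannot be cast to X" message.

```java
import java.io.InputStream;

public class ClassCastDemo {

    // A trivial class that will exist twice: once on the application
    // classpath, once re-defined by the child-first loader below.
    public static class Payload {}

    // A "child-first" loader: it defines the class from bytes itself
    // instead of delegating to the application classloader first.
    static class ChildFirstLoader extends ClassLoader {
        ChildFirstLoader() { super(null); } // null parent: only bootstrap delegation

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in =
                     ClassCastDemo.class.getClassLoader().getResourceAsStream(path)) {
                byte[] bytes = in.readAllBytes();
                return defineClass(name, bytes, 0, bytes.length);
            } catch (Exception e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Same class file, loaded a second time by a different loader.
        Class<?> duplicate =
            new ChildFirstLoader().loadClass(Payload.class.getName());
        Object instance = duplicate.getDeclaredConstructor().newInstance();

        System.out.println("same name:  " + duplicate.getName().equals(Payload.class.getName()));
        System.out.println("same class: " + (duplicate == Payload.class));
        try {
            Payload p = (Payload) instance; // instance's class came from the other loader
        } catch (ClassCastException e) {
            // The message reads like "Payload cannot be cast to Payload".
            System.out.println("caught ClassCastException");
        }
    }
}
```

This is the same shape of problem as the reported `TestFunction$$anonfun$1` cast failure: the class name matches, but the defining loader does not, and in the JVM a class's identity is (name, defining loader).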
>
> Stephan
>
> On Thu, Dec 14, 2017 at 8:57 AM, Shivam Sharma <28shivamsharma@gmail.com>
> wrote:
>
> > I am facing the same issue:
> > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Issues-in-migrating-code-from-1-3-2-to-1-4-0-td20461.html
> >
> > Can anyone explain the exception?
> >
> > Thanks
> >
> > On Thu, Dec 14, 2017 at 1:45 AM, Ryan Brideau (JIRA) <jira@apache.org>
> > wrote:
> >
> > > Ryan Brideau created FLINK-8256:
> > > -----------------------------------
> > >
> > >              Summary: Cannot use Scala functions to filter in 1.4 -
> > > java.lang.ClassCastException
> > >                  Key: FLINK-8256
> > >                  URL: https://issues.apache.org/jira/browse/FLINK-8256
> > >              Project: Flink
> > >           Issue Type: Bug
> > >           Components: DataStream API
> > >     Affects Versions: 1.4.0
> > >          Environment: macOS, Local Flink v1.4.0, Scala 2.11
> > >             Reporter: Ryan Brideau
> > >
> > >
> > > I built the newest release locally today, but when I try to filter a
> > > stream using an anonymous or named function, I get an error. Here's a
> > > simple example:
> > >
> > >
> > > {code:java}
> > > import org.apache.flink.api.java.utils.ParameterTool
> > > import org.apache.flink.streaming.api.scala._
> > >
> > > object TestFunction {
> > >
> > >   def main(args: Array[String]): Unit = {
> > >
> > >     val env = StreamExecutionEnvironment.getExecutionEnvironment
> > >     val params = ParameterTool.fromArgs(args)
> > >     env.getConfig.setGlobalJobParameters(params)
> > >
> > >     val someArray = Array(1,2,3)
> > >     val stream = env.fromCollection(someArray).filter(_ => true)
> > >     stream.print().setParallelism(1)
> > >     env.execute("Testing Function")
> > >   }
> > > }
> > > {code}
> > >
> > > This results in:
> > >
> > >
> > > {code:java}
> > > Job execution switched to status FAILING.
> > > org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> > > instantiate user function.
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > getStreamOperator(StreamConfig.java:235)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > createChainedOperator(OperatorChain.java:355)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > createOutputCollector(OperatorChain.java:282)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> > > init>(OperatorChain.java:126)
> > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > > invoke(StreamTask.java:231)
> > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.
> java:718)
> > >         at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.ClassCastException: cannot assign instance of
> > > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of
> > > type scala.Function1 in instance of org.apache.flink.streaming.
> > > api.scala.DataStream$$anon$7
> > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> > > ObjectStreamClass.java:2233)
> > >         at java.io.ObjectStreamClass.setObjFieldValues(
> > > ObjectStreamClass.java:1405)
> > >         at java.io.ObjectInputStream.defaultReadFields(
> > > ObjectInputStream.java:2288)
> > >         at java.io.ObjectInputStream.readSerialData(
> > > ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > java:1568)
> > >         at java.io.ObjectInputStream.defaultReadFields(
> > > ObjectInputStream.java:2282)
> > >         at java.io.ObjectInputStream.readSerialData(
> > > ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > java:1568)
> > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > > java:428)
> > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(
> > > InstantiationUtil.java:290)
> > >         at org.apache.flink.util.InstantiationUtil.
> readObjectFromConfig(
> > > InstantiationUtil.java:248)
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > getStreamOperator(StreamConfig.java:220)
> > >         ... 6 more
> > > 12/13/2017 15:10:01     Job execution switched to status FAILED.
> > >
> > > ------------------------------------------------------------
> > >  The program finished with the following exception:
> > >
> > > org.apache.flink.client.program.ProgramInvocationException: The
> program
> > > execution failed: Job execution failed.
> > >         at org.apache.flink.client.program.ClusterClient.run(
> > > ClusterClient.java:492)
> > >         at org.apache.flink.client.program.StandaloneClusterClient.
> > > submitJob(StandaloneClusterClient.java:105)
> > >         at org.apache.flink.client.program.ClusterClient.run(
> > > ClusterClient.java:456)
> > >         at org.apache.flink.streaming.api.environment.
> > > StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> > >         at org.apache.flink.streaming.api.scala.
> > > StreamExecutionEnvironment.execute(StreamExecutionEnvironment.
> scala:638)
> > >         at org.peopleinmotion.TestFunction$.main(
> TestFunction.scala:20)
> > >         at org.peopleinmotion.TestFunction.main(TestFunction.scala)
> > >         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> > >         at sun.reflect.NativeMethodAccessorImpl.invoke(
> > > NativeMethodAccessorImpl.java:62)
> > >         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> > > DelegatingMethodAccessorImpl.java:43)
> > >         at java.lang.reflect.Method.invoke(Method.java:498)
> > >         at org.apache.flink.client.program.PackagedProgram.
> callMainMeth
> > od(
> > > PackagedProgram.java:525)
> > >         at org.apache.flink.client.program.PackagedProgram.
> > > invokeInteractiveModeForExecution(PackagedProgram.java:417)
> > >         at org.apache.flink.client.program.ClusterClient.run(
> > > ClusterClient.java:396)
> > >         at org.apache.flink.client.CliFrontend.executeProgram(
> > > CliFrontend.java:802)
> > >         at org.apache.flink.client.CliFrontend.run(CliFrontend.
> java:282)
> > >         at org.apache.flink.client.CliFrontend.parseParameters(
> > > CliFrontend.java:1054)
> > >         at org.apache.flink.client.CliFrontend$1.call(
> > > CliFrontend.java:1101)
> > >         at org.apache.flink.client.CliFrontend$1.call(
> > > CliFrontend.java:1098)
> > >         at java.security.AccessController.doPrivileged(Native Method)
> > >         at javax.security.auth.Subject.doAs(Subject.java:422)
> > >         at org.apache.hadoop.security.UserGroupInformation.doAs(
> > > UserGroupInformation.java:1556)
> > >         at org.apache.flink.runtime.security.HadoopSecurityContext.
> > > runSecured(HadoopSecurityContext.java:41)
> > >         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:
> > 1098)
> > > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> > > execution failed.
> > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$
> > > mcV$sp(JobManager.scala:897)
> > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana
> > ger.scala:840)
> > >         at org.apache.flink.runtime.jobmanager.JobManager$$
> > > anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobMana
> > ger.scala:840)
> > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.
> > > liftedTree1$1(Future.scala:24)
> > >         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(
> > > Future.scala:24)
> > >         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:
> 39)
> > >         at akka.dispatch.ForkJoinExecutorConfigurator$
> > > AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> > >         at scala.concurrent.forkjoin.ForkJoinTask.doExec(
> > > ForkJoinTask.java:260)
> > >         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
> > > runTask(ForkJoinPool.java:1339)
> > >         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
> > > ForkJoinPool.java:1979)
> > >         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
> > > ForkJoinWorkerThread.java:107)
> > > Caused by: org.apache.flink.streaming.runtime.tasks.
> StreamTaskException:
> > > Cannot instantiate user function.
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > getStreamOperator(StreamConfig.java:235)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > createChainedOperator(OperatorChain.java:355)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.
> > > createOutputCollector(OperatorChain.java:282)
> > >         at org.apache.flink.streaming.runtime.tasks.OperatorChain.<
> > > init>(OperatorChain.java:126)
> > >         at org.apache.flink.streaming.runtime.tasks.StreamTask.
> > > invoke(StreamTask.java:231)
> > >         at org.apache.flink.runtime.taskmanager.Task.run(Task.
> java:718)
> > >         at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.ClassCastException: cannot assign instance of
> > > org.peopleinmotion.TestFunction$$anonfun$1 to field
> > > org.apache.flink.streaming.api.scala.DataStream$$anon$7.cleanFun$6 of
> > > type scala.Function1 in instance of org.apache.flink.streaming.
> > > api.scala.DataStream$$anon$7
> > >         at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> > > ObjectStreamClass.java:2233)
> > >         at java.io.ObjectStreamClass.setObjFieldValues(
> > > ObjectStreamClass.java:1405)
> > >         at java.io.ObjectInputStream.defaultReadFields(
> > > ObjectInputStream.java:2288)
> > >         at java.io.ObjectInputStream.readSerialData(
> > > ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > java:1568)
> > >         at java.io.ObjectInputStream.defaultReadFields(
> > > ObjectInputStream.java:2282)
> > >         at java.io.ObjectInputStream.readSerialData(
> > > ObjectInputStream.java:2206)
> > >         at java.io.ObjectInputStream.readOrdinaryObject(
> > > ObjectInputStream.java:2064)
> > >         at java.io.ObjectInputStream.readObject0(ObjectInputStream.
> > > java:1568)
> > >         at java.io.ObjectInputStream.readObject(ObjectInputStream.
> > > java:428)
> > >         at org.apache.flink.util.InstantiationUtil.deserializeObject(
> > > InstantiationUtil.java:290)
> > >         at org.apache.flink.util.InstantiationUtil.
> readObjectFromConfig(
> > > InstantiationUtil.java:248)
> > >         at org.apache.flink.streaming.api.graph.StreamConfig.
> > > getStreamOperator(StreamConfig.java:220)
> > >
> > > {code}
> > >
> > > However, replacing the function with this results in everything working
> > as
> > > expected:
> > >
> > > {code:java}
> > > import org.apache.flink.api.common.functions.FilterFunction
> > >
> > > val stream = env.fromCollection(someArray).filter(new FilterFunction[Int] {
> > >   override def filter(t: Int): Boolean = true
> > > })
> > > {code}
> > >
> > > Perhaps something changed in the new build compared to the previous, as
> > > this was working without issue before?
> > >
> > >
> > >
> > > --
> > > This message was sent by Atlassian JIRA
> > > (v6.4.14#64029)
> > >
> >
> >
> >
> > --
> > Shivam Sharma
> > Data Engineer @ Goibibo
> > Indian Institute Of Information Technology, Design and Manufacturing
> > Jabalpur
> > Mobile No- (+91) 8882114744
> > Email:- 28shivamsharma@gmail.com
> > LinkedIn:- https://www.linkedin.com/in/28shivamsharma
> >
>
