Date: Wed, 25 Jan 2017 05:53:26 +0000 (UTC)
From: "Giuliano Caliari (JIRA)"
To: dev@flink.apache.org
Subject: [jira] [Created] (FLINK-5633) ClassCastException: X cannot be cast to X when re-submitting a job.

Giuliano Caliari created FLINK-5633:
---------------------------------------

             Summary: ClassCastException: X cannot be cast to X when re-submitting a job.
                 Key: FLINK-5633
                 URL: https://issues.apache.org/jira/browse/FLINK-5633
             Project: Flink
          Issue Type: Bug
          Components: Job-Submission, YARN
    Affects Versions: 1.1.4
            Reporter: Giuliano Caliari
            Priority: Minor


I'm running a job on my local cluster. The first time I submit the job everything works, but whenever I cancel and re-submit the same job it fails with:

{quote}
org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
	at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
	at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
	at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
	at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
	at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
	at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: au.com.my.package.schema.p.WowTransaction cannot be cast to au.com.my.package.schema.p.WowTransaction
	at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
	at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
	at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
	... 14 more
{quote}

This happens on versions 1.1.4 and 1.2.

Here's a great description of the problem, provided by Yury Ruchin:

{quote}
In a YARN setup, classes are loaded from several sources: the Flink lib directory, the YARN lib directories, and user code. The first two are handled by the system classloader; the last one is loaded by FlinkUserCodeClassLoader.

My streaming job parses Avro-encoded data using the SpecificRecord facility. In essence, the job looks like this: Source -> Avro parser (Map) -> Sink. Parallelism is 1. The job operates inside a long-lived YARN session. I have a subclass of SpecificRecord, say its name is MySpecificRecord. From a class loading perspective, the Avro library classes, including SpecificRecord, are loaded by the system class loader from the YARN lib dir - such classes are shared across different Flink tasks within a task manager. On the other side, MySpecificRecord is in the job fat jar, so it gets loaded by FlinkUserCodeClassLoader. Upon every job restart, the task gets a new FlinkUserCodeClassLoader instance, so classes from user code are confined to a task instance.

Simply put, the parsing itself looks like this:

val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)

Now, the scenario:

1. I start my job. Parsing is initiated, so SpecificDatumReader and SpecificData get loaded by the system classloader. A new FlinkUserCodeClassLoader is instantiated; let's denote its instance as "A". MySpecificRecord then gets loaded by A.
2. SpecificData has a singleton, SpecificData.INSTANCE, that holds a cache mapping a string key derived from the Avro schema to the implementing class. So during parsing I get MySpecificRecord (A) cached there.
3. I stop the job and re-submit it. The JVM process is the same, so all standard Avro classes, including SpecificData, remain loaded. A new task instance is created and gets a new FlinkUserCodeClassLoader instance; let's name it "B". A new incarnation of the MySpecificRecord class is loaded by B. From the JVM's standpoint, MySpecificRecord (B) is different from MySpecificRecord (A), even though their bytecode is identical.
4. The job starts parsing again. SpecificDatumReader consults SpecificData.INSTANCE's cache for any stashed classes and finds MySpecificRecord (A) there.
5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a bean for filling the parsed data in.
6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back to the job.
7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B).
8. ClassCastException :^(

I fixed the issue by not using the SpecificData.INSTANCE singleton (even though this is considered a common and expected practice). I feed every parser a new instance of SpecificData. This way the class cache is confined to a parser instance and gets recycled along with it.
{quote}

A rough sketch of that workaround is included below.

A discussion of the error can be found at:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-get-help-on-ClassCastException-when-re-submitting-a-job-td10972.html
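For illustration, here is a minimal Scala sketch of the workaround Yury describes: give each parser its own SpecificData instead of relying on SpecificData.INSTANCE. The AvroParser class, its parse method, and the use of the thread context classloader are assumptions made for this example only; just the per-parser SpecificData idea comes from the description above, and MySpecificRecord stands in for the generated Avro class.

{code}
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificData, SpecificDatumReader}

// Hypothetical wrapper around the Avro parsing step of the job (sketch only).
class AvroParser extends Serializable {

  // A dedicated SpecificData per parser instead of SpecificData.INSTANCE:
  // its class cache lives and dies with this object and the classloader that
  // created it, so a re-submitted job cannot pick up classes cached by the
  // previous run. Assumes the context classloader is the user-code classloader
  // at the point where the parser is created.
  @transient private lazy val specificData =
    new SpecificData(Thread.currentThread().getContextClassLoader)

  @transient private lazy val reader =
    new SpecificDatumReader[MySpecificRecord](
      MySpecificRecord.getClassSchema, // writer schema
      MySpecificRecord.getClassSchema, // reader schema
      specificData)

  def parse(bytes: Array[Byte]): MySpecificRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null.asInstanceOf[MySpecificRecord], decoder)
  }
}
{code}

Whether the context classloader is the right loader to pass depends on where the parser object is instantiated; inside an operator running on a task manager it should resolve to the task's FlinkUserCodeClassLoader, but that detail is an assumption, not something verified here.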