From: Yury Ruchin
To: user@flink.apache.org
Date: Wed, 18 Jan 2017 12:41:35 +0300
Subject: Re: How to get help
on ClassCastException when re-submitting a job

For my case I tracked down the culprit. It was indeed Avro. I'm providing details below, since I believe the pattern is pretty common for such issues.

In a YARN setup there are several sources classes are loaded from: the Flink lib directory, the YARN lib directories, and user code. The first two are handled by the system classloader; the last is loaded by FlinkUserCodeClassLoader.

My streaming job parses Avro-encoded data using the SpecificRecord facility. In essence, the job looks like this: Source -> Avro parser (Map) -> Sink. Parallelism is 1. The job operates inside a long-lived YARN session. I have a subclass of SpecificRecord, say its name is MySpecificRecord. From a class loading perspective, Avro library classes, including SpecificRecord, are loaded by the system class loader from the YARN lib dir, so they are shared across different Flink tasks within a task manager. On the other side, MySpecificRecord is in the job fat jar, so it gets loaded by FlinkUserCodeClassLoader. Upon every job restart, the task gets a new FlinkUserCodeClassLoader instance, so classes from user code are confined to that task instance.

Simply put, the parsing itself looks like this:

val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)

Now, the scenario:

1. I start my job. Parsing is initiated, so SpecificDatumReader and SpecificData get loaded by the system classloader. A new FlinkUserCodeClassLoader is instantiated; let's denote this instance "A". MySpecificRecord then gets loaded by A.

2. SpecificData has a singleton, SpecificData.INSTANCE, that holds a cache mapping a string key derived from the Avro schema to the implementing class.
So during parsing, MySpecificRecord (A) gets cached there.

3. I stop the job and re-submit it. The JVM process is the same, so all standard Avro classes, including SpecificData, remain loaded. A new task instance is created and gets a new FlinkUserCodeClassLoader instance; let's name it "B". A new incarnation of the MySpecificRecord class is loaded by B. From the JVM's standpoint, MySpecificRecord (B) is different from MySpecificRecord (A), even though their bytecode is identical.

4. The job starts parsing again. SpecificDatumReader consults SpecificData.INSTANCE's cache for any stashed classes and finds MySpecificRecord (A) there.

5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a bean for filling in the parsed data.

6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back to the job.

7. The job tries to cast MySpecificRecord (A) to MySpecificRecord (B).

8. ClassCastException :^(

I fixed the issue by not using the SpecificData.INSTANCE singleton (even though this is considered common and expected practice). Instead, I feed every parser a new instance of SpecificData. This way the class cache is confined to a parser instance and gets recycled along with it.

Hope this helps,
Yury

2017-01-16 14:03 GMT+03:00 Stephan Ewen:
> Hi!
>
> I think Yury pointed out the correct diagnosis. Caching the classes across
> multiple jobs in the same session can cause these types of issues.
>
> For YARN single-job deployments, Flink 1.2 will not do any dynamic
> classloading any more, but will start with everything on the application
> classpath.
> For YARN sessions, Flink 1.2 still uses dynamic loading, to re-use hot
> containers.
>
> Best,
> Stephan
>
> On Mon, Jan 16, 2017 at 11:07 AM, Ufuk Celebi wrote:
>
>> @Giuliano: any updates? Very curious to figure out what's causing
>> this. As Fabian said, this is most likely a class loading issue.
>> Judging from the stack trace, you are not running with YARN but a
>> standalone cluster. Is that correct?
>> Class loading wise, nothing
>> changed between Flink 1.1 and Flink 1.2 with respect to
>> standalone clusters. Did you put any JARs into the lib folder of
>> Flink before submitting the job?
>>
>> – Ufuk
>>
>> On Thu, Jan 12, 2017 at 7:16 PM, Yury Ruchin wrote:
>> > Hi,
>> >
>> > I'd like to chime in since I've faced the same issue running Flink 1.1.4. I
>> > have a long-running YARN session which I use to run multiple streaming jobs
>> > concurrently. Once, after cancelling and resubmitting a job, I saw the "X
>> > cannot be cast to X" ClassCastException in the logs. I restarted the YARN
>> > session, and the problem disappeared.
>> >
>> > The class that failed to be cast was autogenerated by the Avro compiler. I know
>> > that Avro's Java binding caches schemas in some static WeakHashMap.
>> > I'm wondering whether that may step in the way of Flink's classloading design.
>> >
>> > Anyway, I would be interested in watching the issue in Flink JIRA.
>> >
>> > Giuliano, could you provide the issue number?
>> >
>> > Thanks,
>> > Yury
>> >
>> > 2017-01-11 14:11 GMT+03:00 Fabian Hueske:
>> >>
>> >> Hi Giuliano,
>> >>
>> >> thanks for bringing up this issue.
>> >> A "ClassCastException: X cannot be cast to X" often points to a
>> >> classloader issue.
>> >> So it might actually be a bug in Flink.
>> >>
>> >> I assume you submit the same application (same jar file) with the same
>> >> command, right?
>> >> Did you cancel the job before resubmitting?
>> >>
>> >> Can you create a JIRA issue [1] for this bug (hit the red CREATE button
>> >> on top) and include the commit hash from which you built Flink?
>> >> It would be great if you could provide a short example program and
>> >> instructions on how to reproduce the problem.
>> >>
>> >> Thank you very much,
>> >> Fabian
>> >>
>> >> [1] https://issues.apache.org/jira/browse/FLINK
>> >>
>> >> 2017-01-11 1:22 GMT+01:00 Giuliano Caliari:
>> >>>
>> >>> Hello,
>> >>>
>> >>> I need some guidance on how to report a bug.
>> >>>
>> >>> I'm testing version 1.2 on my local cluster. The first time I submit
>> >>> the job everything works, but whenever I re-submit the same job it fails with
>> >>>
>> >>> org.apache.flink.client.program.ProgramInvocationException: The program
>> >>> execution failed: Job execution failed.
>> >>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>> >>> at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>> >>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>> >>> at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>> >>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>> >>> at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>> >>> at au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>> >>> at au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>> >>> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>> >>> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>> >>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>> >>> at scala.App$$anonfun$main$1.apply(App.scala:76)
>> >>> at scala.collection.immutable.List.foreach(List.scala:381)
>> >>> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>> >>> at scala.App$class.main(App.scala:76)
>> >>> at au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>> >>> at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>> >>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>> at java.lang.reflect.Method.invoke(Method.java:498)
>> >>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>> >>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>> >>> at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>> >>> at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>> >>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>> >>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>> >>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>> >>> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>> >>> at org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>> >>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
>> >>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> >>> execution failed.
>> >>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>> >>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>> >>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>> >>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>> >>> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>> >>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> >>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>> >>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>> Caused by: java.lang.RuntimeException: Could not forward element to next
>> >>> operator
>> >>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>> >>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>> >>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>> >>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> >>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>> >>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>> >>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>> >>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>> >>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>> >>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>> >>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>> >>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>> >>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>> >>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>> >>> at java.lang.Thread.run(Thread.java:745)
>> >>> Caused by: java.lang.ClassCastException:
>> >>> au.com.my.package.schema.p.WowTransaction cannot be cast to
>> >>> au.com.my.package.schema.p.WowTransaction
>> >>> at au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>> >>> at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>> >>> at org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>> >>> at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>> >>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>> >>> ... 14 more
>> >>>
>> >>> I'm running a Flink cluster built from the "release-1.2" branch on
>> >>> GitHub.
>> >>>
>> >>> How can I validate that this is a Flink bug?
>> >>> Where can I report this?
>> >>> What sort of information do I need to provide?
>> >>>
>> >>> Cheers,
>> >>>
>> >>> Giuliano Caliari
>> >>> --
>> >>> Giuliano Caliari (+55 11 984898464)
>> >>> Master Software Engineer by Escola Politécnica da USP
>> >>> Bachelor in Computer Science by Instituto de Matemática e Estatística da
>> >>> USP
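The "WowTransaction cannot be cast to WowTransaction" failure Yury explains above can be reproduced outside Flink with two sibling classloaders over the same class files. The sketch below is a minimal JDK-only illustration: the MyRecord class, the temp-directory compilation, and the two URLClassLoaders are hypothetical stand-ins for the fat-jar classes and the two FlinkUserCodeClassLoader instances, not Flink or Avro code (it needs a JDK, so that the system compiler is available):

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ClassIdentityDemo {
    public static void main(String[] args) throws Exception {
        // Compile a trivial "record" class into a temp directory.
        Path dir = Files.createTempDirectory("cl-demo");
        Path src = dir.resolve("MyRecord.java");
        Files.write(src, "public class MyRecord {}".getBytes(StandardCharsets.UTF_8));
        JavaCompiler javac = ToolProvider.getSystemJavaCompiler();
        if (javac == null || javac.run(null, null, null, src.toString()) != 0) {
            throw new IllegalStateException("JDK compiler unavailable or compilation failed");
        }

        // Two sibling loaders over the same class files, standing in for the
        // FlinkUserCodeClassLoader instances of two submissions of one job.
        URL[] cp = { dir.toUri().toURL() };
        ClassLoader a = new URLClassLoader(cp, null);
        ClassLoader b = new URLClassLoader(cp, null);

        Class<?> recA = a.loadClass("MyRecord");
        Class<?> recB = b.loadClass("MyRecord");

        System.out.println(recA == recB);                          // distinct runtime classes
        System.out.println(recA.getName().equals(recB.getName())); // yet the same name

        // An instance created from loader A's class is not an instance of loader B's:
        Object bean = recA.getDeclaredConstructor().newInstance();
        System.out.println(recB.isInstance(bean));
        try {
            recB.cast(bean);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException, as in the Flink logs");
        }
    }
}
```

This prints `false`, `true`, `false`, then the caught exception line: identical bytecode, same fully qualified name, yet incompatible classes because the defining loader is part of a class's runtime identity.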
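Yury's fix (a fresh SpecificData per parser instead of SpecificData.INSTANCE) generalizes to any JVM-wide cache that stores Class objects. The sketch below simulates such a cache with a plain HashMap keyed by class name; ClassCache and MyRecord are hypothetical stand-ins that mirror, but do not use, Avro's actual SpecificData API:

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;

public class CacheScopeDemo {

    /** Simulates a SpecificData-style cache mapping a schema key to its implementing class. */
    static class ClassCache {
        private final Map<String, Class<?>> cache = new HashMap<>();
        Class<?> resolve(String name, ClassLoader loader) throws ClassNotFoundException {
            Class<?> c = cache.get(name);
            if (c == null) {
                c = Class.forName(name, true, loader);
                cache.put(name, c);
            }
            return c;
        }
    }

    // The problematic pattern: one JVM-wide cache, like SpecificData.INSTANCE.
    static final ClassCache SHARED = new ClassCache();

    public static void main(String[] args) throws Exception {
        // Compile a trivial record class once; both loaders read the same bytecode.
        Path dir = Files.createTempDirectory("cache-demo");
        Path src = dir.resolve("MyRecord.java");
        Files.write(src, "public class MyRecord {}".getBytes(StandardCharsets.UTF_8));
        JavaCompiler javac = ToolProvider.getSystemJavaCompiler();
        if (javac == null || javac.run(null, null, null, src.toString()) != 0) {
            throw new IllegalStateException("JDK compiler unavailable or compilation failed");
        }
        URL[] cp = { dir.toUri().toURL() };

        ClassLoader first = new URLClassLoader(cp, null);  // job submission #1
        ClassLoader second = new URLClassLoader(cp, null); // re-submission #2

        // Submission #1 populates the shared cache with MyRecord (first).
        SHARED.resolve("MyRecord", first);
        // Re-submission #2 asks the shared cache and gets the stale class back:
        Class<?> stale = SHARED.resolve("MyRecord", second);
        System.out.println(stale == second.loadClass("MyRecord")); // stale entry

        // The fix: scope the cache to the parser instance, one per submission,
        // so it is garbage-collected together with the submission's classloader.
        ClassCache perParser = new ClassCache();
        Class<?> fresh = perParser.resolve("MyRecord", second);
        System.out.println(fresh == second.loadClass("MyRecord")); // consistent
    }
}
```

The first check prints `false` (the shared cache hands submission #2 a class belonging to submission #1), the second prints `true`: confining the cache to the parser's lifetime keeps cached classes and the classes the job actually uses in the same classloader world.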