From: Till Rohrmann
Date: Thu, 9 Jun 2016 09:49:09 +0200
Subject: Re: ClassCastException when redeploying Flink job on running cluster
To: user@flink.apache.org
Great to hear :-)

On Wed, Jun 8, 2016 at 7:45 PM, Josh <jofo90@gmail.com> wrote:
Thanks Till, your suggestion worked!

I actually just created a new SpecificData for each AvroDeserializationSchema instance, so I think it's still just as efficient.

Josh

On Wed, Jun 8, 2016 at 4:41 PM, Till Rohrmann <trohrmann@apache.org> wrote:
The only thing I could think of is to not use the SpecificData singleton but instead create a new SpecificData object for each SpecificDatumReader (you can pass it as a third argument to the constructor). This, of course, is not really efficient. But you could try it out to see whether it solves your problem.
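
The suggestion above could look roughly like the following. This is only a sketch under stated assumptions, not the code from the pastebin: the class name PerInstanceAvroReader and its decode method are hypothetical, the records are assumed to be Avro binary-encoded, and the writer schema is assumed to equal the reader schema. It relies on Avro's SpecificData(ClassLoader) constructor and the three-argument SpecificDatumReader constructor.

```scala
import org.apache.avro.Schema
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificRecordBase}

import scala.reflect.{ClassTag, classTag}

// Hypothetical helper: one SpecificData per instance instead of the
// JVM-wide SpecificData.get() singleton.
class PerInstanceAvroReader[T <: SpecificRecordBase : ClassTag] extends Serializable {

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  // Bound to the class loader that loaded T, so its class cache only ever
  // holds classes from that loader. Created lazily after deserialization.
  @transient private lazy val specificData = new SpecificData(avroType.getClassLoader)

  // Assumption: writer schema == reader schema.
  @transient private lazy val schema: Schema = specificData.getSchema(avroType)

  // The third constructor argument is the SpecificData instance to use.
  @transient private lazy val reader =
    new SpecificDatumReader[T](schema, schema, specificData)

  // Assumption: the payload is Avro binary encoding.
  def decode(bytes: Array[Byte]): T = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null.asInstanceOf[T], decoder)
  }
}
```

Because the SpecificData object lives and dies with the reader, its cache cannot outlive the job's class loader, which is the property the singleton lacks.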

Cheers,
Till

On Wed, Jun 8, 2016 at 4:24 PM, Josh <jofo90@gmail.com> wrote:
Sorry - I forgot to include my stack trace too. Here it is:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
    at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
    at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
    at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
    at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Could not forward element to next operator
    at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
    at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
    at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
    at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerThread.run(ShardConsumerThread.java:141)
Caused by: java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType
    at com.me.flink.job.MyFlinkJob$$anonfun$1.apply(MyFlinkJob.scala:61)
    at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
    at org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
    ... 3 more

On Wed, Jun 8, 2016 at 3:19 PM, Josh <jofo90@gmail.com> wrote:
Hi Till,

Thanks for the reply! I see - yes it does sound very much like FLINK-1390.

Please see my AvroDeserializationSchema implementation here: http://pastebin.com/mK7SfBQ8

I think perhaps the problem is caused by this line:
val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)

Looking at SpecificData, it contains a classCache which is a map of strings to classes, similar to what's described in FLINK-1390.

I'm not sure how to change my AvroDeserializationSchema to prevent this from happening though! Do you have any ideas?

Josh



On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrmann@apache.org> wrote:
Hi Josh,

The error message you've posted usually indicates a class loader issue. When you first run your program, the class com.me.avro.MyAvroType is loaded by the user code class loader. I suspect that this class is then cached somewhere (e.g. in the Avro serializer). When you run your program a second time, a new user code class loader loads the same class again, and the job ends up casting an instance of the first class to the second class. However, these two classes are not identical, since they were loaded by different class loaders.
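
A minimal, self-contained sketch of that effect, using only the JDK. It is a simplified illustration, not Flink's actual class loading: Payload is a hypothetical stand-in for com.me.avro.MyAvroType, and each isolated URLClassLoader (with no parent) loosely plays the role of one job submission's user code class loader.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical stand-in for a generated type such as com.me.avro.MyAvroType.
class Payload

object ClassLoaderIdentityDemo {

  // Loads Payload through a fresh loader that does not delegate to the
  // application class loader, so every call defines the class anew.
  def loadIsolated(location: URL): Class[_] = {
    val noParent: ClassLoader = null
    new URLClassLoader(Array(location), noParent).loadClass(classOf[Payload].getName)
  }

  def main(args: Array[String]): Unit = {
    // Directory or jar that Payload was compiled into.
    val location = classOf[Payload].getProtectionDomain.getCodeSource.getLocation

    val firstRun = loadIsolated(location)
    val secondRun = loadIsolated(location)

    println(s"same name:  ${firstRun.getName == secondRun.getName}")
    println(s"same class: ${firstRun == secondRun}")

    // An instance created from the first loader's class, as a cache might hold it.
    val cached = firstRun.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]

    // Casting it to the second loader's class fails despite identical names.
    try secondRun.cast(cached)
    catch { case e: ClassCastException => println(s"cast failed: ${e.getMessage}") }
  }
}
```

The JVM identifies a class by its name together with its defining class loader, which is why the exception message can name the same type on both sides.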

In order to find the culprit, it would be helpful to see the full stack trace of the ClassCastException and the code of the AvroDeserializationSchema. I suspect that something similar to https://issues.apache.org/jira/browse/FLINK-1390 is happening.

Cheers,
Till

On Wed, Jun 8, 2016 at 10:38 AM, Josh <jofo90@gmail.com> wrote:
Hi all,

Currently I have to relaunch my Flink cluster every time I want to upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:

java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType

It's related to MyAvroType, which is a class generated from an Avro schema. The ClassCastException occurs every time I redeploy the job without killing the Flink cluster (even if there have been no changes to the job/jar).

I wrote my own AvroDeserializationSchema in Scala which does something a little strange to get the Avro type information (see below), and I'm wondering if that's causing the problem when the Flink job creates an AvroDeserializationSchema[MyAvroType].

Does anyone have any ideas?

Thanks,
Josh



class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag] extends DeserializationSchema[T] {

  ...

  private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]

  private val typeInformation = TypeExtractor.getForClass(avroType)

  ...

  override def getProducedType: TypeInformation[T] = typeInformation

  ...

}






