flink-user mailing list archives

From Josh <jof...@gmail.com>
Subject Re: ClassCastException when redeploying Flink job on running cluster
Date Wed, 08 Jun 2016 14:24:08 GMT
Sorry - I forgot to include my stack trace too. Here it is:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
at org.apache.flink.client.program.Client.runBlocking(Client.java:381)
at org.apache.flink.client.program.Client.runBlocking(Client.java:355)
at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:65)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:536)
at com.me.flink.job.MyFlinkJob$.main(MyFlinkJob.scala:85)
at com.me.flink.job.MyFlinkJob.main(MyFlinkJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:717)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:663)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Could not forward element to next operator
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.run(KinesisDataFetcher.java:150)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer.run(FlinkKinesisConsumer.java:285)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:354)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumerThread.run(ShardConsumerThread.java:141)
Caused by: java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to com.me.avro.MyAvroType
at com.me.flink.job.MyFlinkJob$$anonfun$1.apply(MyFlinkJob.scala:61)
at org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:746)
at org.apache.flink.streaming.api.functions.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:71)
at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:63)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
... 3 more
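
The innermost cause in the trace above, a class that cannot be cast to a class of the same name, can be reproduced outside Flink. The following is only a minimal sketch, not code from the job: `Payload`, `ClassLoaderIdentityDemo` and `isolatedLoader` are hypothetical names, and it assumes the compiled classes sit in a directory or jar on disk so that the code source location is available.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical stand-in for a generated class such as com.me.avro.MyAvroType.
class Payload

object ClassLoaderIdentityDemo {
  // Directory or jar holding the compiled Payload class (assumed to be on disk).
  private val codeLocation: URL =
    classOf[Payload].getProtectionDomain.getCodeSource.getLocation

  // A fresh loader whose parent is the bootstrap loader, so it defines Payload
  // itself instead of delegating to the application class loader.
  def isolatedLoader(): ClassLoader =
    new URLClassLoader(Array(codeLocation), null)

  def main(args: Array[String]): Unit = {
    val name = classOf[Payload].getName
    val firstRun = isolatedLoader().loadClass(name)
    val secondRun = isolatedLoader().loadClass(name)

    // Same fully qualified name, but two distinct Class objects.
    println(s"same name:  ${firstRun.getName == secondRun.getName}")
    println(s"same class: ${firstRun == secondRun}")

    // An instance created from the first class is not an instance of the second.
    val instance = firstRun.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    try {
      secondRun.cast(instance)
      println("cast succeeded")
    } catch {
      case e: ClassCastException => println(s"cast failed: ${e.getMessage}")
    }
  }
}
```

Each loader here plays the role of one job submission. The names match, yet the JVM treats the two classes as unrelated types, which is consistent with the exception message above.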

On Wed, Jun 8, 2016 at 3:19 PM, Josh <jofo90@gmail.com> wrote:

> Hi Till,
>
> Thanks for the reply! I see - yes it does sound very much like FLINK-1390.
>
> Please see my AvroDeserializationSchema implementation here:
> http://pastebin.com/mK7SfBQ8
>
> I think perhaps the problem is caused by this line:
> val readerSchema = SpecificData.get().getSchema(classTag[T].runtimeClass)
>
> Looking at SpecificData, it contains a classCache which is a map of
> strings to classes, similar to what's described in FLINK-1390.
>
> I'm not sure how to change my AvroDeserializationSchema to prevent this
> from happening though! Do you have any ideas?
>
> Josh
>
>
>
> On Wed, Jun 8, 2016 at 11:23 AM, Till Rohrmann <trohrmann@apache.org>
> wrote:
>
>> Hi Josh,
>>
>> The error message you've posted usually indicates a class loader issue.
>> When you first run your program, the class com.me.avro.MyAvroType is
>> loaded by the user code class loader. I suspect that this class is then
>> cached somewhere (e.g. in the Avro serializer). When you run your program
>> a second time, a new user code class loader loads the same class again,
>> and an instance of the first class ends up being cast to the second
>> class. However, these two classes are not identical, since they were
>> loaded by different class loaders.
>>
>> In order to find the culprit, it would be helpful to see the full stack
>> trace of the ClassCastException and the code of the
>> AvroDeserializationSchema. I suspect that something similar to
>> https://issues.apache.org/jira/browse/FLINK-1390 is happening.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jun 8, 2016 at 10:38 AM, Josh <jofo90@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Currently I have to relaunch my Flink cluster every time I want to
>>> upgrade/redeploy my Flink job, because otherwise I get a ClassCastException:
>>>
>>> java.lang.ClassCastException: com.me.avro.MyAvroType cannot be cast to
>>> com.me.avro.MyAvroType
>>>
>>> It's related to MyAvroType, which is a class generated from an Avro
>>> schema. The ClassCastException occurs every time I redeploy the job without
>>> killing the Flink cluster (even if there have been no changes to the
>>> job/jar).
>>>
>>> I wrote my own AvroDeserializationSchema in Scala which does something a
>>> little strange to get the avro type information (see below), and I'm
>>> wondering if that's causing the problem when the Flink job creates an
>>> AvroDeserializationSchema[MyAvroType].
>>>
>>> Does anyone have any ideas?
>>>
>>> Thanks,
>>> Josh
>>>
>>>
>>>
>>> class AvroDeserializationSchema[T <: SpecificRecordBase :ClassTag]
>>> extends DeserializationSchema[T] {
>>>
>>>   ...
>>>
>>>   private val avroType = classTag[T].runtimeClass.asInstanceOf[Class[T]]
>>>
>>>   private val typeInformation = TypeExtractor.getForClass(avroType)
>>>
>>>   ...
>>>
>>>   override def getProducedType: TypeInformation[T] = typeInformation
>>>
>>>   ...
>>>
>>> }
>>>
>>
>>
>
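
The suspicion discussed in the quoted thread, that a long-lived cache keyed only by class name keeps handing out the class from an earlier submission, can be illustrated without Avro or Flink. This is a sketch under stated assumptions: `Record`, `StaleClassCacheDemo` and both lookup functions are hypothetical, the name-keyed map only mimics the classCache described in the thread and is not Avro's actual code, and the compiled classes are assumed to sit in a directory or jar on disk.

```scala
import java.net.{URL, URLClassLoader}
import scala.collection.mutable

// Hypothetical stand-in for a generated Avro record class.
class Record

object StaleClassCacheDemo {
  // Directory or jar holding the compiled Record class (assumed to be on disk).
  private val codeLocation: URL =
    classOf[Record].getProtectionDomain.getCodeSource.getLocation

  // One fresh loader per simulated job submission.
  def newJobLoader(): ClassLoader =
    new URLClassLoader(Array(codeLocation), null)

  // JVM-wide cache keyed by class name only: the first loader to ask wins.
  private val byName = mutable.Map.empty[String, Class[_]]
  def lookupByName(name: String, loader: ClassLoader): Class[_] =
    byName.getOrElseUpdate(name, loader.loadClass(name))

  // Cache keyed by loader and name: each submission gets its own entry.
  private val byLoaderAndName = mutable.Map.empty[(ClassLoader, String), Class[_]]
  def lookupByLoaderAndName(name: String, loader: ClassLoader): Class[_] =
    byLoaderAndName.getOrElseUpdate((loader, name), loader.loadClass(name))

  def main(args: Array[String]): Unit = {
    val name = classOf[Record].getName
    val firstJob = newJobLoader()
    val secondJob = newJobLoader()

    // First submission populates both caches.
    lookupByName(name, firstJob)
    lookupByLoaderAndName(name, firstJob)

    // Second submission: the name-keyed cache still returns the first job's class.
    val stale = lookupByName(name, secondJob)
    val fresh = lookupByLoaderAndName(name, secondJob)
    println(s"name-keyed result comes from first job:    ${stale.getClassLoader eq firstJob}")
    println(s"loader-keyed result comes from second job: ${fresh.getClassLoader eq secondJob}")
  }
}
```

Keying by loader is only one way to avoid the stale entry, and as written it keeps a strong reference to every loader. Holding the lookup state per schema instance, or using weak references, would be alternatives; whether any of these applies to the actual AvroDeserializationSchema is not established in this thread.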
