flink-user mailing list archives

From Connie Yang <conniewongy...@gmail.com>
Subject Re: How to anchor a job level singleton object as part of TaskManager JVM?
Date Sun, 03 Jun 2018 23:32:17 GMT
BTW, the ClassCastException does not occur when the job is first
cancelled and then started again.  My Flink cluster is set up with Job
Manager HA enabled.

So, what is the difference between the job restart that happens after the
Job Manager is bounced and the usual job "cancel" and "start" combination?

On Sun, Jun 3, 2018 at 3:44 PM, Connie Yang <conniewongyang@gmail.com>
wrote:

> Hi,
>
> I'm using Flink 1.4.2 release.
>
> My Flink streaming job needs a way to make a singleton object available
> throughout the job graph which consists of Kafka Source, ProcessFunction,
> RichAsyncFunction, RichSinkFunction.  What's the best way to achieve this?
>
> As an attempt, I have tried anchoring my object in a jar that is included
> as part of the TaskManager JVM class path.  But it does not seem to work in
> the following scenario:
> 1. The job manager process was restarted (due to a pod failure) while
> my job was running.
> 2. When the job manager came back, it attempted to restart my job, which
> seems a bit odd.
> 3. My job then kept failing with a ClassCastException at the point where it
> attempts to retrieve the singleton from the TaskManager class loader.
>
> 2018-06-02 01:58:56,614 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: monstor -> Process -> async wait operator -> Process -> Sink: deadletter (10/12) (289e6ee24be520d81890dfe5a1a164fa) switched from RUNNING to FAILED.
> java.lang.ClassCastException: org.monstor.flink.coordination.Coordinator cannot be cast to org.monstor.flink.coordination.Coordinator
>         at org.monstor.flink.coordination.CoordinatorHolder.<init>(CoordinatorHolder.java:48)
>         at org.monstor.myjob.flink.MyAsyncWriter.open(GsiStoreAsyncWriter.java:102)
>         at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>         at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>         at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:164)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)
> 2018-06-02 01:58:56,614 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: monstor -> Process -> async wait operator -> Process -> Sink: deadletter (2/12) (e55b878f5c9d91a0dc557dd60b80e867) switched from RUNNING to CANCELING.
>
> Thanks,
> Connie
>
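
For readers of the trace above: a ClassCastException of the form "Coordinator cannot be cast to Coordinator" is typically what the JVM reports when the same class name has been loaded by two different class loaders, because a class's identity is its name plus its defining loader. The following is a minimal sketch of that effect using only the JDK, not the code from the job. The class demo.Coordinator, the helper names, and the temp directory are hypothetical stand-ins introduced for illustration, and the sketch assumes it runs on a JDK (it needs the system Java compiler). Whether this is exactly what happens after the Job Manager restart in the setup described is an assumption.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

final class ClassLoaderIdentityDemo {

    // Hypothetical stand-in for the job's Coordinator class.
    private static final String SOURCE = "package demo; public class Coordinator { }";

    /** Compiles demo.Coordinator into a fresh temp directory and returns that directory. */
    static Path compileCoordinator() throws IOException {
        Path root = Files.createTempDirectory("coordinator-demo");
        Path src = root.resolve("demo").resolve("Coordinator.java");
        Files.createDirectories(src.getParent());
        Files.write(src, SOURCE.getBytes(StandardCharsets.UTF_8));

        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        if (compiler == null) {
            throw new IllegalStateException("a JDK (not a JRE) is required");
        }
        int status = compiler.run(null, null, null, "-d", root.toString(), src.toString());
        if (status != 0) {
            throw new IllegalStateException("compilation failed");
        }
        return root;
    }

    /**
     * Loads demo.Coordinator through two independent class loaders and tries to
     * cast an instance created by the first to the class defined by the second.
     * Returns true if that cast is rejected with a ClassCastException.
     */
    static boolean castFailsAcrossLoaders() {
        try {
            URL[] classPath = { compileCoordinator().toUri().toURL() };
            // A null parent means neither loader delegates to the application
            // class path, so each one defines its own copy of the class.
            try (URLClassLoader first = new URLClassLoader(classPath, null);
                 URLClassLoader second = new URLClassLoader(classPath, null)) {
                Class<?> a = first.loadClass("demo.Coordinator");
                Class<?> b = second.loadClass("demo.Coordinator");
                Object instance = a.getDeclaredConstructor().newInstance();
                try {
                    b.cast(instance);
                    return false;
                } catch (ClassCastException e) {
                    return a.getName().equals(b.getName()) && a != b;
                }
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("cast fails across loaders: " + castFailsAcrossLoaders());
    }
}
```

If the sketch matches the situation in the trace, the object stored by one loader's copy of Coordinator is being read through another loader's copy, so comparing getClass().getClassLoader() on both sides of the failing cast would be one way to check.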
