flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Connie Yang <conniewongy...@gmail.com>
Subject How to anchor a job level singleton object as part of TaskManager JVM?
Date Sun, 03 Jun 2018 22:44:58 GMT
Hi,

I'm using Flink 1.4.2 release.

My Flink streaming job needs a way to make a singleton object available
throughout the job graph which consists of Kafka Source, ProcessFunction,
RichAsyncFunction, RichSinkFunction.  What's the best way to achieve this?

As an attempt, I have tried anchoring my object in a jar that is included
as part of the TaskManager JVM class path.  But it does not seem to work in
the following scenario:
1. When the job manager process was restarted (due to a pod failure) while
my job is running
2. When the job manager comes back, it attempts to restart my job, which
seems a bit odd.
3. But, my job kept failing with a ClassCastException where it attempts to
retrieve the singleton from TaskManager class loader.

2018-06-02 01:58:56,614 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
monstor -> Process -> async wait operator -> P
rocess -> Sink: deadletter (10/12) (289e6ee24be520d81890dfe5a1a164fa)
switched from RUNNING to FAILED.
java.lang.ClassCastException: org.monstor.flink.coordination.Coordinator
cannot be cast to org.monstor.flink.coordination.Coordinator
        at
org.monstor.flink.coordination.CoordinatorHolder.<init>(CoordinatorHolder.java:48)
        at
org.monstor.myjob.flink.MyAsyncWriter.open(GsiStoreAsyncWriter.java:102)
        at
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
        at
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:164)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
2018-06-02 01:58:56,614 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
monstor -> Process -> async wait operator -> P
rocess -> Sink: deadletter (2/12) (e55b878f5c9d91a0dc557dd60b80e867)
switched from RUNNING to CANCELING.

Thanks,
Connie

Mime
View raw message