Hi Till and Fabian,

 

My apologies for taking a week to reply; it took some time to reproduce the issue with debug logging enabled. I’ve attached logs from a two-minute period during which the problem happened. I’m sending this just to you two to avoid spreading the log file around. If you’d like to continue our conversation on the user mailing list, that’s fine with me.

 

The job was submitted via the Job Manager REST API, starting at 20:33:46.262 and finishing at 20:34:01.547. This worked normally, and the job started running. We then ran a monitor that polls the /overview endpoint of the JM REST API. It started polling at 20:34:31.380; the JM threw the FencingTokenException at 20:34:31.393 and returned a 500 to our monitor. This happens on every poll until the monitor times out, at which point we tear down the cluster: even though the job is running, we have no way to tell that it is. The issue is somewhat rare, happening maybe 5% of the time.
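For reference, our monitor’s check is roughly equivalent to this sketch (the service URL, timeout, and response field are illustrative, not our exact code):

```python
import json
import urllib.error
import urllib.request

# Illustrative service name; our real JM service name differs.
JM_URL = "http://flink-jobmanager:8081"

def classify_overview(status, body):
    """Classify one /overview poll:
    'up' on a 200 whose JSON includes job counts,
    'fencing-error' on a 500 (what we see with the FencingTokenException),
    'unknown' otherwise."""
    if status == 200:
        data = json.loads(body)
        return "up" if "jobs-running" in data else "unknown"
    if status == 500:
        return "fencing-error"
    return "unknown"

def poll_once():
    """One poll of the JM /overview endpoint."""
    try:
        with urllib.request.urlopen(JM_URL + "/overview", timeout=5) as resp:
            return classify_overview(resp.status, resp.read().decode())
    except urllib.error.HTTPError as e:
        # The 500 from the FencingTokenException lands here.
        return classify_overview(e.code, e.read().decode())
```

In the failing case, every call returns the 500 path until our monitor gives up.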

 

We’re running Flink 1.7.1. The issue only happens when we run in Job Manager High Availability mode. We provision two Job Managers, a three-node ZooKeeper cluster, the Task Managers, and our monitor, all in their own Kubernetes namespace. I can send you the ZooKeeper logs too if that would be helpful.
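Our HA setup uses the standard ZooKeeper-based configuration; roughly this (hostnames and paths are illustrative, not our actual values):

```yaml
# flink-conf.yaml (sketch; hostnames and paths are illustrative)
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-0:2181,zk-1:2181,zk-2:2181
high-availability.storageDir: file:///flink/ha/   # shared storage for JM metadata
high-availability.cluster-id: /our-job-cluster
```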

 

Thanks in advance for any help you can provide!

 

-Bruce

-- 

 

 

From: Till Rohrmann <trohrmann@apache.org>
Date: Wednesday, October 2, 2019 at 6:10 AM
To: Fabian Hueske <fhueske@gmail.com>
Cc: "Hanson, Bruce" <bruce.hanson@here.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Fencing token exceptions from Job Manager High Availability mode

 

Hi Bruce, are you able to provide us with the full debug logs? From the excerpt itself it is hard to tell what is going on.

 

Cheers,

Till

 

On Wed, Oct 2, 2019 at 2:24 PM Fabian Hueske <fhueske@gmail.com> wrote:

Hi Bruce,

 

I haven't seen such an exception yet, but maybe Till (in CC) can help.

 

Best,

Fabian

 

On Tue, Oct 1, 2019 at 5:51 AM Hanson, Bruce <bruce.hanson@here.com> wrote:

Hi all,

 

We are running some of our Flink jobs with Job Manager High Availability. Occasionally we get a cluster that comes up improperly and doesn’t respond. Attempts to submit the job seem to hang, and when we hit the /overview REST endpoint on the Job Manager we get a 500 error and a fencing token exception like this:

 

2019-09-21 05:04:07.785 [flink-akka.actor.default-dispatcher-4428] level=ERROR o.a.f.runtime.rest.handler.cluster.ClusterOverviewHandler  - Implementation error: Unhandled exception.
org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(requestResourceOverview(Time))) sent to akka.tcp://flink@job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-0.job-ef80a156-3350-4e85-8761-b0e42edc346f-jm-svc.olp-here-test-j-ef80a156-3350-4e85-8761-b0e42edc346f.svc.cluster.local:6126/user/resourcemanager because the fencing token is null.
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:59)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 

 

We are running Flink 1.7.1 in Kubernetes and run each job in its own namespace with a three-node ZooKeeper cluster, two Job Managers, and one or more Task Managers. I have been able to replicate the issue, but I can’t find any difference in the logs between a failing cluster and a good one.

 

Does anyone here have any ideas as to what’s happening, or what I should be looking for?

 

-Bruce

 

Bruce Hanson

Principal Engineer

M: +1 425 681 0422

 

HERE Technologies

701 Pike Street, Suite 2000

Seattle, WA 98101 USA

47° 36' 41" N 122° 19' 57" W