flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Till Rohrmann (JIRA)" <j...@apache.org>
Subject [jira] [Assigned] (FLINK-10011) Old job resurrected during HA failover
Date Fri, 03 Aug 2018 17:33:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-10011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Till Rohrmann reassigned FLINK-10011:
-------------------------------------

    Assignee: Till Rohrmann

> Old job resurrected during HA failover
> --------------------------------------
>
>                 Key: FLINK-10011
>                 URL: https://issues.apache.org/jira/browse/FLINK-10011
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 1.4.2, 1.5.2, 1.6.0
>            Reporter: Elias Levy
>            Assignee: Till Rohrmann
>            Priority: Blocker
>
> For the second time we've observed Flink resurrect an old job during JobManager high-availability
fail over.
> h4. Configuration
>  * AWS environment
>  * Flink 1.4.2 standalong cluster in HA mode
>  * 2 JMs, 3 TMs
>  * 3 node ZK ensemble
>  * 1 job consuming to/from Kafka
>  * Checkpoints in S3 using the Presto file system adaptor
> h4. Timeline 
>  * 15:18:10 JM 2 completes checkpoint 69256.
>  * 15:19:10 JM 2 completes checkpoint 69257.
>  * 15:19:57 ZK 1 (follower) loses connectivity to the leader as a result of a SocketTimeoutException
>  * 15:19:57 ZK 1 closes connection to JM 2 (leader)
>  * 15:19:57 ZK 2 (leader) reports a network error and closes connection to ZK 1
>  * 15:19:57 JM 2 reports it can't read data from ZK
>  ** {{Unable to read additional data from server sessionid 0x30000003f4a0003, likely
server has closed socket, closing socket connection and attempting reconnect)}}
>  ** {{org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn}}
>  * 15:19:57 JM 2 ZK Curator changes connection state to SUSPENDED
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
>  ** {{ZooKeeper connection SUSPENDED. }}{{Changes to the submitted job graphs are not
monitored (temporarily).}}
>  ** {{Connection to ZooKeeper suspended. The contender akka.tcp://flink@flink-jm-2:6123/user/jobmanager
no longer participates in the leader election}}{{ }}
>  ** {{Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.}}
>  * 15:19:57 JM 2 gives up leadership
>  ** {{JobManager akka://flink/user/jobmanager#33755521 was revoked leadership.}}
>  * 15:19:57 JM 2 changes job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color} status
to SUSPENDED
>  ** {{Stopping checkpoint coordinator for job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}}}
>  * 15:19:57 TMs start disasociating with JM 2, but JM 2 discard the messages because
there is no leader
>  ** {{Discard message LeaderSessionMessage(d29e9f38-a16d-4c87-b34f-5212caab0473,Disconnect(b97363d53ad22aedfebdc8e5ba3c672f,java.lang.Exception:
TaskManager akka://flink/user/taskmanager is disassociating)) because there is currently no
valid leader id known.}}
>  * 15:19:57 JM 2 connects to ZK 2 and renews its session
>  ** {{Opening socket connection to server ip-10-210-43-221.ec2.internal/10.210.43.221:2181}}
>  ** {{Socket connection established to ip-10-210-43-221.ec2.internal/10.210.43.221:2181,
initiating session}}
>  ** {{Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.}}
>  ** {{Session establishment complete on server ip-10-210-43-221.ec2.internal/10.210.43.221:2181,
sessionid = 0x30000003f4a0003, negotiated timeout = 40000}}
>  ** {{Connection to ZooKeeper was reconnected. Leader election can be restarted.}}
>  ** {{ZooKeeper connection RECONNECTED. Changes to the submitted job graphs are monitored
again.}}
>  ** {{State change: RECONNECTED}}
>  * 15:19:57: JM 1 reports JM 1 has been granted leadership:
>  ** {{JobManager akka.tcp://flink@flink-jm-1:6123/user/jobmanager was granted leadership
with leader session ID Some(ae0a1a17-eccc-40b4-985d-93bc59f5b936).}}
>  * 15:19:57 JM 2 reports the job has been suspended
>  ** {{org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter Shutting down.}}
>  ** {{Job 2a4eff355aef849c5ca37dbac04f2ff1 has been suspended.}}
>  * 15:19:57 JM 2 reports it has lost leadership:
>  ** {{Associated JobManager Actor[akka://flink/user/jobmanager#33755521|#33755521] lost
leader status}}
>  ** {{Received leader address but not running in leader ActorSystem. Cancelling registration.}}
>  * 15:19:57 TMs register with JM 1
>  * 15:20:07 JM 1 Attempts to recover jobs and find there are two jobs:
>  ** {{Attempting to recover all jobs.}}
>  ** {{There are 2 jobs to recover. Starting the job recovery.}}
>  ** {{Attempting to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.}}
>  ** {{Attempting to recover job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.}}
>  * 15:20:08 – 15:32:27 ZK 2 reports a large number of errors of the form:
>  ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create
cxid:0x4211 zxid:0x60009dc70 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1
Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoint-counter/2a4eff355aef849c5ca37dbac04f2ff1}}
>  ** {{Got user-level KeeperException when processing sessionid:0x2000001d2330001 type:create
cxid:0x4230 zxid:0x60009dc78 txntype:-1 reqpath:n/a Error Path:/flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2
Error:KeeperErrorCode = NodeExists for /flink/cluster_a/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1/0000000000000069255/37d25086-374f-4969-b763-4261e87022a2}}
>  * 15:29:08 JM 1 starts to recover job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
>  ** {{Recovered SubmittedJobGraph(2a4eff355aef849c5ca37dbac04f2ff1, JobInfo(clients:
Set((Actor[akka.tcp://flink@ip-10-210-42-62.ec2.internal:37564/temp/$c],DETACHED)), start:
1528833686265)).}}
>  ** {{Submitting recovered job 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Submitting job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job) (Recovery).}}
>  ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
delayBetweenRestartAttempts=30000) for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Running initialization on master for job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Initialized in '/checkpoints/2a4eff355aef849c5ca37dbac04f2ff1'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint metadata: File
State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Scheduling job 2a4eff355aef849c5ca37dbac04f2ff1 (Some Job).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to
RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to
FAILING.}}{{ org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not
enough free slots available to run the job. You can decrease the operator parallelism or increase
the number of slots per TaskManager in the configuration.}}
>  * 15:20:09 JM 1 starts to recover {color:#d04437}61bca496065cd05e4263070a5e923a05{color}
>  ** {{Recovered SubmittedJobGraph(61bca496065cd05e4263070a5e923a05, JobInfo(clients:
Set((Actor[akka.tcp://flink@ip-10-210-22-167.ec2.internal:41001/temp/$c],DETACHED)), start:
1525728377900)).}}
>  ** {{Submitting recovered job 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Submitting job 61bca496065cd05e4263070a5e923a05 (Some Job) (Recovery).}}
>  ** {{Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
delayBetweenRestartAttempts=30000) for 61bca496065cd05e4263070a5e923a05.}}
>  ** {{Job recovers via failover strategy: full graph restart}}
>  ** {{Successfully ran initialization on master in 0 ms.}}
>  ** {{Running initialization on master for job Some Job (61bca496065cd05e4263070a5e923a05).}}
>  ** {{Initialized in '/checkpoints/61bca496065cd05e4263070a5e923a05'.}}
>  ** {{Using application-defined state backend for checkpoint/savepoint metadata: File
State Backend @ s3://bucket/flink/cluster_1/checkpoints.}}
>  ** {{Persisting periodic checkpoints externally at s3://bucket/flink/cluster_1/checkpoints-external.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Scheduling job 61bca496065cd05e4263070a5e923a05 (Some Job).}}
>  ** {{Job Some Job (61bca496065cd05e4263070a5e923a05) switched from state CREATED to
RUNNING.}}
>  ** {{Trying to fetch 0 checkpoints from storage}}
>  ** {{Found 0 checkpoints in ZooKeeper.}}
>  * 15:20:09 JM 1 reports a bunch of metric collisions because of the two jobs:
>  ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointSize'.
Metric will not be reported.[jobmanager, job]}}
>  ** {{Name collision: Group already contains a Metric with the name 'lastCheckpointAlignmentBuffered'.
Metric will not be reported.[jobmanager, job]}}
>  ** etc
>  * 15:20:39 JM 1 begins attempting to restart the {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
job repeatedly
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state FAILING to
RESTARTING.}}
>  ** {{Restarting the job Some Job (2a4eff355aef849c5ca37dbac04f2ff1).}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RESTARTING
to CREATED.}}
>  ** {{Recovering checkpoints from ZooKeeper.}}
>  ** {{Found 3 checkpoints in ZooKeeper.}}
>  ** {{Trying to fetch 3 checkpoints from storage.}}
>  ** {{Trying to retrieve checkpoint 69255.}}
>  ** {{Trying to retrieve checkpoint 69256.}}
>  ** {{Trying to retrieve checkpoint 69257.}}
>  ** {{Restoring from latest valid checkpoint: Checkpoint 69257 @ 1532989148882 for 2a4eff355aef849c5ca37dbac04f2ff1.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state CREATED to
RUNNING.}}
>  ** {{Job Some Job (2a4eff355aef849c5ca37dbac04f2ff1) switched from state RUNNING to
FAILING.}}
>  * 15:35:39 ZK 1 reestablishes connection with ZK 2 and 3 and becomes a follower
>  
> h4. Analysis
>  
> The cluster was running job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
 The JM HA leader was JM 2, which was connected to ZK 1.  ZK 1 was a follower in the ZK
ensemble.  The ZK leader was ZK 2.  
> ZK 1 lost network connectivity for about 16 minutes.  Upon loss of connectivity to ZK
1, JM 2 gives up leadership and shutdown the  {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
job.  JM 2 then immediately connects to ZK 2, without its ZK session having expired.  Nonetheless,
as it gave up leadership JM 1 is elected the new JM leader.
> JM 1 then erroneously decides there are two jobs to restore.  The previously running
job,  {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}, and {color:#d04437}61bca496065cd05e4263070a5e923a05{color}.
 JM 1 decides there are no checkpoints for {color:#d04437}61bca496065cd05e4263070a5e923a05{color},
which starts right away.   JM 1 attempts to restore {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
from the latest checkpoint, but it fails because there aren't enough task slots in the cluster
as a result of the other job starting.  Thus,  {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}
entered a restart loop.
> We manually stopped both jobs and starts a new job based on the last known checkpoint
for  {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}.
>  
> Job {color:#d04437}61bca496065cd05e4263070a5e923a05{color}  is an old job that we ran
for a month between May 7th and June 5th.
> After our manual intervention, the the HA state directory in S3 looks like this:
> {{s3cmd ls s3://bucket/flink/cluster_1/recovery/}}
> {{ DIR s3://bucket/flink/cluster_1/recovery/some_job/}}
> {{2018-07-31 17:33 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint12e06bef01c5}}
> {{2018-07-31 17:34 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint187e0d2ae7cb}}
> {{2018-07-31 17:32 35553 s3://bucket/flink/cluster_1/recovery/completedCheckpoint22fc8ca46f02}}
> {{2018-06-12 20:01 284626 s3://bucket/flink/cluster_1/recovery/submittedJobGraph7f627a661cec}}
> {{2018-07-30 23:01 285257 s3://bucket/flink/cluster_1/recovery/submittedJobGraphf3767780c00c}}
> submittedJobGraph7f627a661cec appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color},
the long running job that failed during the ZK failover
> submittedJobGraphf3767780c00c appears to be job {color:#205081}d77948df92813a68ea6dfd6783f40e7e{color},
the job we started restoring from a checkpoint after shutting down the duplicate jobs
>  
> A few questions come to mind.
> h5. Why does the JM terminate running jobs when it can immediately connect to another
ZK node and its ZK session has not expired?
> This seems to be a result of using the LeaderLatch recipe in Curator.  It's [docs|https://github.com/Netflix/curator/wiki/Leader-Latch]
state: 
> {quote}LeaderLatch instances add a ConnectionStateListener to watch for connection problems.
If SUSPENDED or LOST is reported, the LeaderLatch that is the *leader will report that it
is no longer the leader* (i.e. there will not be a leader until the connection is re-established).
If a LOST connection is RECONNECTED, the LeaderLatch *will delete its previous ZNode and create
a new one*.
> Users of LeaderLatch must take account that connection issues can cause leadership to
be lost. i.e. hasLeadership() returns true but some time later the connection is SUSPENDED
or LOST. At that point hasLeadership() will return false. It is highly recommended that LeaderLatch
users register a ConnectionStateListener.
> {quote}
> So not only is leadership lost while disconnected, but will likely stay lost when reconnecting
as a result of the znode deletion and recreation.
> This can probably be solved by using the Curator LeaderElection recipe instead, which
[states|https://github.com/Netflix/curator/wiki/Leader-Election]:
> {quote}The {{LeaderSelectorListener}} class extends {{ConnectionStateListener}}. When
the LeaderSelector is started, it adds the listener to the Curator instance. Users of the
{{LeaderSelector}} must pay attention to any connection state changes. If an instance becomes
the leader, it should respond to notification of being SUSPENDED or LOST.
> If the SUSPENDED state is reported, the instance must assume that it might no longer
be the leader until it receives a RECONNECTED state. If the LOST state is reported, the instance
is no longer the leader and its {{takeLeadership}} method should exit.
> {quote}
> So with LeaderElection it seems that what to do during the SUSPENDED state is left up
to the application, which may choose to continue being leader until the state becomes LOST.
> Obviously there are dangers with doing so, but at the very least you would expect the
JM not to give up leadership until it tried to connect to other ZK members within the remaining
ZK session timeout.
> This problem has been [previously discussed|http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Zookeeper-failure-handling-td19611.html]
in the mailing list, which led to FLINK-6174 and this [PR|https://github.com/apache/flink/pull/3599],
which appears to be a modification of the Curator LeaderLatch recipe.  It also lead to FLINK-5703,
which seems like an attempt to keep jobs running during JM failover.  While that is a valuable
addition, I argue that is not required to avoid this failure, as the previous leader can continue
being leader so long as it connects to a new ZK before its ZK session expires.
>  
> h5. Why did JM 1 resurrect the old job?
> Something must have been off with the state stored in the S3 HA recovery directory.  The
job must have not been fully removed.  
> In fact, right now the recovery directory has the file submittedJobGraph7f627a661cec
which appears to be job {color:#14892c}2a4eff355aef849c5ca37dbac04f2ff1{color}. Is that expected?
 That job is no longer running.  Shouldn't that file no longer exist in the recovery directory?
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message