flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8943) Jobs will not recover if DFS is temporarily unavailable
Date Wed, 28 Mar 2018 12:31:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16417270#comment-16417270 ]

ASF GitHub Bot commented on FLINK-8943:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5746#discussion_r177731740
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---
    @@ -396,15 +398,22 @@ public void testFailingJobRecovery() throws Exception {
     
     			jobManager = system.actorOf(jobManagerProps);
     
    +			final TestProbe testProbe = new TestProbe(system);
    +
    +			testProbe.watch(jobManager);
    +
     			Future<Object> started = Patterns.ask(jobManager, new Identify(42), deadline.timeLeft().toMillis());
     
     			Await.ready(started, deadline.timeLeft());
     
     			// make the job manager the leader --> this triggers the recovery of all jobs
     			myLeaderElectionService.isLeader(leaderSessionID);
     
    -			// check that we have successfully recovered the second job
    -			assertThat(recoveredJobs, containsInAnyOrder(jobId2));
    +			// check that we did not recover any jobs
    +			assertThat(recoveredJobs, Matchers.is(empty()));
    --- End diff ---
    
    True, will make it consistent.
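
Below is a minimal, self-contained sketch of the two points touched on above, under two assumptions: that the "make it consistent" remark refers to the statically imported Hamcrest matcher style, and that the TestProbe in the diff is a death-watch on the JobManager actor. It uses a trivial stand-in actor, not the real JobManager; `DeathWatchSketch` and `NoOpActor` are illustrative names only.

    import static org.hamcrest.Matchers.empty;
    import static org.hamcrest.Matchers.is;
    import static org.junit.Assert.assertThat;

    import java.util.ArrayList;
    import java.util.List;

    import org.junit.Test;

    import akka.actor.ActorRef;
    import akka.actor.ActorSystem;
    import akka.actor.PoisonPill;
    import akka.actor.Props;
    import akka.actor.UntypedActor;
    import akka.testkit.TestProbe;

    public class DeathWatchSketch {

        @Test
        public void probeIsNotifiedWhenWatchedActorStops() {
            final ActorSystem system = ActorSystem.create("sketch");
            try {
                final ActorRef jobManager = system.actorOf(Props.create(NoOpActor.class));

                // Death-watch: the probe receives a Terminated message
                // once the watched actor stops.
                final TestProbe testProbe = new TestProbe(system);
                testProbe.watch(jobManager);

                // Stand-in for the failing recovery that stops the JobManager.
                jobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());

                // Block until the Terminated message arrives (or time out).
                testProbe.expectTerminated(jobManager, testProbe.remainingOrDefault());

                // Statically imported matcher style for the empty check,
                // i.e. is(empty()) rather than Matchers.is(empty()).
                final List<Object> recoveredJobs = new ArrayList<>();
                assertThat(recoveredJobs, is(empty()));
            } finally {
                system.terminate();
            }
        }

        /** Trivial actor that ignores all messages. */
        public static class NoOpActor extends UntypedActor {
            @Override
            public void onReceive(Object message) {}
        }
    }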


> Jobs will not recover if DFS is temporarily unavailable
> -------------------------------------------------------
>
>                 Key: FLINK-8943
>                 URL: https://issues.apache.org/jira/browse/FLINK-8943
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.0, 1.4.2, 1.6.0
>            Reporter: Gary Yao
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: flip6
>             Fix For: 1.5.0
>
>
> *Description*
> Job graphs will be recovered only once from the DFS. If the DFS is unavailable at the recovery attempt, the jobs will simply not be running until the master is restarted again.
> *Steps to reproduce*
> # Submit a job on a Flink cluster with HDFS as the HA storage dir.
> # Trigger job recovery by killing the master.
> # Stop the HDFS NameNode.
> # Start the HDFS NameNode again after job recovery is over.
> # Verify that the job is not running.
> *Expected behavior*
> The new master should fail fast and exit so that, once restarted, it re-attempts the recovery (see the sketch after the stack trace below).
> *Stacktrace*
> {noformat}
> 2018-03-14 14:01:37,704 ERROR org.apache.flink.runtime.dispatcher.StandaloneDispatcher     - Could not recover the job graph for a41d50b6f3ac16a730dd12792a847c97.
> org.apache.flink.util.FlinkException: Could not retrieve submitted JobGraph from state handle under /a41d50b6f3ac16a730dd12792a847c97. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:192)
> 	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$recoverJobs$5(Dispatcher.java:557)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.net.ConnectException: Call From ip-172-31-43-54/172.31.43.54 to ip-172-31-32-118.eu-central-1.compute.internal:9000 failed on connection exception: java.net.ConnectException: Connection refused; For more details see:  http://wiki.apache.org/hadoop/ConnectionRefused
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 	at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:801)
> 	at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:732)
> 	at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1493)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1435)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1345)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:227)
> 	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:116)
> 	at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
> 	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:259)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:409)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:163)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:155)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:95)
> 	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:346)
> 	at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
> 	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:843)
> 	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:832)
> 	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:821)
> 	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:325)
> 	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:285)
> 	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:270)
> 	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1132)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:325)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:322)
> 	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> 	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:322)
> 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:787)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:119)
> 	at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:36)
> 	at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
> 	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:64)
> 	at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:57)
> 	at org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore.recoverJobGraph(ZooKeeperSubmittedJobGraphStore.java:186)
> 	... 7 more
> Caused by: java.net.ConnectException: Connection refused
> 	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
> 	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
> 	at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206)
> 	at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:531)
> 	at org.apache.hadoop.ipc.Client$Connection.setupConnection(Client.java:685)
> 	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:788)
> 	at org.apache.hadoop.ipc.Client$Connection.access$3500(Client.java:410)
> 	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1550)
> 	at org.apache.hadoop.ipc.Client.call(Client.java:1381)
> 	... 40 more
> {noformat}
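
The fix direction described under *Expected behavior* above can be illustrated with a small sketch. This is a hypothetical simplification, not Flink's actual change: `JobGraphStore` and `FatalErrorHandler` below are stand-ins for ZooKeeperSubmittedJobGraphStore and Flink's FatalErrorHandler interface, and the point is only that a retrieval failure escalates to a fatal error instead of being attempted exactly once and dropped.

    import java.util.Collection;

    public class FailFastRecoverySketch {

        /** Simplified stand-in for ZooKeeperSubmittedJobGraphStore. */
        interface JobGraphStore {
            Collection<String> getJobIds() throws Exception;
            Object recoverJobGraph(String jobId) throws Exception;
        }

        /** Simplified stand-in for Flink's FatalErrorHandler. */
        interface FatalErrorHandler {
            void onFatalError(Throwable error);
        }

        private final JobGraphStore jobGraphStore;
        private final FatalErrorHandler fatalErrorHandler;

        public FailFastRecoverySketch(JobGraphStore store, FatalErrorHandler handler) {
            this.jobGraphStore = store;
            this.fatalErrorHandler = handler;
        }

        public void recoverJobs() {
            try {
                for (String jobId : jobGraphStore.getJobIds()) {
                    // If the DFS is unavailable this throws, instead of the
                    // job silently staying unrecovered.
                    jobGraphStore.recoverJobGraph(jobId);
                }
            } catch (Exception e) {
                // Fail fast: escalate so the master process exits and is
                // restarted, which re-attempts the recovery, rather than
                // trying once and leaving the jobs not running.
                fatalErrorHandler.onFatalError(
                    new Exception("Could not recover submitted job graphs.", e));
            }
        }
    }

In an HA setup the exiting master is brought back by the cluster framework (or a standby takes over leadership), so recovery is re-run once the DFS is reachable again.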



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
