flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10255) Standby Dispatcher locks submitted JobGraphs
Date Tue, 11 Sep 2018 05:33:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16610117#comment-16610117
] 

ASF GitHub Bot commented on FLINK-10255:
----------------------------------------

TisonKun commented on issue #6678: [FLINK-10255] Only react to onAddedJobGraph signal when
being leader
URL: https://github.com/apache/flink/pull/6678#issuecomment-420151979
 
 
   Travis show relevant failures, will take a close look later.
   
   ```
   testGrantingRevokingLeadership(org.apache.flink.runtime.dispatcher.DispatcherHATest)  Time
elapsed: 0.024 sec  <<< ERROR!
   org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: java.lang.UnsupportedOperationException:
Should not be called.
   	at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
   	at org.apache.flink.runtime.dispatcher.DispatcherHATest.teardown(DispatcherHATest.java:98)
   Caused by: java.lang.UnsupportedOperationException: Should not be called.
   	at org.apache.flink.runtime.dispatcher.DispatcherHATest$BlockingSubmittedJobGraphStore.releaseJobGraph(DispatcherHATest.java:306)
   	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:809)
   	at org.apache.flink.util.function.BiFunctionWithException.apply(BiFunctionWithException.java:49)
   	at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
   	at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
   	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
   	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
   	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   ```
   
   ```
   Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 1.45 sec <<< FAILURE!
- in org.apache.flink.runtime.dispatcher.DispatcherHATest
   testGrantingRevokingLeadership(org.apache.flink.runtime.dispatcher.DispatcherHATest)  Time
elapsed: 0.028 sec  <<< ERROR!
   org.apache.flink.runtime.util.TestingFatalErrorHandler$TestingException: java.lang.UnsupportedOperationException:
Should not be called.
   	at org.apache.flink.runtime.util.TestingFatalErrorHandler.rethrowError(TestingFatalErrorHandler.java:51)
   	at org.apache.flink.runtime.dispatcher.DispatcherHATest.teardown(DispatcherHATest.java:98)
   Caused by: java.lang.UnsupportedOperationException: Should not be called.
   	at org.apache.flink.runtime.dispatcher.DispatcherHATest$BlockingSubmittedJobGraphStore.releaseJobGraph(DispatcherHATest.java:306)
   	at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$null$26(Dispatcher.java:809)
   	at org.apache.flink.util.function.BiFunctionWithException.apply(BiFunctionWithException.java:49)
   	at java.util.concurrent.CompletableFuture.biApply(CompletableFuture.java:1105)
   	at java.util.concurrent.CompletableFuture$BiApply.tryFire(CompletableFuture.java:1070)
   	at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
   	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
   	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
   	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
   ```
   
   ```
   A TaskManager should go into a clean state in case of a JobManager failure(org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase)
 Time elapsed: 121.247 sec  <<< FAILURE!
   java.lang.AssertionError: assertion failed: timeout (119585594930 nanoseconds) during expectMsg
while waiting for Acknowledge
   	at scala.Predef$.assert(Predef.scala:170)
   	at akka.testkit.TestKitBase$class.expectMsg_internal(TestKit.scala:387)
   	at akka.testkit.TestKitBase$class.expectMsg(TestKit.scala:364)
   	at akka.testkit.TestKit.expectMsg(TestKit.scala:814)
   	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply$mcV$sp(JobManagerFailsITCase.scala:118)
   	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104)
   	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3$$anonfun$apply$mcV$sp$4.apply(JobManagerFailsITCase.scala:104)
   	at akka.testkit.TestKitBase$class.within(TestKit.scala:345)
   	at akka.testkit.TestKit.within(TestKit.scala:814)
   	at akka.testkit.TestKitBase$class.within(TestKit.scala:359)
   	at akka.testkit.TestKit.within(TestKit.scala:814)
   	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply$mcV$sp(JobManagerFailsITCase.scala:104)
   	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85)
   	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(JobManagerFailsITCase.scala:85)
   	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
   	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
   	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   	at org.scalatest.Transformer.apply(Transformer.scala:22)
   	at org.scalatest.Transformer.apply(Transformer.scala:20)
   	at org.scalatest.WordSpecLike$$anon$1.apply(WordSpecLike.scala:953)
   	at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
   	at org.apache.flink.api.scala.runtime.jobmanager.JobManagerFailsITCase.withFixture(JobManagerFailsITCase.scala:37)
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Standby Dispatcher locks submitted JobGraphs
> --------------------------------------------
>
>                 Key: FLINK-10255
>                 URL: https://issues.apache.org/jira/browse/FLINK-10255
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination
>    Affects Versions: 1.5.3, 1.6.0, 1.7.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.6.1, 1.7.0, 1.5.4
>
>
> Currently, standby {{Dispatchers}} lock submitted {{JobGraphs}} which are added to the
{{SubmittedJobGraphStore}} if HA mode is enabled. Locking the {{JobGraphs}} can prevent their
cleanup leaving the system in an inconsistent state.
> The problem is that we recover in the {{SubmittedJobGraphListener#onAddedJobGraph}} callback
which is also called if don't have the leadership the newly added {{JobGraph}}. Recovering
the {{JobGraph}} currently locks the {{JobGraph}}. In case that the {{Dispatcher}} is not
the leader, then we won't start that job after its recovery. However, we also don't release
the {{JobGraph}} leaving it locked.
> There are two possible solutions to the problem. Either we check whether we are the leader
before recovering jobs or we say that recovering jobs does not lock them. Only if we can submit
the recovered job we lock them. The latter approach has the advantage that it follows a quite
similar code path as an initial job submission. Moreover, jobs are currently also recovered
at other places. In all these places we currently would need to release the {{JobGraphs}}
if we cannot submit the recovered {{JobGraph}} (e.g. {{Dispatcher#grantLeadership}}).
> An extension of the first solution could be to stop the {{SubmittedJobGraphStore}} while
the {{Dispatcher}} is not the leader. Then we would have to make sure that no concurrent callback
from the {{SubmittedJobGraphStore#SubmittedJobGraphListener}} can be executed after revoking
leadership from the {{Dispatcher}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message