flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From imcking <imck...@126.com>
Subject flink 1.6.2 ON YARN HA环境,提交任务报错
Date Fri, 12 Apr 2019 13:18:26 GMT
大家好:
         我在使用1.6.2的时候遇到一个问题,flink1.6.2,hadoop-2.7.6,Flink
on YARN环境,flink配置了HA,配置文件内容如下:


jobmanager.execution.failover-strategy: individual


state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints


high-availability: zookeeper
high-availability.zookeeper.quorum: 172.23.71.133:2181,172.23.71.135:2181,172.23.71.136:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
yarn.reallocate-failed: true
yarn.maximum-failed-containers: 10


      main函数中的主要逻辑:
DataSource dataSource = new DataSource(args);
DataStream<Message> messageStream = env.addSource(dataSource);

messageStream.map(new FlowMapper()).keyBy(0).flatMap(new FilterflowMapper()).addSink(new RedisSink());
env.execute();


其中FlatMap中使用了 MapState<String,Integer> mapState;


报错信息:


Starting execution of program
org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution
result. (JobID: 246bb8240a70ce17683285bf87ba20a2)
        at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
        at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
        at com.jd.ma.flink.TestMain.main(TestMain.java:39)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
        at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:816)
        at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:290)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:216)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1053)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1129)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1129)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
        at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:380)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
        at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:203)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error.,
<Exception on server side:
java.util.concurrent.CompletionException: org.apache.flink.runtime.client.JobSubmissionException:
Failed to submit job.
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$submitJob$2(Dispatcher.java:267)
        at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
        at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
        at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:739)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
        at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
        at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
        ... 24 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException:
Could not set up JobManager
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        ... 4 more
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
        at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
        at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
        at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
        ... 7 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Incompatible failover strategy - strategy
'Individual Task Restart' can only handle jobs with only disconnected tasks.
        at org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy.notifyNewVertices(RestartIndividualStrategy.java:142)
        at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:857)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:232)
        at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:100)
        at org.apache.flink.runtime.jobmaster.JobMaster.createExecutionGraph(JobMaster.java:1151)
        at org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1131)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:294)
        at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:157)
        ... 10 more

End of exception on server side>]
        at org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:351)
        at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:335)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
        ... 4 more

The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution
environment.




如果我Flink配置文件中不用HA配置,任务提交正常,请问下知道是什么原因造成的吗?



Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message