flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tison <wander4...@gmail.com>
Subject Re: flink 1.7.2 YARN Session模式提交任务问题求助
Date Wed, 15 Apr 2020 14:33:35 GMT
注意环境变量和 fs.hdfs.hdfsdefault 要配置成 HDFS 路径或 YARN
集群已知的本地路径,不要配置成客户端的路径。因为实际起作用是在拉起
TM 的那台机器上解析拉取的。

Best,
tison.


Chief <codeegg@foxmail.com> 于2020年4月15日周三 下午7:40写道:

> hi Yangze Guo
> 您说的环境变量已经在当前用户的环境变量文件里面设置了,您可以看看我的问题描述,现在如果checkpoint的路径设置不是namenode
> ha的nameservice就不会报错,checkpoint都正常。
>
>
>
>
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:&nbsp;"Yangze Guo"<karmagyz@gmail.com&gt;;
> 发送时间:&nbsp;2020年4月15日(星期三) 下午3:00
> 收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;
>
> 主题:&nbsp;Re: flink 1.7.2 YARN Session模式提交任务问题求助
>
>
>
> Flink需要设置hadoop相关conf位置的环境变量 YARN_CONF_DIR or HADOOP_CONF_DIR
[1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/yarn_setup.html
>
> Best,
> Yangze Guo
>
> On Mon, Apr 13, 2020 at 10:52 PM Chief <codeegg@foxmail.com&gt; wrote:
> &gt;
> &gt; 大家好
> &gt; 目前环境是flink 1.7.2,使用YARN Session模式提交任务,Hadoop 版本2.7.3,hdfs
> namenode配置了ha模式,提交任务的时候报以下错误,系统环境变量中已经设置了HADOOP_HOME,YARN_CONF_DIR,HADOOP_CONF_DIR,HADOOP_CLASSPATH,在flink_conf.yaml中配置了fs.hdfs.hadoopconf
> &gt;
> &gt;
> &gt; 2020-04-10 19:12:02,908 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.JobMaster&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; - Connecting to ResourceManager akka.tcp://flink@trusfortpoc1
> :23584/user/resourcemanager(00000000000000000000000000000000)
> &gt; 2020-04-10 19:12:02,909 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}]
> &gt; 2020-04-10 19:12:02,911 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.JobMaster&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; - Resolved ResourceManager address, beginning registration
> &gt; 2020-04-10 19:12:02,911 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.JobMaster&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; - Registration at ResourceManager attempt 1 (timeout=100ms)
> &gt; 2020-04-10 19:12:02,912 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; - Cannot serve slot request, no
> ResourceManager connected. Adding as pending request
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}]
> &gt; 2020-04-10 19:12:02,913 INFO&amp;nbsp;
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Registering job manager
> 00000000000000000000000000000000@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
> &gt; 2020-04-10 19:12:02,917 INFO&amp;nbsp;
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Registered job manager
> 00000000000000000000000000000000@akka.tcp://flink@trusfortpoc1:23584/user/jobmanager_0
> for job 24691b33c18d7ad73b1f52edb3d68ae4.
> &gt; 2020-04-10 19:12:02,919 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.JobMaster&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; - JobManager successfully registered at ResourceManager, leader
> id: 00000000000000000000000000000000.
> &gt; 2020-04-10 19:12:02,919 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; - Requesting new slot
> [SlotRequestId{35ad2384e9cd0efd30b43f5302db24b6}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
> &gt; 2020-04-10 19:12:02,920 INFO&amp;nbsp;
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{5a12237c7f2bd8b1cc760ddcbab5a1c0}.
> &gt; 2020-04-10 19:12:02,921 INFO&amp;nbsp;
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; - Requesting new slot
> [SlotRequestId{0feacbb4fe16c8c7a70249f1396565d0}] and profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} from resource manager.
> &gt; 2020-04-10 19:12:02,924 INFO&amp;nbsp;
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Requesting new TaskExecutor container with resources
> <memory:4096, vCores:6&amp;gt;. Number pending requests 1.
> &gt; 2020-04-10 19:12:02,926 INFO&amp;nbsp;
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Request slot with profile
> ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=0,
> nativeMemoryInMB=0, networkMemoryInMB=0} for job
> 24691b33c18d7ad73b1f52edb3d68ae4 with allocation id
> AllocationID{37dd666a18040bf63ffbf2e022b2ea9b}.
> &gt; 2020-04-10 19:12:06,531 INFO&amp;nbsp;
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl&amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp;- Received new token for :
> trusfortpoc3:35206
> &gt; 2020-04-10 19:12:06,543 INFO&amp;nbsp;
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Received new container:
> container_1586426824930_0006_01_000002 - Remaining pending container
> requests: 1
> &gt; 2020-04-10 19:12:06,543 INFO&amp;nbsp;
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Removing container request Capability[<memory:4096,
> vCores:6&amp;gt;]Priority[1]. Pending container requests 0.
> &gt; 2020-04-10 19:12:06,568 ERROR
> org.apache.flink.yarn.YarnResourceManager&amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp;
> &amp;nbsp; &amp;nbsp;- Could not start TaskManager in container
> container_1586426824930_0006_01_000002.
> &gt; java.lang.IllegalArgumentException: java.net.UnknownHostException:
> hdfsClusterForML
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:320)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.hdfs.DFSClient.<init&amp;gt;(DFSClient.java:687)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.hdfs.DFSClient.<init&amp;gt;(DFSClient.java:628)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:93)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2701)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2683)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.fs.FileSystem.get(FileSystem.java:372)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.yarn.Utils.createTaskExecutorContext(Utils.java:453)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.yarn.YarnResourceManager.createTaskExecutorLaunchContext(YarnResourceManager.java:555)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.yarn.YarnResourceManager.lambda$onContainersAllocated$1(YarnResourceManager.java:390)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.yarn.YarnResourceManager$$Lambda$183/1182651376.run(Unknown
> Source)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:332)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:158)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:70)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.actor.ActorCell.invoke(ActorCell.scala:495)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.dispatch.Mailbox.run(Mailbox.scala:224)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> &gt; Caused by: java.net.UnknownHostException: hdfsClusterForML
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
... 33 more
> &gt;
> &gt;
> &gt; 这个hdfsClusterForML是namenode ha
> 的nameservice,经过分析是没加载hdfs-site.xml配置导致的,
> &gt; 也尝试过把Hadoop的几个配置文件放到flink
> 的conf目录下但都无效,最终通过改YarnResourceManager源码后能够正常提交任务。
> &gt; public YarnResourceManager(
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; RpcService rpcService,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; String resourceManagerEndpointId,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; ResourceID resourceId,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; Configuration flinkConfig,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; Map<String, String&amp;gt;
env,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; HighAvailabilityServices
> highAvailabilityServices,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; HeartbeatServices heartbeatServices,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; SlotManager slotManager,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; MetricRegistry metricRegistry,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; JobLeaderIdService
> jobLeaderIdService,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; ClusterInformation
> clusterInformation,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; FatalErrorHandler fatalErrorHandler,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; @Nullable String webInterfaceUrl,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; JobManagerMetricGroup
> jobManagerMetricGroup) {
> &gt; &amp;nbsp; &amp;nbsp;super(
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; rpcService,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; resourceManagerEndpointId,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; resourceId,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; highAvailabilityServices,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; heartbeatServices,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; slotManager,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; metricRegistry,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; jobLeaderIdService,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; clusterInformation,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; fatalErrorHandler,
> &gt; &amp;nbsp; &amp;nbsp; &amp;nbsp; jobManagerMetricGroup);
> &gt; &amp;nbsp; &amp;nbsp;this.flinkConfig&amp;nbsp; = flinkConfig;
> &gt; &amp;nbsp; &amp;nbsp;this.yarnConfig = new
> YarnConfiguration(HadoopUtils.getHadoopConfiguration(flinkConfig));
> &gt; 但我认为这肯定不是解决问题的方法,所以向大家求助,是不是我忽略什么。

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message