flink-user-zh mailing list archives

From 郑 洁锋 <zjfpla...@hotmail.com>
Subject Re: Re: MiniCluster problem
Date Thu, 16 Jan 2020 06:24:22 GMT
Here is the complete code, up to the point where the job is launched:

public class ClusterClientFactory {

    public static ClusterClient createClusterClient(Options launcherOptions) throws Exception {
        String mode = launcherOptions.getMode();
        if(mode.equals(ClusterMode.standalone.name())) {
            return createStandaloneClient(launcherOptions);
        } else if(mode.equals(ClusterMode.yarn.name())) {
            return createYarnClient(launcherOptions,mode);
        }
        throw new IllegalArgumentException("Unsupported cluster client type: ");
    }

    public static ClusterClient createStandaloneClient(Options launcherOptions) throws Exception {
        String flinkConfDir = launcherOptions.getFlinkconf();
        Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
        MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
        configBuilder.setConfiguration(config);
        MiniCluster miniCluster = new MiniCluster(configBuilder.build());
        MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
        LeaderConnectionInfo connectionInfo = clusterClient.getClusterConnectionInfo();
        InetSocketAddress address = AkkaUtils.getInetSocketAddressFromAkkaURL(connectionInfo.getAddress());
        config.setString(JobManagerOptions.ADDRESS, address.getAddress().getHostName());
        config.setInteger(JobManagerOptions.PORT, address.getPort());
        clusterClient.setDetached(true);
        return clusterClient;
    }


In the launcher class:

ClusterClient clusterClient = ClusterClientFactory.createClusterClient(launcherOptions);
clusterClient.run(program, 1);
clusterClient.shutdown();

________________________________
zjfplayer@hotmail.com

From: tison<mailto:wander4096@gmail.com>
Sent: 2020-01-16 13:31
To: user-zh<mailto:user-zh@flink.apache.org>
Subject: Re: Re: MiniCluster problem
MiniCluster miniCluster = new MiniCluster(configBuilder.build());

miniCluster.start();


MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
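
The ordering above matters because the client constructor queries the cluster, and the cluster rejects that query until it has been started. The following is a minimal, self-contained sketch of that pattern only. `ToyCluster`, `ToyClient`, and the placeholder address `localhost:0` are hypothetical stand-ins invented for illustration; they are not Flink classes and do not reproduce Flink's internals.

```java
public class LifecycleSketch {

    /** Hypothetical stand-in for a component that must be started before use. */
    static final class ToyCluster implements AutoCloseable {
        private boolean running;

        void start() {
            running = true;
        }

        /** Fails fast when called before start(), like a precondition check. */
        String getConnectionInfo() {
            if (!running) {
                throw new IllegalStateException("ToyCluster is not yet running.");
            }
            return "localhost:0"; // placeholder address, an assumption
        }

        @Override
        public void close() {
            running = false;
        }
    }

    /** Hypothetical client whose constructor immediately queries the cluster. */
    static final class ToyClient {
        private final String address;

        ToyClient(ToyCluster cluster) {
            this.address = cluster.getConnectionInfo();
        }

        String address() {
            return address;
        }
    }

    /** Wrong order: the client is built before the cluster is started. */
    static String connectWithoutStart() {
        ToyCluster cluster = new ToyCluster();
        try {
            new ToyClient(cluster);
            return "connected";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    /** Right order: start first, then build the client; close when done. */
    static String connectAfterStart() {
        try (ToyCluster cluster = new ToyCluster()) {
            cluster.start();
            return new ToyClient(cluster).address();
        }
    }

    public static void main(String[] args) {
        System.out.println(connectWithoutStart());
        System.out.println(connectAfterStart());
    }
}
```

In the sketch, try-with-resources also shuts the toy cluster down once the work is finished; whether the same arrangement suits a real launcher depends on how long the cluster has to outlive the client.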

Best,
tison.


On Thu, Jan 16, 2020 at 1:30 PM, tison <wander4096@gmail.com> wrote:

> It has nothing to do with the cluster.
> Best,
> tison.
>
>
> On Thu, Jan 16, 2020 at 1:30 PM, tison <wander4096@gmail.com> wrote:
>
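>> 1. This is an internal implementation class. The missing documentation is a problem, but it is best not to depend on the class directly.
>>
>> 2. Where is the code in your screenshot from? As far as I can tell, it is your own code that creates and uses the MiniCluster.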
>>
>> Best,
>> tison.
>>
>>
>> On Thu, Jan 16, 2020 at 1:18 PM, 郑 洁锋 <zjfplayer@hotmail.com> wrote:
>>
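>>> Is there any documentation for MiniCluster? I could not find anything about it under the deployment modes in the official documentation.
>>> I started the Flink standalone cluster with bin/start-cluster.sh.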
>>>
>>>
>>> ________________________________
>>> zjfplayer@hotmail.com
>>>
>>> From: tison<mailto:wander4096@gmail.com>
>>> Sent: 2020-01-16 12:39
>>> To: user-zh<mailto:user-zh@flink.apache.org>
>>> Subject: Re: MiniCluster problem
>>> You have to start your MiniCluster first.
>>>
>>> Best,
>>> tison.
>>>
>>>
>>> On Thu, Jan 16, 2020 at 11:38 AM, 郑 洁锋 <zjfplayer@hotmail.com> wrote:
>>>
>>> > The MiniCluster code throws the following error at runtime:
>>> >
>>> > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
>>> > SLF4J: Defaulting to no-operation (NOP) logger implementation
>>> > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
>>> > Exception in thread "main" java.lang.IllegalStateException: MiniCluster is not yet running.
>>> >         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:195)
>>> >         at org.apache.flink.runtime.minicluster.MiniCluster.getHighAvailabilityServices(MiniCluster.java:223)
>>> >         at org.apache.flink.client.program.MiniClusterClient.<init>(MiniClusterClient.java:61)
>>> >         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createStandaloneClient(ClusterClientFactory.java:82)
>>> >         at com.dtstack.flink.sql.launcher.ClusterClientFactory.createClusterClient(ClusterClientFactory.java:69)
>>> >         at com.dtstack.flink.sql.launcher.LauncherMain.main(LauncherMain.java:99)
>>> >
>>> > The code that triggers the error is as follows:
>>> >
>>> > Configuration config = GlobalConfiguration.loadConfiguration(flinkConfDir);
>>> > MiniClusterConfiguration.Builder configBuilder = new MiniClusterConfiguration.Builder();
>>> > configBuilder.setConfiguration(config);
>>> > MiniCluster miniCluster = new MiniCluster(configBuilder.build());
>>> > MiniClusterClient clusterClient = new MiniClusterClient(config, miniCluster);
>>> >
>>> > Here flinkConfDir is /opt/flink/conf.
>>> >
>>> >
>>> > The Flink standalone HA cluster information is as follows:
>>> > ------------------------------
>>> > zjfplayer@hotmail.com
>>> >
>>> >
>>> >
>>>
>>