flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Queryable State Client with 1.3.0-rc0
Date Wed, 07 Jun 2017 15:11:10 GMT
Sorry for yet another update, but these are the complete settings for making it work on
Flink 1.3:

Configuration configuration = new Configuration();

configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
configuration.setString(JobManagerOptions.ADDRESS, "localhost");
configuration.setInteger(JobManagerOptions.PORT, 6123);
configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
configuration.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
// needed because queryable state server is always disabled with only one TaskManager
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);

FlinkMiniCluster flinkMiniCluster = new LocalFlinkMiniCluster(
      configuration,
      HighAvailabilityServicesUtils.createHighAvailabilityServices(
            configuration,
            Executors.directExecutor(),
            HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
      false);

flinkMiniCluster.start();

final StreamExecutionEnvironment env =
      StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123);
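
When querying from a second process, it can help to first confirm that something is actually
listening on the address configured above (localhost:6123). The following is only a minimal
sketch using plain java.net sockets. PortProbe and isListening are hypothetical names and are
not part of the Flink API, and a successful connect only shows that the port is open, not that
the queryable state server is enabled.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

final class PortProbe {

    /**
     * Returns true if a TCP connection to host:port can be opened within
     * timeoutMillis, false if the connection is refused or times out.
     */
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, unreachable, or timed out: treat all as "not listening".
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port match the JobManagerOptions.ADDRESS / PORT values set above.
        System.out.println("JobManager port reachable: " + isListening("localhost", 6123, 1000));
    }
}
```

If the probe returns false, the mini cluster is probably not running yet or is bound to a
different address or port than the one the client uses.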

> On 7. Jun 2017, at 14:55, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Hi Claudio,
> 
> Quick question: what exactly was your call for getting the local environment with web
> UI? Did you also have a custom Configuration where you specified, for example, that the
> queryable state server should be enabled?
> 
> I can make an example work where I start a local cluster in one process (in the IDE)
> and then query from another process (also started in the IDE) but only if I manually
> start the LocalFlinkMiniCluster, as outlined in my last mail. I’m talking about Flink
> 1.2.x here.
> 
> Best,
> Aljoscha
> 
>> On 6. Jun 2017, at 17:23, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> 
>> Hi Claudio,
>> 
>> Quick follow-up: querying a locally started cluster no longer works out of the box
>> in Flink 1.3. You can manually start a mini cluster that has the required settings,
>> though. You would do something like this:
>> 
>> Configuration configuration = new Configuration();
>> configuration.addAll(jobGraph.getJobConfiguration());
>> 
>> configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
>> configuration.setString(JobManagerOptions.ADDRESS, "localhost");
>> configuration.setInteger(JobManagerOptions.PORT, 6123);
>> configuration.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>> 
>> flinkMiniCluster = new LocalFlinkMiniCluster(
>>   configuration,
>>   HighAvailabilityServicesUtils.createHighAvailabilityServices(
>>       configuration,
>>       Executors.directExecutor(),
>>       HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION),
>>   false);
>> 
>> flinkMiniCluster.start();
>> 
>> And then you can create a remote StreamExecutionEnvironment using
>> StreamExecutionEnvironment.createRemoteEnvironment() to submit your job to that cluster.
>> 
>> You can stop the cluster using flinkMiniCluster.stop()
>> 
>> I hope this helps?
>> 
>> Best,
>> Aljoscha
>> 
>>> On 6. Jun 2017, at 16:33, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>> 
>>> Hi Claudio,
>>> 
>>> The documentation for this was recently updated:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html#querying-state
>>> Could you see if that helps? The important bit for you is probably this:
>>> 
>>> final HighAvailabilityServices highAvailabilityServices =
>>>     HighAvailabilityServicesUtils.createHighAvailabilityServices(
>>>          config,
>>>          Executors.newSingleThreadScheduledExecutor(),
>>>          HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
>>> 
>>> If that doesn’t help we’ll need to delve deeper.
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 11. May 2017, at 22:21, Fahey, Claudio <Claudio.Fahey@dell.com> wrote:
>>>> 
>>>> I’ve been using QueryableStateClient in Flink 1.2 successfully. I have now upgraded
>>>> to release-1.3.0-rc0, and QueryableStateClient now requires a HighAvailabilityServices
>>>> parameter. The documentation hasn’t been updated to cover HighAvailabilityServices, so
>>>> I’m a bit lost on what exactly I should specify for that parameter. For development, I
>>>> want to connect to a Flink Job Manager that I created from a different process using
>>>> StreamExecutionEnvironment.createLocalEnvironmentWithWebUI. Can somebody provide the
>>>> code needed to create the appropriate HighAvailabilityServices parameter?
>>>> 
>>>> I have tried the following code:
>>>> 
>>>> val jobManagerIpcAddress = "localhost"
>>>> val jobManagerIpcPort = 6123
>>>> configuration.setString(JobManagerOptions.ADDRESS, jobManagerIpcAddress)
>>>> configuration.setInteger(JobManagerOptions.PORT, jobManagerIpcPort)
>>>> private val highAvailabilityServices =
>>>>   new StandaloneHaServices(jobManagerIpcAddress, jobManagerIpcAddress)
>>>> private val client = new QueryableStateClient(configuration, highAvailabilityServices)
>>>> 
>>>> It results in:
>>>> 
>>>> Exception in thread "main" akka.actor.ActorNotFound: Actor not found for:
>>>> ActorSelection[Anchor(akka://flink/), Path(/localhost)]
>>>> 
>>>> 
>>>> Claudio Fahey
>>>> Chief Solutions Architect, Analytics
>>>> Dell EMC | Emerging Technologies Team
>>>> 
>>> 
>> 
> 

