Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E1710200C8C for ; Mon, 22 May 2017 11:21:46 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E012C160BBF; Mon, 22 May 2017 09:21:46 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B0FC9160BA5 for ; Mon, 22 May 2017 11:21:45 +0200 (CEST) Received: (qmail 22225 invoked by uid 500); 22 May 2017 09:21:44 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 22207 invoked by uid 99); 22 May 2017 09:21:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 May 2017 09:21:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 28F1CDFAEB; Mon, 22 May 2017 09:21:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: trohrmann@apache.org To: commits@flink.apache.org Date: Mon, 22 May 2017 09:21:44 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-6629] Use HAServices to find connecting address for ClusterClient's ActorSystem archived-at: Mon, 22 May 2017 09:21:47 -0000 Repository: flink Updated Branches: refs/heads/release-1.3 6e40223b5 -> f783e529c [FLINK-6629] Use HAServices to find connecting address for ClusterClient's ActorSystem The ClusterClient starts its ActorSystem lazily. 
In order to find out the address to which to bind, the ClusterClient tries to connect to the JobManager. In order to find out the JobManager's address, it is important to use the HighAvailabilityServices instead of retrieving the address information from the configuration, because otherwise it conflicts with HA mode. This closes #3949. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f783e529 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f783e529 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f783e529 Branch: refs/heads/release-1.3 Commit: f783e529c558ac3df68b4e69fb931f2d55b55db7 Parents: 77c02fe Author: Till Rohrmann Authored: Fri May 19 14:31:19 2017 +0200 Committer: Till Rohrmann Committed: Mon May 22 11:21:20 2017 +0200 ---------------------------------------------------------------------- flink-clients/pom.xml | 8 ++ .../flink/client/program/ClusterClient.java | 110 +++++++++++++------ .../client/program/ClientConnectionTest.java | 52 +++++++++ .../runtime/util/LeaderRetrievalUtils.java | 4 +- .../RemoteEnvironmentITCase.java | 8 +- .../apache/flink/yarn/YarnClusterClient.java | 28 +++-- 6 files changed, 165 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/pom.xml ---------------------------------------------------------------------- diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml index 0e0c146..205c3d8 100644 --- a/flink-clients/pom.xml +++ b/flink-clients/pom.xml @@ -76,6 +76,14 @@ under the License.
${project.version} test + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + test + test-jar + com.data-artisans http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index e09a0b6..e7314eb 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.accumulators.AccumulatorHelper; -import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.optimizer.CompilerException; @@ -54,7 +54,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; -import org.apache.flink.runtime.net.ConnectionUtils; import org.apache.flink.runtime.util.LeaderConnectionInfo; import org.apache.flink.runtime.util.LeaderRetrievalUtils; import org.apache.flink.util.FlinkException; @@ -62,13 +61,12 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.Some; +import scala.Option; import scala.Tuple2; import scala.concurrent.Await; 
import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; -import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URISyntaxException; @@ -151,7 +149,11 @@ public abstract class ClusterClient { this.timeout = AkkaUtils.getClientTimeout(flinkConfig); this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig); - this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG); + this.actorSystemLoader = new LazyActorSystemLoader( + highAvailabilityServices, + Time.milliseconds(lookupTimeout.toMillis()), + flinkConfig, + LOG); this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); } @@ -164,13 +166,23 @@ public abstract class ClusterClient { private final Logger LOG; - private final Configuration flinkConfig; + private final HighAvailabilityServices highAvailabilityServices; + + private final Time timeout; + + private final Configuration configuration; private ActorSystem actorSystem; - private LazyActorSystemLoader(Configuration flinkConfig, Logger LOG) { - this.flinkConfig = flinkConfig; - this.LOG = LOG; + private LazyActorSystemLoader( + HighAvailabilityServices highAvailabilityServices, + Time timeout, + Configuration configuration, + Logger LOG) { + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices); + this.timeout = Preconditions.checkNotNull(timeout); + this.configuration = Preconditions.checkNotNull(configuration); + this.LOG = Preconditions.checkNotNull(LOG); } /** @@ -192,30 +204,31 @@ public abstract class ClusterClient { /** * Creates a new ActorSystem or returns an existing one. 
* @return ActorSystem + * @throws Exception if the ActorSystem could not be created */ - public ActorSystem get() { + public ActorSystem get() throws FlinkException { if (!isLoaded()) { // start actor system LOG.info("Starting client actor system."); - String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - if (hostName == null || port == -1) { - throw new RuntimeException("The initial JobManager address has not been set correctly."); + final InetAddress ownHostname; + try { + ownHostname = LeaderRetrievalUtils.findConnectingAddress( + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), + timeout); + } catch (LeaderRetrievalException lre) { + throw new FlinkException("Could not find out our own hostname by connecting to the " + + "leading JobManager. Please make sure that the Flink cluster has been started.", lre); } - InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port); - // find name of own public interface, able to connect to the JM - // try to find address for 2 seconds. log after 400 ms. 
- InetAddress ownHostname; try { - ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400); - } catch (IOException e) { - throw new RuntimeException("Failed to resolve JobManager address at " + initialJobManagerAddress, e); + actorSystem = AkkaUtils.createActorSystem( + configuration, + Option.apply(new Tuple2(ownHostname.getCanonicalHostName(), 0))); + } catch (Exception e) { + throw new FlinkException("Could not start the ActorSystem lazily.", e); } - actorSystem = AkkaUtils.createActorSystem(flinkConfig, - new Some<>(new Tuple2(ownHostname.getCanonicalHostName(), 0))); } return actorSystem; @@ -440,10 +453,19 @@ public abstract class ClusterClient { waitForClusterToBeReady(); + final ActorSystem actorSystem; + + try { + actorSystem = actorSystemLoader.get(); + } catch (FlinkException fe) { + throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " + + "JobManager.", fe); + } + try { logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". 
Waiting for job completion."); this.lastJobExecutionResult = JobClient.submitJobAndWait( - actorSystemLoader.get(), + actorSystem, flinkConfig, highAvailabilityServices, jobGraph, @@ -451,7 +473,7 @@ public abstract class ClusterClient { printStatusDuringExecution, classLoader); - return this.lastJobExecutionResult; + return lastJobExecutionResult; } catch (JobExecutionException e) { throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e); } @@ -491,6 +513,17 @@ public abstract class ClusterClient { * @throws JobExecutionException if an error occurs during monitoring the job execution */ public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException { + final ActorSystem actorSystem; + + try { + actorSystem = actorSystemLoader.get(); + } catch (FlinkException fe) { + throw new JobExecutionException( + jobID, + "Could not start the ActorSystem needed to talk to the JobManager.", + fe); + } + ActorGateway jobManagerGateway; try { jobManagerGateway = getJobManagerGateway(); @@ -502,7 +535,7 @@ public abstract class ClusterClient { jobID, jobManagerGateway, flinkConfig, - actorSystemLoader.get(), + actorSystem, highAvailabilityServices, timeout, printStatusDuringExecution); @@ -518,6 +551,17 @@ public abstract class ClusterClient { * @throws JobExecutionException if an error occurs during monitoring the job execution */ public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException { + final ActorSystem actorSystem; + + try { + actorSystem = actorSystemLoader.get(); + } catch (FlinkException fe) { + throw new JobExecutionException( + jobID, + "Could not start the ActorSystem needed to talk to the JobManager.", + fe); + } + ActorGateway jobManagerGateway; try { jobManagerGateway = getJobManagerGateway(); @@ -526,13 +570,13 @@ public abstract class ClusterClient { } return JobClient.attachToRunningJob( - jobID, - jobManagerGateway, - flinkConfig, - actorSystemLoader.get(), - highAvailabilityServices, 
- timeout, - printStatusDuringExecution); + jobID, + jobManagerGateway, + flinkConfig, + actorSystem, + highAvailabilityServices, + timeout, + printStatusDuringExecution); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java index 3bfaa95..246a75c 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java @@ -18,16 +18,27 @@ package org.apache.flink.client.program; +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobClientActorTest; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.NetUtils; import org.apache.flink.util.TestLogger; import org.junit.Test; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.util.UUID; import static org.junit.Assert.*; @@ -98,4 +109,45 @@ public class ClientConnectionTest extends TestLogger { assertTrue(CommonTestUtils.containsCause(e, 
LeaderRetrievalException.class)); } } + + /** + * FLINK-6629 + * + * Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's + * {@link ActorSystem} and retrieving the leading JobManager. + */ + @Test + public void testJobManagerRetrievalWithHAServices() throws Exception { + final Configuration configuration = new Configuration(); + final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); + final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + ActorRef actorRef = null; + final UUID leaderId = UUID.randomUUID(); + + try { + actorRef = actorSystem.actorOf( + Props.create( + JobClientActorTest.PlainActor.class, + leaderId)); + + final String expectedAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef); + + final TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(expectedAddress, leaderId); + + highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService); + + ClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices); + + ActorGateway gateway = client.getJobManagerGateway(); + + assertEquals(expectedAddress, gateway.path()); + assertEquals(leaderId, gateway.leaderSessionID()); + } finally { + if (actorRef != null) { + TestingUtils.stopActorGracefully(actorRef); + } + + actorSystem.shutdown(); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java index 009bec6..6b861a3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java @@ -137,8 +137,8 @@ public class LeaderRetrievalUtils { } public static InetAddress findConnectingAddress( - LeaderRetrievalService leaderRetrievalService, - Time timeout) throws LeaderRetrievalException { + LeaderRetrievalService leaderRetrievalService, + Time timeout) throws LeaderRetrievalException { return findConnectingAddress(leaderRetrievalService, new FiniteDuration(timeout.getSize(), timeout.getUnit())); } http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java index 0091571..7c6f73a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java @@ -23,11 +23,13 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.io.LocalCollectionOutputFormat; +import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.GenericInputSplit; import org.apache.flink.runtime.minicluster.StandaloneMiniCluster; import org.apache.flink.util.Collector; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Assert; @@ -73,7 +75,7 @@ public class RemoteEnvironmentITCase extends TestLogger { /** * Ensure that that Akka 
configuration parameters can be set. */ - @Test(expected=IllegalArgumentException.class) + @Test(expected=FlinkException.class) public void testInvalidAkkaConfiguration() throws Throwable { Configuration config = new Configuration(); config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT); @@ -86,11 +88,11 @@ public class RemoteEnvironmentITCase extends TestLogger { env.getConfig().disableSysoutLogging(); DataSet result = env.createInput(new TestNonRichInputFormat()); - result.output(new LocalCollectionOutputFormat(new ArrayList())); + result.output(new LocalCollectionOutputFormat<>(new ArrayList())); try { env.execute(); Assert.fail("Program should not run successfully, cause of invalid akka settings."); - } catch (IOException ex) { + } catch (ProgramInvocationException ex) { throw ex.getCause(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java index e70af09..8f47b18 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java @@ -19,6 +19,7 @@ package org.apache.flink.yarn; import akka.actor.ActorRef; +import akka.actor.ActorSystem; import akka.actor.Props; import akka.pattern.Patterns; import akka.util.Timeout; @@ -34,6 +35,7 @@ import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import 
org.apache.flink.yarn.cli.FlinkYarnSessionCli; import org.apache.hadoop.conf.Configuration; @@ -569,17 +571,29 @@ public class YarnClusterClient extends ClusterClient { * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem. * @return ActorSystem */ - public ActorRef get() { + public ActorRef get() throws FlinkException { if (applicationClient == null) { // start application client LOG.info("Start application client."); - applicationClient = actorSystemLoader.get().actorOf( - Props.create( - ApplicationClient.class, - flinkConfig, - highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)), - "applicationClient"); + final ActorSystem actorSystem; + + try { + actorSystem = actorSystemLoader.get(); + } catch (FlinkException fle) { + throw new FlinkException("Could not start the ClusterClient's ActorSystem.", fle); + } + + try { + applicationClient = actorSystem.actorOf( + Props.create( + ApplicationClient.class, + flinkConfig, + highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)), + "applicationClient"); + } catch (Exception e) { + throw new FlinkException("Could not start the ApplicationClient.", e); + } } return applicationClient;