flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-6629] Use HAServices to find connecting address for ClusterClient's ActorSystem
Date Mon, 22 May 2017 09:21:44 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 6e40223b5 -> f783e529c


[FLINK-6629] Use HAServices to find connecting address for ClusterClient's ActorSystem

The ClusterClient starts its ActorSystem lazily. In order to find out the address
to which to bind, the ClusterClient tries to connect to the JobManager. In order
to find out the JobManager's address it is important to use the
HighAvailabilityServices instead of retrieving the address information from the
configuration, because otherwise it conflicts with HA mode.

This closes #3949.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f783e529
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f783e529
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f783e529

Branch: refs/heads/release-1.3
Commit: f783e529c558ac3df68b4e69fb931f2d55b55db7
Parents: 77c02fe
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri May 19 14:31:19 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon May 22 11:21:20 2017 +0200

----------------------------------------------------------------------
 flink-clients/pom.xml                           |   8 ++
 .../flink/client/program/ClusterClient.java     | 110 +++++++++++++------
 .../client/program/ClientConnectionTest.java    |  52 +++++++++
 .../runtime/util/LeaderRetrievalUtils.java      |   4 +-
 .../RemoteEnvironmentITCase.java                |   8 +-
 .../apache/flink/yarn/YarnClusterClient.java    |  28 +++--
 6 files changed, 165 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/pom.xml
----------------------------------------------------------------------
diff --git a/flink-clients/pom.xml b/flink-clients/pom.xml
index 0e0c146..205c3d8 100644
--- a/flink-clients/pom.xml
+++ b/flink-clients/pom.xml
@@ -76,6 +76,14 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
 		
 		<dependency>
 			<groupId>com.data-artisans</groupId>

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index e09a0b6..e7314eb 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.CompilerException;
@@ -54,7 +54,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
 import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
-import org.apache.flink.runtime.net.ConnectionUtils;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.apache.flink.util.FlinkException;
@@ -62,13 +61,12 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.Some;
+import scala.Option;
 import scala.Tuple2;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
@@ -151,7 +149,11 @@ public abstract class ClusterClient {
 		this.timeout = AkkaUtils.getClientTimeout(flinkConfig);
 		this.lookupTimeout = AkkaUtils.getLookupTimeout(flinkConfig);
 
-		this.actorSystemLoader = new LazyActorSystemLoader(flinkConfig, LOG);
+		this.actorSystemLoader = new LazyActorSystemLoader(
+			highAvailabilityServices,
+			Time.milliseconds(lookupTimeout.toMillis()),
+			flinkConfig,
+			LOG);
 
 		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
 	}
@@ -164,13 +166,23 @@ public abstract class ClusterClient {
 
 		private final Logger LOG;
 
-		private final Configuration flinkConfig;
+		private final HighAvailabilityServices highAvailabilityServices;
+
+		private final Time timeout;
+
+		private final Configuration configuration;
 
 		private ActorSystem actorSystem;
 
-		private LazyActorSystemLoader(Configuration flinkConfig, Logger LOG) {
-			this.flinkConfig = flinkConfig;
-			this.LOG = LOG;
+		private LazyActorSystemLoader(
+				HighAvailabilityServices highAvailabilityServices,
+				Time timeout,
+				Configuration configuration,
+				Logger LOG) {
+			this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+			this.timeout = Preconditions.checkNotNull(timeout);
+			this.configuration = Preconditions.checkNotNull(configuration);
+			this.LOG = Preconditions.checkNotNull(LOG);
 		}
 
 		/**
@@ -192,30 +204,31 @@ public abstract class ClusterClient {
 		/**
 		 * Creates a new ActorSystem or returns an existing one.
 		 * @return ActorSystem
+		 * @throws Exception if the ActorSystem could not be created
 		 */
-		public ActorSystem get() {
+		public ActorSystem get() throws FlinkException {
 
 			if (!isLoaded()) {
 				// start actor system
 				LOG.info("Starting client actor system.");
 
-				String hostName = flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-				int port = flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-				if (hostName == null || port == -1) {
-					throw new RuntimeException("The initial JobManager address has not been set correctly.");
+				final InetAddress ownHostname;
+				try {
+					ownHostname = LeaderRetrievalUtils.findConnectingAddress(
+						highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
+						timeout);
+				} catch (LeaderRetrievalException lre) {
+					throw new FlinkException("Could not find out our own hostname by connecting to the " +
+						"leading JobManager. Please make sure that the Flink cluster has been started.", lre);
 				}
-				InetSocketAddress initialJobManagerAddress = new InetSocketAddress(hostName, port);
 
-				// find name of own public interface, able to connect to the JM
-				// try to find address for 2 seconds. log after 400 ms.
-				InetAddress ownHostname;
 				try {
-					ownHostname = ConnectionUtils.findConnectingAddress(initialJobManagerAddress, 2000, 400);
-				} catch (IOException e) {
-					throw new RuntimeException("Failed to resolve JobManager address at " + initialJobManagerAddress, e);
+					actorSystem = AkkaUtils.createActorSystem(
+						configuration,
+						Option.apply(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
+				} catch (Exception e) {
+					throw new FlinkException("Could not start the ActorSystem lazily.", e);
 				}
-				actorSystem = AkkaUtils.createActorSystem(flinkConfig,
-					new Some<>(new Tuple2<String, Object>(ownHostname.getCanonicalHostName(), 0)));
 			}
 
 			return actorSystem;
@@ -440,10 +453,19 @@ public abstract class ClusterClient {
 
 		waitForClusterToBeReady();
 
+		final ActorSystem actorSystem;
+
+		try {
+			actorSystem = actorSystemLoader.get();
+		} catch (FlinkException fe) {
+			throw new ProgramInvocationException("Could not start the ActorSystem needed to talk to the " +
+				"JobManager.", fe);
+		}
+
 		try {
 			logAndSysout("Submitting job with JobID: " + jobGraph.getJobID() + ". Waiting for job completion.");
 			this.lastJobExecutionResult = JobClient.submitJobAndWait(
-				actorSystemLoader.get(),
+				actorSystem,
 				flinkConfig,
 				highAvailabilityServices,
 				jobGraph,
@@ -451,7 +473,7 @@ public abstract class ClusterClient {
 				printStatusDuringExecution,
 				classLoader);
 
-			return this.lastJobExecutionResult;
+			return lastJobExecutionResult;
 		} catch (JobExecutionException e) {
 			throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
 		}
@@ -491,6 +513,17 @@ public abstract class ClusterClient {
 	 * @throws JobExecutionException if an error occurs during monitoring the job execution
 	 */
 	public JobExecutionResult retrieveJob(JobID jobID) throws JobExecutionException {
+		final ActorSystem actorSystem;
+
+		try {
+			actorSystem = actorSystemLoader.get();
+		} catch (FlinkException fe) {
+			throw new JobExecutionException(
+				jobID,
+				"Could not start the ActorSystem needed to talk to the JobManager.",
+				fe);
+		}
+
 		ActorGateway jobManagerGateway;
 		try {
 			jobManagerGateway = getJobManagerGateway();
@@ -502,7 +535,7 @@ public abstract class ClusterClient {
 			jobID,
 			jobManagerGateway,
 			flinkConfig,
-			actorSystemLoader.get(),
+			actorSystem,
 			highAvailabilityServices,
 			timeout,
 			printStatusDuringExecution);
@@ -518,6 +551,17 @@ public abstract class ClusterClient {
 	 * @throws JobExecutionException if an error occurs during monitoring the job execution
 	 */
 	public JobListeningContext connectToJob(JobID jobID) throws JobExecutionException {
+		final ActorSystem actorSystem;
+
+		try {
+			actorSystem = actorSystemLoader.get();
+		} catch (FlinkException fe) {
+			throw new JobExecutionException(
+				jobID,
+				"Could not start the ActorSystem needed to talk to the JobManager.",
+				fe);
+		}
+
 		ActorGateway jobManagerGateway;
 		try {
 			jobManagerGateway = getJobManagerGateway();
@@ -526,13 +570,13 @@ public abstract class ClusterClient {
 		}
 
 		return JobClient.attachToRunningJob(
-				jobID,
-				jobManagerGateway,
-				flinkConfig,
-				actorSystemLoader.get(),
-				highAvailabilityServices,
-				timeout,
-				printStatusDuringExecution);
+			jobID,
+			jobManagerGateway,
+			flinkConfig,
+			actorSystem,
+			highAvailabilityServices,
+			timeout,
+			printStatusDuringExecution);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
index 3bfaa95..246a75c 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientConnectionTest.java
@@ -18,16 +18,27 @@
 
 package org.apache.flink.client.program;
 
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClientActorTest;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.NetUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
+import java.util.UUID;
 
 import static org.junit.Assert.*;
 
@@ -98,4 +109,45 @@ public class ClientConnectionTest extends TestLogger {
 			assertTrue(CommonTestUtils.containsCause(e, LeaderRetrievalException.class));
 		}
 	}
+
+	/**
+	 * FLINK-6629
+	 *
+	 * Tests that the {@link HighAvailabilityServices} are respected when initializing the ClusterClient's
+	 * {@link ActorSystem} and retrieving the leading JobManager.
+	 */
+	@Test
+	public void testJobManagerRetrievalWithHAServices() throws Exception {
+		final Configuration configuration = new Configuration();
+		final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
+		final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+		ActorRef actorRef = null;
+		final UUID leaderId = UUID.randomUUID();
+
+		try {
+			actorRef = actorSystem.actorOf(
+				Props.create(
+					JobClientActorTest.PlainActor.class,
+					leaderId));
+
+			final String expectedAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+			final TestingLeaderRetrievalService testingLeaderRetrievalService = new TestingLeaderRetrievalService(expectedAddress, leaderId);
+
+			highAvailabilityServices.setJobMasterLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderRetrievalService);
+
+			ClusterClient client = new StandaloneClusterClient(configuration, highAvailabilityServices);
+
+			ActorGateway gateway = client.getJobManagerGateway();
+
+			assertEquals(expectedAddress, gateway.path());
+			assertEquals(leaderId, gateway.leaderSessionID());
+		} finally {
+			if (actorRef != null) {
+				TestingUtils.stopActorGracefully(actorRef);
+			}
+
+			actorSystem.shutdown();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
index 009bec6..6b861a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LeaderRetrievalUtils.java
@@ -137,8 +137,8 @@ public class LeaderRetrievalUtils {
 	}
 
 	public static InetAddress findConnectingAddress(
-		LeaderRetrievalService leaderRetrievalService,
-		Time timeout) throws LeaderRetrievalException {
+			LeaderRetrievalService leaderRetrievalService,
+			Time timeout) throws LeaderRetrievalException {
 		return findConnectingAddress(leaderRetrievalService, new FiniteDuration(timeout.getSize(), timeout.getUnit()));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
index 0091571..7c6f73a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
@@ -23,11 +23,13 @@ import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.GenericInputSplit;
 import org.apache.flink.runtime.minicluster.StandaloneMiniCluster;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -73,7 +75,7 @@ public class RemoteEnvironmentITCase extends TestLogger {
 	/**
 	 * Ensure that that Akka configuration parameters can be set.
 	 */
-	@Test(expected=IllegalArgumentException.class)
+	@Test(expected=FlinkException.class)
 	public void testInvalidAkkaConfiguration() throws Throwable {
 		Configuration config = new Configuration();
 		config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, INVALID_STARTUP_TIMEOUT);
@@ -86,11 +88,11 @@ public class RemoteEnvironmentITCase extends TestLogger {
 		env.getConfig().disableSysoutLogging();
 
 		DataSet<String> result = env.createInput(new TestNonRichInputFormat());
-		result.output(new LocalCollectionOutputFormat<String>(new ArrayList<String>()));
+		result.output(new LocalCollectionOutputFormat<>(new ArrayList<String>()));
 		try {
 			env.execute();
 			Assert.fail("Program should not run successfully, cause of invalid akka settings.");
-		} catch (IOException ex) {
+		} catch (ProgramInvocationException ex) {
 			throw ex.getCause();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f783e529/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
index e70af09..8f47b18 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClient.java
@@ -19,6 +19,7 @@ package org.apache.flink.yarn;
 
 import akka.actor.ActorRef;
 
+import akka.actor.ActorSystem;
 import akka.actor.Props;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
@@ -34,6 +35,7 @@ import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.messages.ShutdownClusterAfterJob;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.hadoop.conf.Configuration;
@@ -569,17 +571,29 @@ public class YarnClusterClient extends ClusterClient {
 		 * Creates a new ApplicationClient actor or returns an existing one. May start an ActorSystem.
 		 * @return ActorSystem
 		 */
-		public ActorRef get() {
+		public ActorRef get() throws FlinkException {
 			if (applicationClient == null) {
 				// start application client
 				LOG.info("Start application client.");
 
-				applicationClient = actorSystemLoader.get().actorOf(
-					Props.create(
-						ApplicationClient.class,
-						flinkConfig,
-						highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)),
-					"applicationClient");
+				final ActorSystem actorSystem;
+
+				try {
+					actorSystem = actorSystemLoader.get();
+				} catch (FlinkException fle) {
+					throw new FlinkException("Could not start the ClusterClient's ActorSystem.", fle);
+				}
+
+				try {
+					applicationClient = actorSystem.actorOf(
+						Props.create(
+							ApplicationClient.class,
+							flinkConfig,
+							highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID)),
+						"applicationClient");
+				} catch (Exception e) {
+					throw new FlinkException("Could not start the ApplicationClient.", e);
+				}
 			}
 
 			return applicationClient;


Mime
View raw message