flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/4] flink git commit: [FLINK-7381] [web] Decouple WebRuntimeMonitor from ActorGateway
Date Fri, 11 Aug 2017 11:48:58 GMT
Repository: flink
Updated Branches:
  refs/heads/master 00d5b6222 -> 9f790d3ef


http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
new file mode 100644
index 0000000..e608aa0
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/retriever/impl/RpcJobManagerRetriever.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.retriever.impl;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * JobManagerRetriever implementation for Flip-6 JobManager.
+ */
+public class RpcJobManagerRetriever extends JobManagerRetriever {
+
+	private final RpcService rpcService;
+
+	public RpcJobManagerRetriever(
+		RpcService rpcService) {
+
+		this.rpcService = Preconditions.checkNotNull(rpcService);
+	}
+
+	@Override
+	protected CompletableFuture<JobManagerGateway> createJobManagerGateway(String leaderAddress, UUID leaderId) throws Exception {
+		return rpcService.connect(leaderAddress, JobManagerGateway.class);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index e490b48..1616a7b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager}
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -82,6 +82,7 @@ import org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskexecutor.TaskExecutor
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{InstantiationUtil, NetUtils, SerializedThrowable}
@@ -2219,12 +2220,17 @@ object JobManager {
       if (configuration.getInteger(WebOptions.PORT, 0) >= 0) {
         LOG.info("Starting JobManager web frontend")
 
+        val timeout = FutureUtils.toTime(AkkaUtils.getTimeout(configuration))
+
         // start the web frontend. we need to load this dynamically
         // because it is not in the same project/dependencies
         val webServer = WebMonitorUtils.startWebRuntimeMonitor(
           configuration,
           highAvailabilityServices,
-          jobManagerSystem)
+          new AkkaJobManagerRetriever(jobManagerSystem, timeout),
+          new AkkaQueryServiceRetriever(jobManagerSystem, timeout),
+          timeout,
+          jobManagerSystem.dispatcher)
 
         Option(webServer)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index bc323cc..831c026 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -32,7 +32,7 @@ import org.apache.flink.configuration._
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils}
 import org.apache.flink.runtime.client.{JobClient, JobExecutionException}
-import org.apache.flink.runtime.concurrent.{Executors => FlinkExecutors}
+import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors}
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader
 import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils}
 import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway}
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
 import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.util.{ExecutorThreadFactory, Hardware}
+import org.apache.flink.runtime.webmonitor.retriever.impl.{AkkaJobManagerRetriever, AkkaQueryServiceRetriever}
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
 import org.apache.flink.util.NetUtils
 import org.slf4j.LoggerFactory
@@ -389,6 +390,8 @@ abstract class FlinkMiniCluster(
       config.getBoolean(ConfigConstants.LOCAL_START_WEBSERVER, false) &&
         config.getInteger(WebOptions.PORT, 0) >= 0) {
 
+      val flinkTimeout = FutureUtils.toTime(timeout)
+
       LOG.info("Starting JobManger web frontend")
       // start the new web frontend. we need to load this dynamically
       // because it is not in the same project/dependencies
@@ -396,7 +399,10 @@ abstract class FlinkMiniCluster(
         WebMonitorUtils.startWebRuntimeMonitor(
           config,
           highAvailabilityServices,
-          actorSystem)
+          new AkkaJobManagerRetriever(actorSystem, flinkTimeout),
+          new AkkaQueryServiceRetriever(actorSystem, flinkTimeout),
+          flinkTimeout,
+          actorSystem.dispatcher)
       )
 
       webServer.foreach(_.start(jobManagerAkkaURL))

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
index 858bbbb..ddbb82d 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingUtils.scala
@@ -62,6 +62,8 @@ object TestingUtils {
 
   val TESTING_TIMEOUT = 1 minute
 
+  val TIMEOUT = Time.minutes(1L)
+
   val DEFAULT_AKKA_ASK_TIMEOUT = "200 s"
 
   def getDefaultTestingActorSystemConfigString: String = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f790d3e/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 88cc585..9130901 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -44,6 +45,8 @@ import org.apache.flink.runtime.util.Hardware;
 import org.apache.flink.runtime.util.JvmShutdownSafeguard;
 import org.apache.flink.runtime.util.SignalHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaJobManagerRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
@@ -358,11 +361,16 @@ public class YarnApplicationMasterRunner {
 			// 2: the web monitor
 			LOG.debug("Starting Web Frontend");
 
+			Time webMonitorTimeout = Time.milliseconds(config.getLong(WebOptions.TIMEOUT));
+
 			webMonitor = BootstrapTools.startWebMonitorIfConfigured(
 				config,
 				highAvailabilityServices,
-				actorSystem,
-				jobManager,
+				new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout),
+				new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
+				webMonitorTimeout,
+				futureExecutor,
+				AkkaUtils.getAkkaURL(actorSystem, jobManager),
 				LOG);
 
 			String protocol = "http://";


Mime
View raw message