flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7535] Port DashboardConfigHandler to new REST endpoint
Date Thu, 21 Sep 2017 07:31:15 GMT
Repository: flink
Updated Branches:
  refs/heads/master dbabdb1cc -> c6243b8b1


[FLINK-7535] Port DashboardConfigHandler to new REST endpoint

Lets DashboardConfigHandler implement the LegacyRestHandler. Moreover, this
commit defines the appropriate DashboardConfigurationHeaders.

The DispatcherRestEndpoint registers the DashboardConfigHandler.

This closes #4604.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6243b8b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6243b8b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6243b8b

Branch: refs/heads/master
Commit: c6243b8b1de6117623d3c4255f47f062d10c4602
Parents: dbabdb1
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Aug 21 15:11:08 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Sep 21 09:30:50 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/history/HistoryServer.java       |   4 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  35 +++--
 .../entrypoint/SessionClusterEntrypoint.java    |   9 +-
 .../webmonitor/StatusOverviewWithVersion.java   | 128 -----------------
 .../rest/handler/RestHandlerConfiguration.java  |  68 +++++++++
 .../handler/legacy/ClusterOverviewHandler.java  |   2 +-
 .../handler/legacy/DashboardConfigHandler.java  |  45 +++---
 .../legacy/messages/DashboardConfiguration.java | 137 +++++++++++++++++++
 .../messages/StatusOverviewWithVersion.java     | 130 ++++++++++++++++++
 .../rest/messages/ClusterOverviewHeaders.java   |   2 +-
 .../messages/DashboardConfigurationHeaders.java |  70 ++++++++++
 .../ZooKeeperLeaderElectionTest.java            |   1 -
 .../StatusOverviewWithVersionTest.java          |  60 --------
 .../legacy/DashboardConfigHandlerTest.java      |  22 +--
 .../messages/DashboardConfigurationTest.java    |  56 ++++++++
 .../messages/StatusOverviewWithVersionTest.java |  60 ++++++++
 16 files changed, 592 insertions(+), 237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 01228d5..f9aea22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -28,6 +28,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
@@ -47,6 +48,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.nio.file.Files;
+import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -289,7 +291,7 @@ public class HistoryServer {
 
 	private void createDashboardConfigFile() throws IOException {
 		try (FileWriter fw = createOrGetFile(webDir, "config")) {
-			fw.write(DashboardConfigHandler.createConfigJson(webRefreshIntervalMillis));
+			fw.write(DashboardConfigHandler.createConfigJson(DashboardConfiguration.from(webRefreshIntervalMillis, ZonedDateTime.now())));
 			fw.flush();
 		} catch (IOException ioe) {
 			LOG.error("Failed to write config file.");

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 1f64c67..6054a7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -20,15 +20,19 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandlerAdapter;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
+import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
@@ -51,26 +55,25 @@ import java.util.concurrent.Executor;
 public class DispatcherRestEndpoint extends RestServerEndpoint {
 
 	private final GatewayRetriever<DispatcherGateway> leaderRetriever;
-	private final Time timeout;
-	private final File tmpDir;
+	private final RestHandlerConfiguration restConfiguration;
 	private final Executor executor;
 
 	public DispatcherRestEndpoint(
 			RestServerEndpointConfiguration configuration,
 			GatewayRetriever<DispatcherGateway> leaderRetriever,
-			Time timeout,
-			File tmpDir,
+			RestHandlerConfiguration restConfiguration,
 			Executor executor) {
 		super(configuration);
 		this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
-		this.timeout = Preconditions.checkNotNull(timeout);
-		this.tmpDir = Preconditions.checkNotNull(tmpDir);
+		this.restConfiguration = Preconditions.checkNotNull(restConfiguration);
 		this.executor = Preconditions.checkNotNull(executor);
 	}
 
 	@Override
 	protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
-		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(2);
+		ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3);
+
+		final Time timeout = restConfiguration.getTimeout();
 
 		LegacyRestHandlerAdapter<DispatcherGateway, StatusOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>(
 			restAddressFuture,
@@ -81,7 +84,16 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 				executor,
 				timeout));
 
-		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
+		LegacyRestHandlerAdapter<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> dashboardConfigurationHandler = new LegacyRestHandlerAdapter<>(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			DashboardConfigurationHeaders.getInstance(),
+			new DashboardConfigHandler(
+				executor,
+				restConfiguration.getRefreshInterval()));
+
+		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
 
@@ -96,6 +108,9 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			optWebContent = Optional.empty();
 		}
 
+		handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));
+		handlers.add(Tuple2.of(DashboardConfigurationHeaders.getInstance(), dashboardConfigurationHandler));
+
 		optWebContent.ifPresent(
 			webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)));
 
@@ -106,6 +121,8 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 	public void shutdown(Time timeout) {
 		super.shutdown(timeout);
 
+		final File tmpDir = restConfiguration.getTmpDir();
+
 		try {
 			log.info("Removing cache directory {}", tmpDir);
 			FileUtils.deleteDirectory(tmpDir);

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 9f4e04a..e394854 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.entrypoint;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
-import java.io.File;
 import java.util.Optional;
 import java.util.concurrent.Executor;
 
@@ -157,14 +156,10 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint {
 			LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
 			Executor executor) throws Exception {
 
-		Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
-		File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
-
 		return new DispatcherRestEndpoint(
 			RestServerEndpointConfiguration.fromConfiguration(configuration),
 			dispatcherGatewayRetriever,
-			timeout,
-			tmpDir,
+			RestHandlerConfiguration.fromConfiguration(configuration),
 			executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
deleted file mode 100644
index 9029537..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersion.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-
-import java.util.Objects;
-
-/**
- * Status overview message including the current Flink version and commit id.
- */
-public class StatusOverviewWithVersion extends StatusOverview implements ResponseBody {
-
-	private static final long serialVersionUID = 5000058311783413216L;
-
-	public static final String FIELD_NAME_VERSION = "flink-version";
-	public static final String FIELD_NAME_COMMIT = "flink-commit";
-
-	@JsonProperty(FIELD_NAME_VERSION)
-	private final String version;
-
-	@JsonProperty(FIELD_NAME_COMMIT)
-	private final String commitId;
-
-	@JsonCreator
-	public StatusOverviewWithVersion(
-			@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
-			@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
-			@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
-			@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
-			@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
-			@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
-			@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
-			@JsonProperty(FIELD_NAME_VERSION) String version,
-			@JsonProperty(FIELD_NAME_COMMIT) String commitId) {
-		super(
-			numTaskManagersConnected,
-			numSlotsTotal,
-			numSlotsAvailable,
-			numJobsRunningOrPending,
-			numJobsFinished,
-			numJobsCancelled,
-			numJobsFailed);
-
-		this.version = Preconditions.checkNotNull(version);
-		this.commitId = Preconditions.checkNotNull(commitId);
-	}
-
-	public StatusOverviewWithVersion(
-			int numTaskManagersConnected,
-			int numSlotsTotal,
-			int numSlotsAvailable,
-			JobsOverview jobs1,
-			JobsOverview jobs2,
-			String version,
-			String commitId) {
-		super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
-
-		this.version = Preconditions.checkNotNull(version);
-		this.commitId = Preconditions.checkNotNull(commitId);
-	}
-
-	public static StatusOverviewWithVersion fromStatusOverview(StatusOverview statusOverview, String version, String commitId) {
-		return new StatusOverviewWithVersion(
-			statusOverview.getNumTaskManagersConnected(),
-			statusOverview.getNumSlotsTotal(),
-			statusOverview.getNumSlotsAvailable(),
-			statusOverview.getNumJobsRunningOrPending(),
-			statusOverview.getNumJobsFinished(),
-			statusOverview.getNumJobsCancelled(),
-			statusOverview.getNumJobsFailed(),
-			version,
-			commitId);
-	}
-
-	public String getVersion() {
-		return version;
-	}
-
-	public String getCommitId() {
-		return commitId;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-		if (!super.equals(o)) {
-			return false;
-		}
-
-		StatusOverviewWithVersion that = (StatusOverviewWithVersion) o;
-
-		return Objects.equals(version, that.getVersion()) && Objects.equals(commitId, that.getCommitId());
-	}
-
-	@Override
-	public int hashCode() {
-		int result = super.hashCode();
-		result = 31 * result + (version != null ? version.hashCode() : 0);
-		result = 31 * result + (commitId != null ? commitId.hashCode() : 0);
-		return result;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
new file mode 100644
index 0000000..9220bd9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.util.Preconditions;
+
+import java.io.File;
+
+/**
+ * Configuration object containing values for the rest handler configuration.
+ */
+public class RestHandlerConfiguration {
+
+	private final long refreshInterval;
+
+	private final Time timeout;
+
+	private final File tmpDir;
+
+	public RestHandlerConfiguration(long refreshInterval, Time timeout, File tmpDir) {
+		Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0.");
+		this.refreshInterval = refreshInterval;
+
+		this.timeout = Preconditions.checkNotNull(timeout);
+		this.tmpDir = Preconditions.checkNotNull(tmpDir);
+	}
+
+	public long getRefreshInterval() {
+		return refreshInterval;
+	}
+
+	public Time getTimeout() {
+		return timeout;
+	}
+
+	public File getTmpDir() {
+		return tmpDir;
+	}
+
+	public static RestHandlerConfiguration fromConfiguration(Configuration configuration) {
+		final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL);
+
+		final Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+
+		final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR));
+
+		return new RestHandlerConfiguration(refreshInterval, timeout, tmpDir);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
index 9340fa2..480c9e8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -25,9 +25,9 @@ import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.util.EnvironmentInformation;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
index e8854f4..0cef5fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
@@ -18,15 +18,20 @@
 
 package org.apache.flink.runtime.rest.handler.legacy;
 
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.LegacyRestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
+import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.time.ZonedDateTime;
 import java.util.Map;
-import java.util.TimeZone;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -35,16 +40,21 @@ import java.util.concurrent.Executor;
  * against this web server should behave. It defines for example the refresh interval,
  * and time zone of the server timestamps.
  */
-public class DashboardConfigHandler extends AbstractJsonRequestHandler {
+public class DashboardConfigHandler extends AbstractJsonRequestHandler implements LegacyRestHandler<DispatcherGateway, DashboardConfiguration, EmptyMessageParameters> {
 
-	private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
+	public static final String DASHBOARD_CONFIG_REST_PATH = "/config";
 
 	private final String configString;
 
+	private final DashboardConfiguration dashboardConfiguration;
+
 	public DashboardConfigHandler(Executor executor, long refreshInterval) {
 		super(executor);
+
+		dashboardConfiguration = DashboardConfiguration.from(refreshInterval, ZonedDateTime.now());
+
 		try {
-			this.configString = createConfigJson(refreshInterval);
+			this.configString = createConfigJson(dashboardConfiguration);
 		}
 		catch (Exception e) {
 			// should never happen
@@ -58,28 +68,25 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
+	public CompletableFuture<DashboardConfiguration> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
+		return CompletableFuture.completedFuture(dashboardConfiguration);
+	}
+
+	@Override
 	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
 		return CompletableFuture.completedFuture(configString);
 	}
 
-	public static String createConfigJson(long refreshInterval) throws IOException {
+	public static String createConfigJson(DashboardConfiguration dashboardConfiguration) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-		TimeZone timeZone = TimeZone.getDefault();
-		String timeZoneName = timeZone.getDisplayName();
-		long timeZoneOffset = timeZone.getRawOffset();
-
 		gen.writeStartObject();
-		gen.writeNumberField("refresh-interval", refreshInterval);
-		gen.writeNumberField("timezone-offset", timeZoneOffset);
-		gen.writeStringField("timezone-name", timeZoneName);
-		gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
-
-		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
-		if (revision != null) {
-			gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
-		}
+		gen.writeNumberField(DashboardConfiguration.FIELD_NAME_REFRESH_INTERVAL, dashboardConfiguration.getRefreshInterval());
+		gen.writeNumberField(DashboardConfiguration.FIELD_NAME_TIMEZONE_OFFSET, dashboardConfiguration.getTimeZoneOffset());
+		gen.writeStringField(DashboardConfiguration.FIELD_NAME_TIMEZONE_NAME, dashboardConfiguration.getTimeZoneName());
+		gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_VERSION, dashboardConfiguration.getFlinkVersion());
+		gen.writeStringField(DashboardConfiguration.FIELD_NAME_FLINK_REVISION, dashboardConfiguration.getFlinkRevision());
 
 		gen.writeEndObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java
new file mode 100644
index 0000000..cfb3aaa
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfiguration.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.time.ZonedDateTime;
+import java.time.format.TextStyle;
+import java.util.Locale;
+import java.util.Objects;
+
+/**
+ * Response of the {@link DashboardConfigHandler} containing general configuration
+ * values such as the time zone and the refresh interval.
+ */
+public class DashboardConfiguration implements ResponseBody {
+
+	public static final String FIELD_NAME_REFRESH_INTERVAL = "refresh-interval";
+	public static final String FIELD_NAME_TIMEZONE_OFFSET = "timezone-offset";
+	public static final String FIELD_NAME_TIMEZONE_NAME = "timezone-name";
+	public static final String FIELD_NAME_FLINK_VERSION = "flink-version";
+	public static final String FIELD_NAME_FLINK_REVISION = "flink-revision";
+
+	@JsonProperty(FIELD_NAME_REFRESH_INTERVAL)
+	private final long refreshInterval;
+
+	@JsonProperty(FIELD_NAME_TIMEZONE_NAME)
+	private final String timeZoneName;
+
+	@JsonProperty(FIELD_NAME_TIMEZONE_OFFSET)
+	private final int timeZoneOffset;
+
+	@JsonProperty(FIELD_NAME_FLINK_VERSION)
+	private final String flinkVersion;
+
+	@JsonProperty(FIELD_NAME_FLINK_REVISION)
+	private final String flinkRevision;
+
+	@JsonCreator
+	public DashboardConfiguration(
+			@JsonProperty(FIELD_NAME_REFRESH_INTERVAL) long refreshInterval,
+			@JsonProperty(FIELD_NAME_TIMEZONE_NAME) String timeZoneName,
+			@JsonProperty(FIELD_NAME_TIMEZONE_OFFSET) int timeZoneOffset,
+			@JsonProperty(FIELD_NAME_FLINK_VERSION) String flinkVersion,
+			@JsonProperty(FIELD_NAME_FLINK_REVISION) String flinkRevision) {
+		this.refreshInterval = refreshInterval;
+		this.timeZoneName = Preconditions.checkNotNull(timeZoneName);
+		this.timeZoneOffset = timeZoneOffset;
+		this.flinkVersion = Preconditions.checkNotNull(flinkVersion);
+		this.flinkRevision = Preconditions.checkNotNull(flinkRevision);
+	}
+
+	public long getRefreshInterval() {
+		return refreshInterval;
+	}
+
+	public int getTimeZoneOffset() {
+		return timeZoneOffset;
+	}
+
+	public String getTimeZoneName() {
+		return timeZoneName;
+	}
+
+	public String getFlinkVersion() {
+		return flinkVersion;
+	}
+
+	public String getFlinkRevision() {
+		return flinkRevision;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		DashboardConfiguration that = (DashboardConfiguration) o;
+		return refreshInterval == that.refreshInterval &&
+			timeZoneOffset == that.timeZoneOffset &&
+			Objects.equals(timeZoneName, that.timeZoneName) &&
+			Objects.equals(flinkVersion, that.flinkVersion) &&
+			Objects.equals(flinkRevision, that.flinkRevision);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(refreshInterval, timeZoneName, timeZoneOffset, flinkVersion, flinkRevision);
+	}
+
+	public static DashboardConfiguration from(long refreshInterval, ZonedDateTime zonedDateTime) {
+
+		final String flinkVersion = EnvironmentInformation.getVersion();
+
+		final EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+		final String flinkRevision;
+
+		if (revision != null) {
+			flinkRevision = revision.commitId + " @ " + revision.commitDate;
+		} else {
+			flinkRevision = "unknown revision";
+		}
+
+		return new DashboardConfiguration(
+			refreshInterval,
+			zonedDateTime.getZone().getDisplayName(TextStyle.FULL, Locale.getDefault()),
+			// convert zone date time into offset in order to not do the day light saving adaptions wrt the offset
+			zonedDateTime.toOffsetDateTime().getOffset().getTotalSeconds() * 1000,
+			flinkVersion,
+			flinkRevision);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java
new file mode 100644
index 0000000..f001afc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersion.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.messages.webmonitor.JobsOverview;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Status overview message including the current Flink version and commit id.
+ */
+public class StatusOverviewWithVersion extends StatusOverview implements ResponseBody {
+
+	private static final long serialVersionUID = 5000058311783413216L;
+
+	public static final String FIELD_NAME_VERSION = "flink-version";
+	public static final String FIELD_NAME_COMMIT = "flink-commit";
+
+	@JsonProperty(FIELD_NAME_VERSION)
+	private final String version;
+
+	@JsonProperty(FIELD_NAME_COMMIT)
+	private final String commitId;
+
+	@JsonCreator
+	public StatusOverviewWithVersion(
+			@JsonProperty(FIELD_NAME_TASKMANAGERS) int numTaskManagersConnected,
+			@JsonProperty(FIELD_NAME_SLOTS_TOTAL) int numSlotsTotal,
+			@JsonProperty(FIELD_NAME_SLOTS_AVAILABLE) int numSlotsAvailable,
+			@JsonProperty(FIELD_NAME_JOBS_RUNNING) int numJobsRunningOrPending,
+			@JsonProperty(FIELD_NAME_JOBS_FINISHED) int numJobsFinished,
+			@JsonProperty(FIELD_NAME_JOBS_CANCELLED) int numJobsCancelled,
+			@JsonProperty(FIELD_NAME_JOBS_FAILED) int numJobsFailed,
+			@JsonProperty(FIELD_NAME_VERSION) String version,
+			@JsonProperty(FIELD_NAME_COMMIT) String commitId) {
+		super(
+			numTaskManagersConnected,
+			numSlotsTotal,
+			numSlotsAvailable,
+			numJobsRunningOrPending,
+			numJobsFinished,
+			numJobsCancelled,
+			numJobsFailed);
+
+		this.version = Preconditions.checkNotNull(version);
+		this.commitId = Preconditions.checkNotNull(commitId);
+	}
+
+	public StatusOverviewWithVersion(
+			int numTaskManagersConnected,
+			int numSlotsTotal,
+			int numSlotsAvailable,
+			JobsOverview jobs1,
+			JobsOverview jobs2,
+			String version,
+			String commitId) {
+		super(numTaskManagersConnected, numSlotsTotal, numSlotsAvailable, jobs1, jobs2);
+
+		this.version = Preconditions.checkNotNull(version);
+		this.commitId = Preconditions.checkNotNull(commitId);
+	}
+
+	public static StatusOverviewWithVersion fromStatusOverview(StatusOverview statusOverview, String version, String commitId) {
+		return new StatusOverviewWithVersion(
+			statusOverview.getNumTaskManagersConnected(),
+			statusOverview.getNumSlotsTotal(),
+			statusOverview.getNumSlotsAvailable(),
+			statusOverview.getNumJobsRunningOrPending(),
+			statusOverview.getNumJobsFinished(),
+			statusOverview.getNumJobsCancelled(),
+			statusOverview.getNumJobsFailed(),
+			version,
+			commitId);
+	}
+
+	public String getVersion() {
+		return version;
+	}
+
+	public String getCommitId() {
+		return commitId;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+
+		StatusOverviewWithVersion that = (StatusOverviewWithVersion) o;
+
+		return Objects.equals(version, that.getVersion()) && Objects.equals(commitId, that.getCommitId());
+	}
+
+	@Override
+	public int hashCode() {
+		int result = super.hashCode();
+		result = 31 * result + (version != null ? version.hashCode() : 0);
+		result = 31 * result + (commitId != null ? commitId.hashCode() : 0);
+		return result;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
index f0f98ec..887ce2e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/ClusterOverviewHeaders.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.messages.webmonitor.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
new file mode 100644
index 0000000..cc03b7b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationHeaders.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link DashboardConfigHandler}.
+ */
+public final class DashboardConfigurationHeaders implements MessageHeaders<EmptyRequestBody, DashboardConfiguration, EmptyMessageParameters> {
+
+	private static final DashboardConfigurationHeaders INSTANCE = new DashboardConfigurationHeaders();
+
+	// make the constructor private since we want it to be a singleton
+	private DashboardConfigurationHeaders() {}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return DashboardConfigHandler.DASHBOARD_CONFIG_REST_PATH;
+	}
+
+	@Override
+	public Class<DashboardConfiguration> getResponseClass() {
+		return DashboardConfiguration.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public EmptyMessageParameters getUnresolvedMessageParameters() {
+		return EmptyMessageParameters.getInstance();
+	}
+
+	public static DashboardConfigurationHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
index 73cf063..e815a74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionTest.java
@@ -24,7 +24,6 @@ import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.framework.recipes.cache.NodeCacheListener;
 import org.apache.curator.test.TestingServer;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
deleted file mode 100644
index d69049e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/webmonitor/StatusOverviewWithVersionTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages.webmonitor;
-
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.TestLogger;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests for the {@link StatusOverviewWithVersion}.
- */
-public class StatusOverviewWithVersionTest extends TestLogger {
-
-	/**
-	 * Tests that we can marshal and unmarshal StatusOverviewWithVersion.
-	 */
-	@Test
-	public void testJsonMarshalling() throws JsonProcessingException {
-		final StatusOverviewWithVersion expected = new StatusOverviewWithVersion(
-			1,
-			3,
-			3,
-			7,
-			4,
-			2,
-			0,
-			"version",
-			"commit");
-
-		ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-
-		JsonNode json = objectMapper.valueToTree(expected);
-
-		final StatusOverviewWithVersion unmarshalled = objectMapper.treeToValue(json, StatusOverviewWithVersion.class);
-
-		assertEquals(expected, unmarshalled);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
index 06a99fe..73d9157 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
@@ -19,19 +19,20 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
-import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.TestLogger;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.TimeZone;
+import java.time.ZonedDateTime;
 
 /**
  * Tests for the DashboardConfigHandler.
  */
-public class DashboardConfigHandlerTest {
+public class DashboardConfigHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
 		DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L);
@@ -43,17 +44,18 @@ public class DashboardConfigHandlerTest {
 	@Test
 	public void testJsonGeneration() throws Exception {
 		long refreshInterval = 12345;
-		TimeZone timeZone = TimeZone.getDefault();
-		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+		final ZonedDateTime zonedDateTime = ZonedDateTime.now();
 
-		String json = DashboardConfigHandler.createConfigJson(refreshInterval);
+		final DashboardConfiguration dashboardConfiguration = DashboardConfiguration.from(refreshInterval, zonedDateTime);
+
+		String json = DashboardConfigHandler.createConfigJson(dashboardConfiguration);
 
 		JsonNode result = ArchivedJobGenerationUtils.MAPPER.readTree(json);
 
 		Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong());
-		Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText());
-		Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong());
-		Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText());
-		Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText());
+		Assert.assertEquals(dashboardConfiguration.getTimeZoneName(), result.get("timezone-name").asText());
+		Assert.assertEquals(dashboardConfiguration.getTimeZoneOffset(), result.get("timezone-offset").asInt());
+		Assert.assertEquals(dashboardConfiguration.getFlinkVersion(), result.get("flink-version").asText());
+		Assert.assertEquals(dashboardConfiguration.getFlinkRevision(), result.get("flink-revision").asText());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
new file mode 100644
index 0000000..9a9046b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link DashboardConfiguration}.
+ */
+public class DashboardConfigurationTest extends TestLogger {
+
+	/**
+	 * Tests that we can marshal and unmarshal {@link DashboardConfiguration} objects.
+	 */
+	@Test
+	public void testJsonMarshalling() throws JsonProcessingException {
+		final DashboardConfiguration expected = new DashboardConfiguration(
+			1L,
+			"foobar",
+			42,
+			"version",
+			"revision");
+
+		final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+		JsonNode marshaled = objectMapper.valueToTree(expected);
+
+		final DashboardConfiguration unmarshaled = objectMapper.treeToValue(marshaled, DashboardConfiguration.class);
+
+		assertEquals(expected, unmarshaled);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c6243b8b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
new file mode 100644
index 0000000..a1bbc9a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link StatusOverviewWithVersion}.
+ */
+public class StatusOverviewWithVersionTest extends TestLogger {
+
+	/**
+	 * Tests that we can marshal and unmarshal StatusOverviewWithVersion.
+	 */
+	@Test
+	public void testJsonMarshalling() throws JsonProcessingException {
+		final StatusOverviewWithVersion expected = new StatusOverviewWithVersion(
+			1,
+			3,
+			3,
+			7,
+			4,
+			2,
+			0,
+			"version",
+			"commit");
+
+		ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+
+		JsonNode json = objectMapper.valueToTree(expected);
+
+		final StatusOverviewWithVersion unmarshalled = objectMapper.treeToValue(json, StatusOverviewWithVersion.class);
+
+		assertEquals(expected, unmarshalled);
+	}
+}


Mime
View raw message