flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient
Date Wed, 21 Mar 2018 14:11:56 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.5 27189d805 -> 9f2b3c332


[FLINK-8756][Client] Support ClusterClient.getAccumulators() in RestClusterClient

This closes #5573.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f2b3c33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f2b3c33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f2b3c33

Branch: refs/heads/release-1.5
Commit: 9f2b3c33272f82c275d1a5d7bca5d8c72702f8ce
Parents: 27189d8
Author: vinoyang <vinoyang@tencent.com>
Authored: Sat Feb 24 14:50:55 2018 +0800
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Mar 21 15:11:22 2018 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java  | 41 +++++++++++-
 .../program/rest/RestClusterClientTest.java     | 68 ++++++++++++++++++++
 .../handler/job/JobAccumulatorsHandler.java     | 35 +++++++---
 ...orsIncludeSerializedValueQueryParameter.java | 41 ++++++++++++
 .../rest/messages/JobAccumulatorsHeaders.java   |  6 +-
 .../rest/messages/JobAccumulatorsInfo.java      | 46 ++++++++++++-
 .../JobAccumulatorsMessageParameters.java       | 36 +++++++++++
 .../json/SerializedValueDeserializer.java       |  6 ++
 .../json/SerializedValueSerializer.java         |  6 ++
 .../rest/messages/JobAccumulatorsInfoTest.java  |  2 +-
 10 files changed, 273 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 5558461..f3f1961 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program.rest;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ProgramInvocationException;
@@ -52,6 +53,9 @@ import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -101,6 +105,7 @@ import java.net.URL;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -403,6 +408,40 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 		});
 	}
 
+	@Override
+	public Map<String, Object> getAccumulators(final JobID jobID, ClassLoader loader) throws Exception {
+		final JobAccumulatorsHeaders accumulatorsHeaders = JobAccumulatorsHeaders.getInstance();
+		final JobAccumulatorsMessageParameters accMsgParams = accumulatorsHeaders.getUnresolvedMessageParameters();
+		accMsgParams.jobPathParameter.resolve(jobID);
+		accMsgParams.includeSerializedAccumulatorsParameter.resolve(Collections.singletonList(true));
+
+		CompletableFuture<JobAccumulatorsInfo> responseFuture = sendRequest(
+			accumulatorsHeaders,
+			accMsgParams
+		);
+
+		Map<String, Object> result = Collections.emptyMap();
+
+		try {
+			result = responseFuture.thenApply((JobAccumulatorsInfo accumulatorsInfo) -> {
+				try {
+					return AccumulatorHelper.deserializeAccumulators(
+						accumulatorsInfo.getSerializedUserAccumulators(),
+						loader);
+				} catch (Exception e) {
+					throw new CompletionException(
+						new FlinkException(
+							String.format("Deserialization of accumulators for job %s failed.", jobID),
+							e));
+				}
+			}).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+		} catch (ExecutionException ee) {
+			ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(ee));
+		}
+
+		return result;
+	}
+
 	private CompletableFuture<SavepointInfo> pollSavepointAsync(
 			final JobID jobId,
 			final TriggerId triggerID) {
@@ -661,7 +700,7 @@ public class RestClusterClient<T> extends ClusterClient<T> {
 				TimeUnit.MILLISECONDS)
 			.thenApplyAsync(leaderAddressSessionId -> {
 				final String address = leaderAddressSessionId.f0;
-				final Optional<String> host = ScalaUtils.toJava(AddressFromURIString.parse(address).host());
+				final Optional<String> host = ScalaUtils.<String>toJava(AddressFromURIString.parse(address).host());
 
 				return host.orElseGet(() -> {
 					// if the dispatcher address does not contain a host part, then assume it's running

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index ca2ba22..e108a0b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -47,11 +47,15 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
+import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
 import org.apache.flink.runtime.rest.messages.BlobServerPortHeaders;
 import org.apache.flink.runtime.rest.messages.BlobServerPortResponseBody;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationMessageParameters;
@@ -102,8 +106,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -118,6 +124,7 @@ import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -542,6 +549,67 @@ public class RestClusterClientTest extends TestLogger {
 		}
 	}
 
+	@Test
+	public void testGetAccumulators() throws Exception {
+		TestAccumulatorHandler accumulatorHandler = new TestAccumulatorHandler();
+
+		try (TestRestServerEndpoint ignored = createRestServerEndpoint(accumulatorHandler)){
+
+			JobID id = new JobID();
+
+			{
+				Map<String, Object> accumulators = restClusterClient.getAccumulators(id);
+				assertNotNull(accumulators);
+				assertEquals(1, accumulators.size());
+
+				assertEquals(true, accumulators.containsKey("testKey"));
+				assertEquals("testValue", accumulators.get("testKey").toString());
+			}
+		}
+	}
+
+	private class TestAccumulatorHandler extends TestHandler<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
+
+		public TestAccumulatorHandler() {
+			super(JobAccumulatorsHeaders.getInstance());
+		}
+
+		@Override
+		protected CompletableFuture<JobAccumulatorsInfo> handleRequest(
+			@Nonnull HandlerRequest<EmptyRequestBody,
+				JobAccumulatorsMessageParameters> request,
+			@Nonnull DispatcherGateway gateway) throws RestHandlerException {
+			JobAccumulatorsInfo accumulatorsInfo;
+			List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
+
+			final boolean includeSerializedValue;
+			if (!queryParams.isEmpty()) {
+				includeSerializedValue = queryParams.get(0);
+			} else {
+				includeSerializedValue = false;
+			}
+
+			List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(1);
+
+			userTaskAccumulators.add(new JobAccumulatorsInfo.UserTaskAccumulator("testName", "testType", "testValue"));
+
+			if (includeSerializedValue) {
+				Map<String, SerializedValue<Object>> serializedUserTaskAccumulators = new HashMap<>(1);
+				try {
+					serializedUserTaskAccumulators.put("testKey", new SerializedValue<>("testValue"));
+				} catch (IOException e) {
+					throw new RuntimeException(e);
+				}
+
+				accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators);
+			} else {
+				accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, Collections.emptyMap());
+			}
+
+			return CompletableFuture.completedFuture(accumulatorsInfo);
+		}
+	}
+
 	private class TestListJobsHandler extends TestHandler<EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
 
 		private TestListJobsHandler() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 7dd5ff0..0fe9201 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -24,12 +24,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
-import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,14 +43,14 @@ import java.util.concurrent.Executor;
 /**
  * Request handler that returns the aggregated accumulators of a job.
  */
-public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobMessageParameters> {
+public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
 
 	public JobAccumulatorsHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
 			Time timeout,
 			Map<String, String> responseHeaders,
-			MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> messageHeaders,
+			MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> messageHeaders,
 			ExecutionGraphCache executionGraphCache,
 			Executor executor) {
 		super(
@@ -62,11 +64,21 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 	}
 
 	@Override
-	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
-		StringifiedAccumulatorResult[] accs = graph.getAccumulatorResultsStringified();
-		List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(accs.length);
+	protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException {
+		JobAccumulatorsInfo accumulatorsInfo;
+		List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
 
-		for (StringifiedAccumulatorResult acc : accs) {
+		final boolean includeSerializedValue;
+		if (!queryParams.isEmpty()) {
+			includeSerializedValue = queryParams.get(0);
+		} else {
+			includeSerializedValue = false;
+		}
+
+		StringifiedAccumulatorResult[] stringifiedAccs = graph.getAccumulatorResultsStringified();
+		List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(stringifiedAccs.length);
+
+		for (StringifiedAccumulatorResult acc : stringifiedAccs) {
 			userTaskAccumulators.add(
 				new JobAccumulatorsInfo.UserTaskAccumulator(
 					acc.getName(),
@@ -74,6 +86,13 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc
 					acc.getValue()));
 		}
 
-		return new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators);
+		if (includeSerializedValue) {
+			Map<String, SerializedValue<Object>> serializedUserTaskAccumulators = graph.getAccumulatorsSerialized();
+			accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators);
+		} else {
+			accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, Collections.emptyMap());
+		}
+
+		return accumulatorsInfo;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
new file mode 100644
index 0000000..1f685c2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/AccumulatorsIncludeSerializedValueQueryParameter.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Query parameter for job's accumulator handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class AccumulatorsIncludeSerializedValueQueryParameter extends MessageQueryParameter<Boolean> {
+
+	private static final String key = "includeSerializedValue";
+
+	public AccumulatorsIncludeSerializedValueQueryParameter() {
+		super(key, MessageParameterRequisiteness.OPTIONAL);
+	}
+
+	@Override
+	public String convertValueToString(Boolean value) {
+		return String.valueOf(value);
+	}
+
+	@Override
+	public Boolean convertStringToValue(String value) {
+		return Boolean.valueOf(value);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
index 00f4fd5..2e00c91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsHeaders.java
@@ -26,7 +26,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 /**
  * Message headers for the {@link JobAccumulatorsHandler}.
  */
-public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobMessageParameters> {
+public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody, JobAccumulatorsInfo, JobAccumulatorsMessageParameters> {
 
 	private static final JobAccumulatorsHeaders INSTANCE = new JobAccumulatorsHeaders();
 
@@ -53,8 +53,8 @@ public class JobAccumulatorsHeaders implements MessageHeaders<EmptyRequestBody,
 	}
 
 	@Override
-	public JobMessageParameters getUnresolvedMessageParameters() {
-		return new JobMessageParameters();
+	public JobAccumulatorsMessageParameters getUnresolvedMessageParameters() {
+		return new JobAccumulatorsMessageParameters();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
index 367a38b..2262120 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfo.java
@@ -19,12 +19,19 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueDeserializer;
+import org.apache.flink.runtime.rest.messages.json.SerializedValueSerializer;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -33,6 +40,7 @@ import java.util.Objects;
 public class JobAccumulatorsInfo implements ResponseBody {
 	public static final String FIELD_NAME_JOB_ACCUMULATORS = "job-accumulators";
 	public static final String FIELD_NAME_USER_TASK_ACCUMULATORS = "user-task-accumulators";
+	public static final String FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS = "serialized-user-task-accumulators";
 
 	@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS)
 	private List<JobAccumulator> jobAccumulators;
@@ -40,12 +48,33 @@ public class JobAccumulatorsInfo implements ResponseBody {
 	@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS)
 	private List<UserTaskAccumulator> userAccumulators;
 
+	@JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS)
+	@JsonSerialize(contentUsing = SerializedValueSerializer.class)
+	private Map<String, SerializedValue<Object>> serializedUserAccumulators;
+
 	@JsonCreator
 	public JobAccumulatorsInfo(
 			@JsonProperty(FIELD_NAME_JOB_ACCUMULATORS) List<JobAccumulator> jobAccumulators,
-			@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators) {
+			@JsonProperty(FIELD_NAME_USER_TASK_ACCUMULATORS) List<UserTaskAccumulator> userAccumulators,
+			@JsonDeserialize(contentUsing = SerializedValueDeserializer.class) @JsonProperty(FIELD_NAME_SERIALIZED_USER_TASK_ACCUMULATORS) Map<String, SerializedValue<Object>> serializedUserAccumulators) {
 		this.jobAccumulators = Preconditions.checkNotNull(jobAccumulators);
 		this.userAccumulators = Preconditions.checkNotNull(userAccumulators);
+		this.serializedUserAccumulators = Preconditions.checkNotNull(serializedUserAccumulators);
+	}
+
+	@JsonIgnore
+	public List<JobAccumulator> getJobAccumulators() {
+		return jobAccumulators;
+	}
+
+	@JsonIgnore
+	public List<UserTaskAccumulator> getUserAccumulators() {
+		return userAccumulators;
+	}
+
+	@JsonIgnore
+	public Map<String, SerializedValue<Object>> getSerializedUserAccumulators() {
+		return serializedUserAccumulators;
 	}
 
 	@Override
@@ -104,6 +133,21 @@ public class JobAccumulatorsInfo implements ResponseBody {
 			this.value = Preconditions.checkNotNull(value);
 		}
 
+		@JsonIgnore
+		public String getName() {
+			return name;
+		}
+
+		@JsonIgnore
+		public String getType() {
+			return type;
+		}
+
+		@JsonIgnore
+		public String getValue() {
+			return value;
+		}
+
 		@Override
 		public boolean equals(Object o) {
 			if (this == o) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
new file mode 100644
index 0000000..ef23560
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsMessageParameters.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Request parameter for job accumulator's handler {@link org.apache.flink.runtime.rest.handler.job.JobAccumulatorsHandler}.
+ */
+public class JobAccumulatorsMessageParameters extends JobMessageParameters {
+
+	public final AccumulatorsIncludeSerializedValueQueryParameter
+		includeSerializedAccumulatorsParameter = new AccumulatorsIncludeSerializedValueQueryParameter();
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return Collections.singleton(includeSerializedAccumulatorsParameter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
index 6a2eadb..d7c321d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueDeserializer.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.rest.messages.json;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
 
 import java.io.IOException;
 
@@ -34,6 +36,10 @@ public class SerializedValueDeserializer extends StdDeserializer<SerializedValue
 
 	private static final long serialVersionUID = 1L;
 
+	public SerializedValueDeserializer() {
+		super(TypeFactory.defaultInstance().constructType(new TypeReference<SerializedValue<Object>>() {}));
+	}
+
 	public SerializedValueDeserializer(final JavaType valueType) {
 		super(valueType);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
index 0383d99..b63b1ef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/json/SerializedValueSerializer.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.rest.messages.json;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JavaType;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.SerializerProvider;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.type.TypeFactory;
 
 import java.io.IOException;
 
@@ -36,6 +38,10 @@ public class SerializedValueSerializer extends StdSerializer<SerializedValue<?>>
 
 	private static final long serialVersionUID = 1L;
 
+	public SerializedValueSerializer() {
+		super(TypeFactory.defaultInstance().constructType(new TypeReference<SerializedValue<Object>>() {}));
+	}
+
 	public SerializedValueSerializer(final JavaType javaType) {
 		super(javaType);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f2b3c33/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
index baaa551..e0e9649 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobAccumulatorsInfoTest.java
@@ -47,6 +47,6 @@ public class JobAccumulatorsInfoTest extends RestResponseMarshallingTestBase<Job
 			"uta3.type",
 			"uta3.value"));
 
-		return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList);
+		return new JobAccumulatorsInfo(Collections.emptyList(), userAccumulatorList, Collections.EMPTY_MAP);
 	}
 }


Mime
View raw message