flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/5] flink git commit: [FLINK-2594][client] implement a method to retrieve the accumulators of a job
Date Mon, 31 Aug 2015 12:13:54 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.10.0-milestone-1 8d5656732 -> 13a95961f


[FLINK-2594][client] implement a method to retrieve the accumulators of a job

- move SerializedValue from runtime to core
- unified code to deserialize accumulators

This closes #1072.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fd8690b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fd8690b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fd8690b2

Branch: refs/heads/release-0.10.0-milestone-1
Commit: fd8690b26f9781d154b730a5b5705f5e1e54a236
Parents: 8d56567
Author: Maximilian Michels <mxm@apache.org>
Authored: Fri Aug 28 17:01:41 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 31 12:59:43 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/program/Client.java | 75 +++++++++++++++++
 .../common/accumulators/AccumulatorHelper.java  | 36 +++++++++
 .../org/apache/flink/util/SerializedValue.java  | 82 +++++++++++++++++++
 .../accumulators/AccumulatorSnapshot.java       |  2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  2 +-
 .../flink/runtime/checkpoint/StateForTask.java  |  2 +-
 .../client/SerializedJobExecutionResult.java    | 18 +----
 .../deployment/TaskDeploymentDescriptor.java    |  2 +-
 .../flink/runtime/executiongraph/Execution.java |  2 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  2 +-
 .../runtime/executiongraph/ExecutionVertex.java |  2 +-
 .../checkpoint/AcknowledgeCheckpoint.java       |  2 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |  2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  2 +-
 .../flink/runtime/util/SerializedValue.java     | 84 --------------------
 .../flink/runtime/jobmanager/JobManager.scala   |  8 +-
 .../accumulators/AccumulatorMessages.scala      |  2 +-
 .../checkpoint/CheckpointStateRestoreTest.java  |  2 +-
 .../SerializedJobExecutionResultTest.java       |  4 +-
 .../messages/CheckpointMessagesTest.java        |  2 +-
 .../flink/runtime/util/SerializedValueTest.java |  1 +
 .../flink/tachyon/FileStateHandleTest.java      |  2 +-
 22 files changed, 218 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 78c82f6..06156fa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -25,11 +25,14 @@ import java.io.PrintStream;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.api.common.JobExecutionResult;
@@ -55,6 +58,10 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.JobManager;
 import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
+import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
+import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -492,6 +499,74 @@ public class Client {
 	}
 
 
+	/**
+	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
+	 * requested while a job is running or after it has finished. The default class loader is used
+	 * to deserialize the incoming accumulator results.
+	 * @param jobID The job identifier of a job.
+	 * @return A Map containing the accumulator's name and its value.
+	 */
+	public Map<String, Object> getAccumulators(JobID jobID) throws Exception {
+		return getAccumulators(jobID, ClassLoader.getSystemClassLoader());
+	}
+
+	/**
+	 * Requests and returns the accumulators for the given job identifier. Accumulators can be
+	 * requested while a job is running or after it has finished.
+	 * @param jobID The job identifier of a job.
+	 * @param loader The class loader for deserializing the accumulator results.
+	 * @return A Map containing the accumulator's name and its value.
+	 */
+	public Map<String, Object> getAccumulators(JobID jobID, ClassLoader loader) throws
Exception {
+
+		final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+
+		ActorSystem actorSystem;
+		try {
+			actorSystem = JobClient.startJobClientActorSystem(configuration);
+		} catch (Exception e) {
+			throw new Exception("Could start client actor system.", e);
+		}
+
+		ActorRef jobManager;
+		try {
+			jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout);
+		} catch (Exception e) {
+			throw new Exception("Error getting the remote actor reference for the job manager.", e);
+		}
+
+		Future<Object> response;
+		try {
+			ActorGateway jobManagerGateway = JobManager.getJobManagerGateway(jobManager, timeout);
+			response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout);
+		} catch (Exception e) {
+			throw new Exception("Failed to query the job manager gateway for accumulators.", e);
+		}
+
+		try {
+
+			Object result = Await.result(response, timeout);
+
+			if (result instanceof AccumulatorResultsFound) {
+				Map<String, SerializedValue<Object>> serializedAccumulators =
+						((AccumulatorResultsFound) result).result();
+
+				return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader);
+
+			} else if (result instanceof AccumulatorResultsErroneous) {
+				throw ((AccumulatorResultsErroneous) result).cause();
+			} else {
+				LOG.warn("Failed to fetch accumulators for job {}.", jobID);
+			}
+
+		} catch (Exception e) {
+			LOG.error("Error occurred while fetching accumulators for {}", jobID, e);
+		}
+
+		return Collections.emptyMap();
+	}
+
+
 	// --------------------------------------------------------------------------------------------
 	
 	public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 4fa173c..bb48bdd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -18,7 +18,11 @@
 
 package org.apache.flink.api.common.accumulators;
 
+import org.apache.flink.util.SerializedValue;
+
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -134,4 +138,36 @@ public class AccumulatorHelper {
 		return result;
 	}
 
+	/**
+	 * Takes the serialized accumulator results and tries to deserialize them using the provided
+	 * class loader.
+	 * @param serializedAccumulators The serialized accumulator results.
+	 * @param loader The class loader to use.
+	 * @return The deserialized accumulator results.
+	 * @throws IOException
+	 * @throws ClassNotFoundException
+	 */
+	public static Map<String, Object> deserializeAccumulators(
+			Map<String, SerializedValue<Object>> serializedAccumulators, ClassLoader loader)
+			throws IOException, ClassNotFoundException {
+
+		if (serializedAccumulators == null || serializedAccumulators.isEmpty()) {
+			return Collections.emptyMap();
+		}
+
+		Map<String, Object> accumulators = new HashMap<>(serializedAccumulators.size());
+
+		for (Map.Entry<String, SerializedValue<Object>> entry : serializedAccumulators.entrySet())
{
+
+			Object value = null;
+			if (entry.getValue() != null) {
+				value = entry.getValue().deserializeValue(loader);
+			}
+
+			accumulators.put(entry.getKey(), value);
+		}
+
+		return accumulators;
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
new file mode 100644
index 0000000..5731fc1
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * This class is used to transfer (via serialization) objects whose classes are not available
+ * in the system class loader. When those objects are deserialized without access to their
+ * special class loader, the deserialization fails with a {@code ClassNotFoundException}.
+ *
+ * To work around that issue, the SerializedValue serializes data immediately into a byte array.
+ * When sent through RPC or another service that uses serialization, only the byte array is
+ * transferred. The object is deserialized later (upon access) and requires the accessor to
+ * provide the corresponding class loader.
+ *
+ * @param <T> The type of the value held.
+ */
+public class SerializedValue<T> implements java.io.Serializable {
+
+	private static final long serialVersionUID = -3564011643393683761L;
+
+	/** The serialized data */
+	private final byte[] serializedData;
+
+
+	public SerializedValue(T value) throws IOException {
+		this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
+	}
+
+
+	@SuppressWarnings("unchecked")
+	public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException
{
+		if (loader == null) {
+			throw new NullPointerException();
+		}
+
+		return serializedData == null ? null : (T) InstantiationUtil.deserializeObject(serializedData,
loader);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+
+	@Override
+	public int hashCode() {
+		return serializedData == null ? 0 : Arrays.hashCode(serializedData);
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof SerializedValue) {
+			SerializedValue<?> other = (SerializedValue<?>) obj;
+			return this.serializedData == null ? other.serializedData == null :
+					(other.serializedData != null && Arrays.equals(this.serializedData, other.serializedData));
+		}
+		else {
+			return false;
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "SerializedValue";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
index 0f1911d..b813153 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.accumulators;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
 import java.io.Serializable;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9ea3b6f..370ae50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 
 /**
  * A pending checkpoint is a checkpoint that has been started, but has not been

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
index 73deeed..120c503 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
index 029bc38..ec2312f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java
@@ -20,11 +20,10 @@ package org.apache.flink.runtime.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -80,17 +79,8 @@ public class SerializedJobExecutionResult implements java.io.Serializable
{
 	}
 
 	public JobExecutionResult toJobExecutionResult(ClassLoader loader) throws IOException, ClassNotFoundException
{
-		Map<String, Object> accumulators = null;
-		if (accumulatorResults != null) {
-			accumulators = accumulatorResults.isEmpty() ?
-									Collections.<String, Object>emptyMap() :
-									new HashMap<String, Object>(this.accumulatorResults.size());
-
-			for (Map.Entry<String, SerializedValue<Object>> entry : this.accumulatorResults.entrySet())
{
-				Object o = entry.getValue() == null ? null : entry.getValue().deserializeValue(loader);
-				accumulators.put(entry.getKey(), o);
-			}
-		}
+		Map<String, Object> accumulators =
+				AccumulatorHelper.deserializeAccumulators(accumulatorResults, loader);
 
 		return new JobExecutionResult(jobId, netRuntime, accumulators);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 0a1268d..c4065d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 
 import java.io.Serializable;
 import java.util.Collection;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 64c4d47..c7191fa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,7 +46,7 @@ import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 import scala.concurrent.ExecutionContext;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b52a4e8..169971d 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.messages.ExecutionGraphMessages;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.apache.flink.util.InstantiationUtil;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index dcf64c0..78e9804 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableExceptio
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 
 import scala.concurrent.duration.FiniteDuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index db12e0a..381b8d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.messages.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 8cfc1c3..07cea33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 
 import java.util.Map;
 import java.util.concurrent.Future;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 1b43139..c7abce0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateUtils;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.FiniteDuration;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
deleted file mode 100644
index 6a5468a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.util;
-
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.util.Arrays;
-
-/**
- * This class is used to transfer (via serialization) objects whose classes are not available
- * in the system class loader. When those objects are deserialized without access to their
- * special class loader, the deserialization fails with a {@code ClassNotFoundException}.
- *
- * To work around that issue, the SerializedValue serialized data immediately into a byte
array.
- * When send through RPC or another service that uses serialization, only the byte array
is
- * transferred. The object is deserialized later (upon access) and requires the accessor
to
- * provide the corresponding class loader.
- *
- * @param <T> The type of the value held.
- */
-public class SerializedValue<T> implements java.io.Serializable {
-
-	private static final long serialVersionUID = -3564011643393683761L;
-
-	/** The serialized data */
-	private final byte[] serializedData;
-
-
-	public SerializedValue(T value) throws IOException {
-		this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
-	}
-
-
-	@SuppressWarnings("unchecked")
-	public T deserializeValue(ClassLoader loader) throws IOException, ClassNotFoundException
{
-		if (loader == null) {
-			throw new NullPointerException();
-		}
-
-		return serializedData == null ? null : (T) InstantiationUtil.deserializeObject(serializedData,
loader);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-
-	@Override
-	public int hashCode() {
-		return serializedData == null ? 0 : Arrays.hashCode(serializedData);
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof SerializedValue) {
-			SerializedValue<?> other = (SerializedValue<?>) obj;
-			return this.serializedData == null ? other.serializedData == null :
-					(other.serializedData != null && Arrays.equals(this.serializedData, other.serializedData));
-		}
-		else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return "SerializedValue";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index d001d5a..839fdb4 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util.ZooKeeperUtil
-import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation}
+import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.runtime.webmonitor.WebMonitor
 import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessages}
 import org.apache.flink.runtime.{LogMessages}
@@ -60,7 +60,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat}
-import org.apache.flink.util.{ExceptionUtils, InstantiationUtil}
+import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil}
 
 import _root_.akka.pattern.ask
 import scala.concurrent._
@@ -769,7 +769,7 @@ class JobManager(
             currentJobs.get(jobID) match {
               case Some((graph, jobInfo)) =>
                 val accumulatorValues = graph.getAccumulatorsSerialized()
-          sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
+                sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues))
               case None =>
                 archive.forward(message)
             }
@@ -1271,7 +1271,7 @@ object JobManager {
 
   /**
    * Create the job manager components as (instanceManager, scheduler, libraryCacheManager,
-   *              archiverProps, accumulatorManager, defaultExecutionRetries,
+   *              archiverProps, defaultExecutionRetries,
    *              delayBetweenRetries, timeout)
    *
    * @param configuration The configuration from which to parse the config values.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
index 015c96e..107ba82 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.messages.accumulators
 
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult
-import org.apache.flink.runtime.util.SerializedValue
+import org.apache.flink.util.SerializedValue
 
 /**
  * Base trait of all accumulator messages

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 902eb4b..08cb0a3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.Mockito;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
index 5c9ffa7..a22ed13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.client;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.util.HashMap;
@@ -93,7 +93,7 @@ public class SerializedJobExecutionResultTest {
 
 			JobExecutionResult jResult = result.toJobExecutionResult(getClass().getClassLoader());
 			assertNull(jResult.getJobID());
-			assertNull(jResult.getAllAccumulatorResults());
+			assertTrue(jResult.getAllAccumulatorResults().isEmpty());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 211d5e3..597249a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
index 0154334..0d19613 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.util;
 
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd8690b2/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
index 2873c78..ec414c0 100644
--- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -30,7 +30,7 @@ import java.io.Serializable;
 import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.flink.util.SerializedValue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;


Mime
View raw message