flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject git commit: [FLINK-1046] Made sure that Client uses correct user-code class loaders.
Date Thu, 14 Aug 2014 13:11:22 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.6 7c96c4050 -> d6886c427


[FLINK-1046] Made sure that Client uses correct user-code class loaders.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d6886c42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d6886c42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d6886c42

Branch: refs/heads/release-0.6
Commit: d6886c4270a394f80ba9139603311b4498d53b00
Parents: 7c96c40
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Aug 14 02:19:26 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Aug 14 15:10:19 2014 +0200

----------------------------------------------------------------------
 .../api/avro/AvroExternalJarProgramITCase.java  |   2 +-
 .../org/apache/flink/client/CliFrontend.java    |   8 +-
 .../org/apache/flink/client/RemoteExecutor.java |  24 ++--
 .../client/minicluster/NepheleMiniCluster.java  |   2 +-
 .../org/apache/flink/client/program/Client.java |  10 +-
 .../flink/client/web/JobSubmissionServlet.java  |  16 ++-
 .../flink/client/CliFrontendInfoTest.java       |   4 +-
 .../client/CliFrontendPackageProgramTest.java   |   2 +-
 .../apache/flink/client/program/ClientTest.java |   8 +-
 .../runtime/accumulators/AccumulatorEvent.java  | 144 ++++++++++++-------
 .../apache/flink/runtime/client/JobClient.java  |  20 +--
 .../flink/runtime/jobmanager/JobManager.java    |   5 +-
 .../jobmanager/web/JobmanagerInfoServlet.java   |   3 +-
 .../runtime/operators/RegularPactTask.java      |   2 +-
 .../runtime/jobmanager/JobManagerITCase.java    |  18 +--
 .../test/iterative/nephele/JobGraphUtils.java   |   2 +-
 16 files changed, 165 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
index 4a6e7f1..0ddcb20 100644
--- a/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
+++ b/flink-addons/flink-avro/src/test/java/org/apache/flink/api/avro/AvroExternalJarProgramITCase.java
@@ -58,7 +58,7 @@ public class AvroExternalJarProgramITCase {
 			
 			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });
 						
-			Client c = new Client(new InetSocketAddress("localhost", TEST_JM_PORT), new Configuration());
+			Client c = new Client(new InetSocketAddress("localhost", TEST_JM_PORT), new Configuration(), program.getUserCodeClassLoader());
 			c.run(program, 4, true);
 		}
 		catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index f01883a..e516255 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -292,7 +292,7 @@ public class CliFrontend {
 				return 1;
 			}
 			
-			Client client = getClient(line);
+			Client client = getClient(line, program.getUserCodeClassLoader());
 			if (client == null) {
 				printHelpForRun();
 				return 1;
@@ -440,7 +440,7 @@ public class CliFrontend {
 			
 			// check for json plan request
 			if (plan) {
-				Client client = getClient(line);
+				Client client = getClient(line, program.getUserCodeClassLoader());
 				String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism);
 				
 				if (jsonPlan != null) {
@@ -814,8 +814,8 @@ public class CliFrontend {
 		return GlobalConfiguration.getConfiguration();
 	}
 	
-	protected Client getClient(CommandLine line) throws IOException {
-		return new Client(getJobManagerAddress(line), getGlobalConfiguration());
+	protected Client getClient(CommandLine line, ClassLoader classLoader) throws IOException {
+		return new Client(getJobManagerAddress(line), getGlobalConfiguration(), classLoader);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 9ca8534..e366be8 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -38,9 +38,9 @@ import org.apache.flink.configuration.Configuration;
 
 public class RemoteExecutor extends PlanExecutor {
 
-	private Client client;
-
-	private List<String> jarFiles;
+	private final List<String> jarFiles;
+	
+	private final InetSocketAddress address;
 	
 	
 	public RemoteExecutor(String hostname, int port) {
@@ -60,8 +60,8 @@ public class RemoteExecutor extends PlanExecutor {
 	}
 
 	public RemoteExecutor(InetSocketAddress inet, List<String> jarFiles) {
-		this.client = new Client(inet, new Configuration());
 		this.jarFiles = jarFiles;
+		this.address = inet;
 	}
 
 	
@@ -84,22 +84,30 @@ public class RemoteExecutor extends PlanExecutor {
 	@Override
 	public JobExecutionResult executePlan(Plan plan) throws Exception {
 		JobWithJars p = new JobWithJars(plan, this.jarFiles);
-		return this.client.run(p, -1, true);
+		
+		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
+		return c.run(p, -1, true);
 	}
 	
 	public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
-		return this.client.run(p, -1, true);
+		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
+		return c.run(p, -1, true);
 	}
 
 	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String[] args) throws Exception {
 		File jarFile = new File(jarPath);
 		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
-		return this.client.run(program.getPlanWithJars(), -1, true);
+		
+		Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader());
+		return c.run(program.getPlanWithJars(), -1, true);
 	}
 
 	@Override
 	public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
-		OptimizedPlan op = client.getOptimizedPlan(new JobWithJars(plan, this.jarFiles), -1);
+		JobWithJars p = new JobWithJars(plan, this.jarFiles);
+		Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
+		
+		OptimizedPlan op = c.getOptimizedPlan(p, -1);
 		PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
 		return jsonGen.getOptimizerPlanAsJSON(op);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
index ee845ec..db2eaf4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/minicluster/NepheleMiniCluster.java
@@ -174,7 +174,7 @@ public class NepheleMiniCluster {
 		Configuration configuration = jobGraph.getJobConfiguration();
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost");
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerRpcPort);
-		return new JobClient(jobGraph, configuration);
+		return new JobClient(jobGraph, configuration, getClass().getClassLoader());
 	}
 
 	public void start() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 9c71ef1..a82b9d5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -64,6 +64,8 @@ public class Client {
 	private final Configuration configuration;	// the configuration describing the job manager address
 	
 	private final PactCompiler compiler;		// the compiler to compile the jobs
+	
+	private final ClassLoader userCodeClassLoader;
 
 	private boolean printStatusDuringExecution;
 	
@@ -77,12 +79,13 @@ public class Client {
 	 * 
 	 * @param jobManagerAddress Address and port of the job-manager.
 	 */
-	public Client(InetSocketAddress jobManagerAddress, Configuration config) {
+	public Client(InetSocketAddress jobManagerAddress, Configuration config, ClassLoader userCodeClassLoader) {
 		Preconditions.checkNotNull(config, "Configuration is null");
 		this.configuration = config;
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
 		
+		this.userCodeClassLoader = userCodeClassLoader;
 		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
 	}
 
@@ -92,7 +95,7 @@ public class Client {
 	 * 
 	 * @param config The config used to obtain the job-manager's address.
 	 */
-	public Client(Configuration config) {
+	public Client(Configuration config, ClassLoader userCodeClassLoader) {
 		Preconditions.checkNotNull(config, "Configuration is null");
 		this.configuration = config;
 		
@@ -107,6 +110,7 @@ public class Client {
 			throw new CompilerException("Cannot find port to job manager's RPC service in the global configuration.");
 		}
 
+		this.userCodeClassLoader = userCodeClassLoader;
 		this.compiler = new PactCompiler(new DataStatistics(), new DefaultCostEstimator());
 	}
 	
@@ -290,7 +294,7 @@ public class Client {
 	public JobExecutionResult run(JobGraph jobGraph, boolean wait) throws ProgramInvocationException {
 		JobClient client;
 		try {
-			client = new JobClient(jobGraph, configuration);
+			client = new JobClient(jobGraph, configuration, this.userCodeClassLoader);
 		}
 		catch (IOException e) {
 			throw new ProgramInvocationException("Could not open job manager: " + e.getMessage());

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
index 26fac1e..844930c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobSubmissionServlet.java
@@ -84,13 +84,13 @@ public class JobSubmissionServlet extends HttpServlet {
 
 	private final Map<Long, JobGraph> submittedJobs;	// map from UIDs to the running jobs
 
-	private final Random rand;							// random number generator for UIDs
-
-	private final Client client;						// the client used to compile and submit jobs
+	private final Random rand;							// random number generator for UID
+	
+	private final Configuration nepheleConfig;
 
 
 	public JobSubmissionServlet(Configuration nepheleConfig, File jobDir, File planDir) {
-		this.client = new Client(nepheleConfig);
+		this.nepheleConfig = nepheleConfig;
 		this.jobStoreDirectory = jobDir;
 		this.planDumpDirectory = planDir;
 
@@ -155,6 +155,7 @@ public class JobSubmissionServlet extends HttpServlet {
 			String[] options = params.isEmpty() ? new String[0] : (String[]) params.toArray(new String[params.size()]);
 			PackagedProgram program;
 			OptimizedPlan optPlan;
+			Client client;
 			
 			try {
 				if (assemblerClass == null) {
@@ -163,6 +164,8 @@ public class JobSubmissionServlet extends HttpServlet {
 					program = new PackagedProgram(jarFile, assemblerClass, options);
 				}
 				
+				client = new Client(nepheleConfig, program.getUserCodeClassLoader());
+				
 				optPlan = client.getOptimizedPlan(program, -1);
 				
 				if (optPlan == null) {
@@ -226,7 +229,7 @@ public class JobSubmissionServlet extends HttpServlet {
 				// submit the job only, if it should not be suspended
 				if (!suspend) {
 					try {
-						this.client.run(program, optPlan, false);
+						client.run(program, optPlan, false);
 					} catch (Throwable t) {
 						LOG.error("Error submitting job to the job-manager.", t);
 						showErrorPage(resp, t.getMessage());
@@ -236,7 +239,7 @@ public class JobSubmissionServlet extends HttpServlet {
 					}
 				} else {
 					try {
-						this.submittedJobs.put(uid, this.client.getJobGraph(program, optPlan));
+						this.submittedJobs.put(uid, client.getJobGraph(program, optPlan));
 					}
 					catch (ProgramInvocationException piex) {
 						LOG.error("Error creating JobGraph from optimized plan.", piex);
@@ -295,6 +298,7 @@ public class JobSubmissionServlet extends HttpServlet {
 
 			// submit the job
 			try {
+				Client client = new Client(nepheleConfig, getClass().getClassLoader());
 				client.run(job, false);
 			} catch (Exception ex) {
 				LOG.error("Error submitting job to the job-manager.", ex);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
index 1b53f98..6a218b4 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java
@@ -128,7 +128,7 @@ public class CliFrontendInfoTest {
 		}
 
 		@Override
-		protected Client getClient(CommandLine line) throws IOException {
+		protected Client getClient(CommandLine line, ClassLoader loader) throws IOException {
 			try {
 				return new TestClient(expectedDop);
 			}
@@ -143,7 +143,7 @@ public class CliFrontendInfoTest {
 		private final int expectedDop;
 		
 		private TestClient(int expectedDop) throws Exception {
-			super(new InetSocketAddress(InetAddress.getLocalHost(), 6176), new Configuration());
+			super(new InetSocketAddress(InetAddress.getLocalHost(), 6176), new Configuration(), CliFrontendInfoTest.class.getClassLoader());
 			
 			this.expectedDop = expectedDop;
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
index de763f8..51c7f2f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendPackageProgramTest.java
@@ -258,7 +258,7 @@ public class CliFrontendPackageProgramTest {
 
 			Configuration c = new Configuration();
 			c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
-			Client cli = new Client(c);
+			Client cli = new Client(c, getClass().getClassLoader());
 			
 			cli.getOptimizedPlanAsJson(prog, 666);
 		} catch(ProgramInvocationException pie) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 1780d3f..7d5a4f7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -94,7 +94,7 @@ public class ClientTest {
 		whenNew(NepheleJobGraphGenerator.class).withNoArguments().thenReturn(generatorMock);
 		when(generatorMock.compileJobGraph(optimizedPlanMock)).thenReturn(jobGraphMock);
 
-		whenNew(JobClient.class).withArguments(any(JobGraph.class), any(Configuration.class)).thenReturn(this.jobClientMock);
+		whenNew(JobClient.class).withArguments(any(JobGraph.class), any(Configuration.class), any(ClassLoader.class)).thenReturn(this.jobClientMock);
 
 		when(this.jobClientMock.submitJob()).thenReturn(jobSubmissionResultMock);
 	}
@@ -103,7 +103,7 @@ public class ClientTest {
 	public void shouldSubmitToJobClient() throws ProgramInvocationException, IOException {
 		when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.SUCCESS);
 
-		Client out = new Client(configMock);
+		Client out = new Client(configMock, getClass().getClassLoader());
 		out.run(program.getPlanWithJars(), -1, false);
 		program.deleteExtractedLibraries();
 
@@ -116,7 +116,7 @@ public class ClientTest {
 	public void shouldThrowException() throws Exception {
 		when(jobSubmissionResultMock.getReturnCode()).thenReturn(ReturnCode.ERROR);
 
-		Client out = new Client(configMock);
+		Client out = new Client(configMock, getClass().getClassLoader());
 		out.run(program.getPlanWithJars(), -1, false);
 		program.deleteExtractedLibraries();
 
@@ -137,6 +137,6 @@ public class ClientTest {
 			}
 		}).when(packagedProgramMock).invokeInteractiveModeForExecution();
 
-		new Client(configMock).run(packagedProgramMock, 1, true);
+		new Client(configMock, getClass().getClassLoader()).run(packagedProgramMock, 1, true);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
index ed6ea29..22bcb1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorEvent.java
@@ -16,10 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.accumulators;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -27,9 +31,10 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
+import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
 import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.InstantiationUtil;
 
 /**
  * This class encapsulates a map of accumulators for a single job. It is used
@@ -39,86 +44,123 @@ import org.apache.flink.util.StringUtils;
 public class AccumulatorEvent implements IOReadableWritable {
 
 	private JobID jobID;
+	
+	private Map<String, Accumulator<?, ?>> accumulators;
+	
+	// staging deserialized data until the classloader is available
+	private String[] accNames;
+	private String[] classNames;
+	private byte[][] serializedData;
 
-	private Map<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
-
-	private boolean useUserClassLoader = false;
 
 	// Removing this causes an EOFException in the RPC service. The RPC should
 	// be improved in this regard (error message is very unspecific).
 	public AccumulatorEvent() {
+		this.accumulators = Collections.emptyMap();
 	}
 
-	public AccumulatorEvent(JobID jobID,
-			Map<String, Accumulator<?, ?>> accumulators,
-			boolean useUserClassLoader) {
+	public AccumulatorEvent(JobID jobID, Map<String, Accumulator<?, ?>> accumulators) {
 		this.accumulators = accumulators;
 		this.jobID = jobID;
-		this.useUserClassLoader = useUserClassLoader;
 	}
 
 	public JobID getJobID() {
 		return this.jobID;
 	}
 
-	public Map<String, Accumulator<?, ?>> getAccumulators() {
+	public Map<String, Accumulator<?, ?>> getAccumulators(ClassLoader loader) {
+		if (loader == null) {
+			throw new NullPointerException();
+		}
+		
+		if (this.accumulators == null) {
+			// deserialize
+			// we have read the binary data, but not yet turned into the objects
+			final int num = accNames.length;
+			this.accumulators = new HashMap<String, Accumulator<?,?>>(num);
+			for (int i = 0; i < num; i++) {
+				Accumulator<?, ?> acc;
+				try {
+					@SuppressWarnings("unchecked")
+					Class<? extends Accumulator<?, ?>> valClass = (Class<? extends Accumulator<?, ?>>) Class.forName(classNames[i], true, loader);
+					acc = InstantiationUtil.instantiate(valClass, Accumulator.class);
+				}
+				catch (ClassNotFoundException e) {
+					throw new RuntimeException("Could not load user-defined class '" + classNames[i] + "'.", e);
+				}
+				catch (ClassCastException e) {
+					throw new RuntimeException("User-defined accumulator class is not an Accumulator sublass.");
+				}
+				
+				DataInputStream in = new DataInputStream(new ByteArrayInputStream(serializedData[i]));
+				try {
+					acc.read(new InputViewDataInputStreamWrapper(in));
+					in.close();
+				} catch (IOException e) {
+					throw new RuntimeException("Error while deserializing the user-defined aggregate class.", e);
+				}
+				
+				accumulators.put(accNames[i], acc);
+			}
+			
+			// reset the serialized data
+			this.accNames = null;
+			this.classNames = null;
+			this.serializedData = null;
+		}
+		
 		return this.accumulators;
 	}
 
 	@Override
 	public void write(DataOutputView out) throws IOException {
-		out.writeBoolean(this.useUserClassLoader);
 		jobID.write(out);
 		out.writeInt(accumulators.size());
-		for (Map.Entry<String, Accumulator<?, ?>> entry : this.accumulators
-				.entrySet()) {
-			out.writeUTF(entry.getKey());
-			out.writeUTF(entry.getValue().getClass().getName());
-			entry.getValue().write(out);
+		
+		if (accumulators.size() > 0) {
+			ByteArrayOutputStream boas = new ByteArrayOutputStream();
+			DataOutputStream bufferStream = new DataOutputStream(boas);
+			
+			for (Map.Entry<String, Accumulator<?, ?>> entry : this.accumulators.entrySet()) {
+				
+				// write accumulator name
+				out.writeUTF(entry.getKey());
+				
+				// write type class
+				out.writeUTF(entry.getValue().getClass().getName());
+				
+				entry.getValue().write(new OutputViewDataOutputStreamWrapper(bufferStream));
+				bufferStream.flush();
+				byte[] bytes = boas.toByteArray();
+				out.writeInt(bytes.length);
+				out.write(bytes);
+				boas.reset();
+			}
+			bufferStream.close();
+			boas.close();
 		}
 	}
 
-	@SuppressWarnings("unchecked")
 	@Override
 	public void read(DataInputView in) throws IOException {
-		this.useUserClassLoader = in.readBoolean();
+		this.accumulators = null; // this makes sure we deserialize
+		
 		jobID = new JobID();
 		jobID.read(in);
+		
 		int numberOfMapEntries = in.readInt();
-		this.accumulators = new HashMap<String, Accumulator<?, ?>>(
-				numberOfMapEntries);
-
-		// Get user class loader. This is required at the JobManager, but not at
-		// the
-		// client.
-		ClassLoader classLoader = null;
-		if (this.useUserClassLoader) {
-			classLoader = LibraryCacheManager.getClassLoader(jobID);
-		} else {
-			classLoader = this.getClass().getClassLoader();
-		}
+		this.accNames = new String[numberOfMapEntries];
+		this.classNames = new String[numberOfMapEntries];
+		this.serializedData = new byte[numberOfMapEntries][];
 
 		for (int i = 0; i < numberOfMapEntries; i++) {
-			String key = in.readUTF();
-
-			final String valueType = in.readUTF();
-			Class<Accumulator<?, ?>> valueClass = null;
-			try {
-				valueClass = (Class<Accumulator<?, ?>>) Class.forName(
-						valueType, true, classLoader);
-			} catch (ClassNotFoundException e) {
-				throw new IOException(StringUtils.stringifyException(e));
-			}
-
-			Accumulator<?, ?> value = null;
-			try {
-				value = valueClass.newInstance();
-			} catch (Exception e) {
-				throw new IOException(StringUtils.stringifyException(e));
-			}
-			value.read(in);
-
-			this.accumulators.put(key, value);
+			this.accNames[i] = in.readUTF();
+			this.classNames[i] = in.readUTF();
+			
+			int len = in.readInt();
+			byte[] data = new byte[len];
+			this.serializedData[i] = data;
+			in.readFully(data);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index 6f77b07..e40e4bc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -79,6 +79,8 @@ public class JobClient {
 	 * The shutdown hook which is executed if the user interrupts the job the job execution.
 	 */
 	private final JobCleanUp jobCleanUp;
+	
+	private final ClassLoader userCodeClassLoader;
 
 	/**
 	 * The sequence number of the last processed event received from the job manager.
@@ -126,9 +128,8 @@ public class JobClient {
 	 * @throws IOException
 	 *         thrown on error while initializing the RPC connection to the job manager
 	 */
-	public JobClient(final JobGraph jobGraph) throws IOException {
-
-		this(jobGraph, new Configuration());
+	public JobClient(JobGraph jobGraph, ClassLoader userCodeClassLoader) throws IOException {
+		this(jobGraph, new Configuration(), userCodeClassLoader);
 	}
 
 	/**
@@ -142,7 +143,7 @@ public class JobClient {
 	 * @throws IOException
 	 *         thrown on error while initializing the RPC connection to the job manager
 	 */
-	public JobClient(final JobGraph jobGraph, final Configuration configuration) throws IOException {
+	public JobClient(JobGraph jobGraph, Configuration configuration, ClassLoader userCodeClassLoader) throws IOException {
 
 		final String address = configuration.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
 		final int port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
@@ -154,6 +155,7 @@ public class JobClient {
 		this.jobGraph = jobGraph;
 		this.configuration = configuration;
 		this.jobCleanUp = new JobCleanUp(this);
+		this.userCodeClassLoader = userCodeClassLoader;
 	}
 
 	/**
@@ -169,14 +171,14 @@ public class JobClient {
 	 * @throws IOException
 	 *         thrown on error while initializing the RPC connection to the job manager
 	 */
-	public JobClient(final JobGraph jobGraph, final Configuration configuration,
-			final InetSocketAddress jobManagerAddress)
-			throws IOException {
-
+	public JobClient(JobGraph jobGraph, Configuration configuration, InetSocketAddress jobManagerAddress, ClassLoader userCodeClassLoader)
+			throws IOException
+	{
 		this.jobSubmitClient = RPC.getProxy(JobManagementProtocol.class, jobManagerAddress,	NetUtils.getSocketFactory());
 		this.jobGraph = jobGraph;
 		this.configuration = configuration;
 		this.jobCleanUp = new JobCleanUp(this);
+		this.userCodeClassLoader = userCodeClassLoader;
 	}
 
 	/**
@@ -343,7 +345,7 @@ public class JobClient {
 						// Request accumulators
 						Map<String, Object> accumulators = null;
 						try {
-							accumulators = AccumulatorHelper.toResultMap(getAccumulators().getAccumulators());
+							accumulators = AccumulatorHelper.toResultMap(getAccumulators().getAccumulators(this.userCodeClassLoader));
 						} catch (IOException ioe) {
 							Runtime.getRuntime().removeShutdownHook(this.jobCleanUp);
 							throw ioe;	// Rethrow error

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 0073349..be0bb48 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -1189,12 +1189,11 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 
 	@Override
 	public void reportAccumulatorResult(AccumulatorEvent accumulatorEvent) throws IOException {
-		this.accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID(),
-				accumulatorEvent.getAccumulators());
+		this.accumulatorManager.processIncomingAccumulators(accumulatorEvent.getJobID(), accumulatorEvent.getAccumulators(LibraryCacheManager.getClassLoader(accumulatorEvent.getJobID())));
 	}
 
 	@Override
 	public AccumulatorEvent getAccumulatorResults(JobID jobID) throws IOException {
-		return new AccumulatorEvent(jobID, this.accumulatorManager.getJobAccumulators(jobID), false);
+		return new AccumulatorEvent(jobID, this.accumulatorManager.getJobAccumulators(jobID));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 7086fb2..3aa17bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
 import org.apache.flink.runtime.event.job.JobEvent;
 import org.apache.flink.runtime.event.job.RecentJobEvent;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -291,7 +292,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 			
 			// write accumulators
 			AccumulatorEvent accumulators = jobmanager.getAccumulatorResults(jobEvent.getJobID());
-			Map<String, Object> accMap = AccumulatorHelper.toResultMap(accumulators.getAccumulators());
+			Map<String, Object> accMap = AccumulatorHelper.toResultMap(accumulators.getAccumulators(LibraryCacheManager.getClassLoader(jobEvent.getJobID())));
 			
 			wrt.write("\n\"accumulators\": [");
 			int i = 0;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 9bc1893..e8d7add 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -602,7 +602,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 		synchronized (env.getAccumulatorProtocolProxy()) {
 			try {
 				env.getAccumulatorProtocolProxy().reportAccumulatorResult(
-						new AccumulatorEvent(env.getJobID(), accumulators, true));
+						new AccumulatorEvent(env.getJobID(), accumulators));
 			} catch (IOException e) {
 				throw new RuntimeException("Communication with JobManager is broken. Could not send accumulators.", e);
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
index 30e3d6a..cff207c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerITCase.java
@@ -218,7 +218,7 @@ public class JobManagerITCase {
 			jg.addJar(new Path(new File(ServerTestUtils.getTempDir() + File.separator + forwardClassName + ".jar").toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 			jobClient.submitJobAndWait();
 
 			// Finally, compare output file to initial number sequence
@@ -317,7 +317,7 @@ public class JobManagerITCase {
 				.toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 			
 			// deactivate logging of expected test exceptions
 			Logger taskLogger = Logger.getLogger(Task.class);
@@ -411,7 +411,7 @@ public class JobManagerITCase {
 				+ ".jar").toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 			
 			// deactivate logging of expected test exceptions
 			Logger jcLogger = Logger.getLogger(JobClient.class);
@@ -518,7 +518,7 @@ public class JobManagerITCase {
 					+ ".jar").toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 
 			// deactivate logging of expected test exceptions
 			// deactivate logging of expected test exceptions
@@ -637,7 +637,7 @@ public class JobManagerITCase {
 				.toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 			
 			try {
 				jobClient.submitJobAndWait();
@@ -725,7 +725,7 @@ public class JobManagerITCase {
 			jg.addJar(new Path(jarFile.toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 			jobClient.submitJobAndWait();
 
 		} catch (Exception e) {
@@ -794,7 +794,7 @@ public class JobManagerITCase {
 			jg.addJar(new Path(jarFile.toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 			jobClient.submitJobAndWait();
 		} catch (Exception e) {
 			e.printStackTrace();
@@ -887,7 +887,7 @@ public class JobManagerITCase {
 			jg.addJar(new Path(jarFile.toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 
 			try {
 				jobClient.submitJobAndWait();
@@ -1034,7 +1034,7 @@ public class JobManagerITCase {
 			jg.addJar(new Path(jarFile.toURI()));
 
 			// Create job client and launch job
-			jobClient = new JobClient(jg, configuration);
+			jobClient = new JobClient(jg, configuration, getClass().getClassLoader());
 			
 			// disable logging for the taskmanager and the client, as they will have many
 			// expected test errors they will log.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6886c42/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
index d9ecdcf..4370111 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
@@ -51,7 +51,7 @@ public class JobGraphUtils {
 	}
 
 	public static void submit(JobGraph graph, Configuration nepheleConfig) throws IOException, JobExecutionException {
-		JobClient client = new JobClient(graph, nepheleConfig);
+		JobClient client = new JobClient(graph, nepheleConfig, JobGraphUtils.class.getClassLoader());
 		client.submitJobAndWait();
 	}
 	


Mime
View raw message