flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-1438] [jobmanager] Fix class loading issue for messages with custom input splits
Date Wed, 11 Feb 2015 21:04:46 GMT
Repository: flink
Updated Branches:
  refs/heads/release-0.8 5b420d847 -> 79da5a920


[FLINK-1438] [jobmanager] Fix class loading issue for messages with custom input splits


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/689e26f7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/689e26f7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/689e26f7

Branch: refs/heads/release-0.8
Commit: 689e26f7a6536fa6944184078388ff029874604b
Parents: 5b420d8
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Feb 9 14:01:10 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 20:56:42 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/RemoteExecutor.java |   2 +-
 .../runtime/jobmanager/InputSplitWrapper.java   |  71 ++++++++
 .../flink/runtime/jobmanager/JobManager.java    |  16 +-
 .../protocols/InputSplitProviderProtocol.java   |   4 +-
 .../taskmanager/TaskInputSplitProvider.java     |  20 ++-
 .../flink/runtime/taskmanager/TaskManager.java  |   2 +-
 flink-tests/pom.xml                             |  45 ++++-
 flink-tests/src/test/assembly/test-assembly.xml |  37 ----
 .../test/assembly/test-custominput-assembly.xml |  37 ++++
 .../src/test/assembly/test-kmeans-assembly.xml  |  37 ++++
 .../InputSplitClassLoaderITCase.java            |  56 ++++++
 .../jar/CustomInputSpitProgram.java             | 172 +++++++++++++++++++
 .../PackagedProgramEndToEndITCase.java          |   8 +-
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 14 files changed, 452 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index c90d334..6c52667 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -94,7 +94,7 @@ public class RemoteExecutor extends PlanExecutor {
 		return c.run(p, -1, true);
 	}
 
-	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String[] args) throws Exception {
+	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
 		File jarFile = new File(jarPath);
 		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java
new file mode 100644
index 0000000..741e4ad
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/InputSplitWrapper.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.InstantiationUtil;
+
+public class InputSplitWrapper implements IOReadableWritable {
+
+	private InputSplit split;
+	
+	private byte[] splitData;
+	
+	// ------------------------------------------------------------------------
+	
+	public InputSplitWrapper() {}
+	
+	public InputSplitWrapper(InputSplit split) throws Exception {
+		this.split = split;
+		this.splitData = InstantiationUtil.serializeObject(split);
+	}
+
+	public InputSplit getSplit(ClassLoader userCodeClassLoader) throws ClassNotFoundException, IOException {
+		if (split == null) {
+			if (splitData == null) {
+				throw new IllegalStateException("No split or split data available");
+			}
+			
+			split = (InputSplit) InstantiationUtil.deserializeObject(splitData, userCodeClassLoader);
+		}
+		
+		return split;
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	@Override
+	public void read(DataInputView in) throws IOException {
+		int len = in.readInt();
+		splitData = new byte[len];
+		in.readFully(splitData);
+		split = null;
+	}
+	
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeInt(splitData.length);
+		out.write(splitData);		
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 80607c2..4c51c09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -480,7 +480,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 	}
 	
 	@Override
-	public InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertexId, ExecutionAttemptID executionAttempt) throws IOException {
+	public InputSplitWrapper requestNextInputSplit(JobID jobID, JobVertexID vertexId, ExecutionAttemptID executionAttempt) throws IOException {
 
 		final ExecutionGraph graph = this.currentJobs.get(jobID);
 		if (graph == null) {
@@ -505,6 +505,7 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 		Execution execution = graph.getRegisteredExecutions().get(executionAttempt);
 		if(execution == null) {
 			LOG.error("Can not find Execution for attempt " + executionAttempt);
+			return null;
 		} else {
 			SimpleSlot slot = execution.getAssignedResource();
 			if(slot != null) {
@@ -512,7 +513,18 @@ public class JobManager implements ExtendedManagementProtocol, InputSplitProvide
 			}
 		}
 		
-		return splitAssigner.getNextInputSplit(host);
+		InputSplit split = splitAssigner.getNextInputSplit(host);
+		if (split == null) {
+			return null;
+		}
+		
+		try {
+			return new InputSplitWrapper(split);
+		}
+		catch (Throwable t) {
+			graph.fail(new Exception("Error serializing input split: " + t.getMessage(), t));
+			return null;
+		}
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
index be1846a..2d6c862 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/protocols/InputSplitProviderProtocol.java
@@ -20,16 +20,16 @@ package org.apache.flink.runtime.protocols;
 
 import java.io.IOException;
 
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.protocols.VersionedProtocol;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.InputSplitWrapper;
 
 /**
  * The input split provider protocol is used to facilitate RPC calls related to the lazy split assignment.
  */
 public interface InputSplitProviderProtocol extends VersionedProtocol {
 
-	InputSplit requestNextInputSplit(JobID jobID, JobVertexID vertex, ExecutionAttemptID executionAttempt) throws IOException;
+	InputSplitWrapper requestNextInputSplit(JobID jobID, JobVertexID vertex, ExecutionAttemptID executionAttempt) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index d4e1b7d..813d6d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -18,13 +18,12 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmanager.InputSplitWrapper;
 import org.apache.flink.runtime.protocols.InputSplitProviderProtocol;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
@@ -37,19 +36,30 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	
 	private final ExecutionAttemptID executionAttempt;
 	
-	public TaskInputSplitProvider(InputSplitProviderProtocol protocol, JobID jobId, JobVertexID vertexId, ExecutionAttemptID executionAttempt) {
+	private final ClassLoader userCodeClassLoader;
+	
+	public TaskInputSplitProvider(InputSplitProviderProtocol protocol, JobID jobId, JobVertexID vertexId,
+			ExecutionAttemptID executionAttempt, ClassLoader userCodeClassLoader)
+	{
 		this.protocol = protocol;
 		this.jobId = jobId;
 		this.vertexId = vertexId;
 		this.executionAttempt = executionAttempt;
+		this.userCodeClassLoader = userCodeClassLoader;
 	}
 
 	@Override
 	public InputSplit getNextInputSplit() {
 		try {
-			return protocol.requestNextInputSplit(jobId, vertexId, executionAttempt);
+			InputSplitWrapper wrapper = protocol.requestNextInputSplit(jobId, vertexId, executionAttempt);
+			if (wrapper == null) {
+				return null;
+			}
+			else {
+				return wrapper.getSplit(userCodeClassLoader);
+			}
 		}
-		catch (IOException e) {
+		catch (Exception e) {
 			throw new RuntimeException("Requesting the next InputSplit failed.", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index e6e5287..02cf710 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -607,7 +607,7 @@ public class TaskManager implements TaskOperationProtocol {
 				throw new Exception("TaskManager contains already a task with executionId " + executionId);
 			}
 			
-			final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId);
+			final InputSplitProvider splitProvider = new TaskInputSplitProvider(this.globalInputSplitProvider, jobID, vertexId, executionId, userCodeClassLoader);
 			final RuntimeEnvironment env = new RuntimeEnvironment(task, tdd, userCodeClassLoader, this.memoryManager, this.ioManager, splitProvider, this.accumulatorProtocolProxy, this.bcVarManager);
 			task.setEnvironment(env);
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index c773a21..cf7fb6b 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -297,7 +297,7 @@ under the License.
 				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
 				<executions>
 					<execution>
-						<id>create-test-dependency</id>
+						<id>create-kmeans-jar</id>
 						<phase>process-test-classes</phase>
 						<goals>
 							<goal>single</goal>
@@ -308,10 +308,29 @@ under the License.
 									<mainClass>org.apache.flink.test.util.testjar.KMeansForTest</mainClass>
 								</manifest>
 							</archive>
-							<finalName>maven</finalName>
+							<finalName>kmeans</finalName>
 							<attach>false</attach>
 							<descriptors>
-								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+								<descriptor>src/test/assembly/test-kmeans-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+					<execution>
+						<id>create-custominputsplit-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.test.classloading.jar.CustomInputSpitProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>customsplit</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-custominput-assembly.xml</descriptor>
 							</descriptors>
 						</configuration>
 					</execution>
@@ -326,7 +345,7 @@ under the License.
 				<version>2.5</version><!--$NO-MVN-MAN-VER$-->
 				<executions>
 					<execution>
-						<id>remove-kmeansfortest</id>
+						<id>remove-kmeans-test-dependencies</id>
 						<phase>process-test-classes</phase>
 						<goals>
 							<goal>clean</goal>
@@ -343,6 +362,24 @@ under the License.
 							</filesets>
 						</configuration>
 					</execution>
+					<execution>
+						<id>remove-custominputformat-test-dependencies</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/classloading/jar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 		</plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-assembly.xml b/flink-tests/src/test/assembly/test-assembly.xml
deleted file mode 100644
index bad0e38..0000000
--- a/flink-tests/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-
-<assembly>
-	<id>test-jar</id>
-	<formats>
-		<format>jar</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>${project.build.testOutputDirectory}</directory>
-			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your package(s) -->
-			<includes>
-				<include>org/apache/flink/test/util/testjar/**</include>
-			</includes>
-		</fileSet>
-	</fileSets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-custominput-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml
new file mode 100644
index 0000000..9d4800b
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/classloading/jar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/assembly/test-kmeans-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
new file mode 100644
index 0000000..bad0e38
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/util/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
new file mode 100644
index 0000000..705dbf3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import java.io.File;
+
+import org.apache.flink.client.minicluster.NepheleMiniCluster;
+import org.apache.flink.client.program.PackagedProgram;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InputSplitClassLoaderITCase {
+	
+	private static final String JAR_FILE = "target/customsplit-test-jar.jar";
+	
+	@Test
+	public void testJobWithCustomInputFormat() {	
+		try {
+			NepheleMiniCluster cluster = new NepheleMiniCluster();
+			cluster.setNumTaskManager(2);
+			cluster.setTaskManagerNumSlots(2);
+			cluster.start();
+			
+			try {
+				int port = cluster.getJobManagerRpcPort();
+				
+				PackagedProgram prog = new PackagedProgram(new File(JAR_FILE),
+						new String[] { JAR_FILE, "localhost", String.valueOf(port) } );
+				prog.invokeInteractiveModeForExecution();
+			}
+			finally {
+				cluster.stop();
+			}
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			Assert.fail(t.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
new file mode 100644
index 0000000..31aaaf2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+@SuppressWarnings("serial")
+public class CustomInputSpitProgram {
+	
+	public static void main(String[] args) throws Exception {
+		
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+		
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+
+		DataSet<Integer> data = env.createInput(new CustomInputFormat());
+
+		data
+			.map(new MapFunction<Integer, Tuple2<Integer, Double>>() {
+				@Override
+				public Tuple2<Integer, Double> map(Integer value) {
+					return new Tuple2<Integer, Double>(value, value * 0.5);
+				}
+			})
+			.output(new DiscardingOutputFormat<Tuple2<Integer,Double>>());
+
+		env.execute();
+	}
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>, ResultTypeQueryable<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Integer value;
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+			return null;
+		}
+
+		@Override
+		public CustomInputSplit[] createInputSplits(int minNumSplits) {
+			CustomInputSplit[] splits = new CustomInputSplit[minNumSplits];
+			for (int i = 0; i < minNumSplits; i++) {
+				splits[i] = new CustomInputSplit(i);
+			}
+			return splits;
+		}
+
+		@Override
+		public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) {
+			return new CustomSplitAssigner(inputSplits);
+		}
+
+		@Override
+		public void open(CustomInputSplit split) {
+			this.value = split.getSplitNumber();
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return this.value == null;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			Integer val = this.value;
+			this.value = null;
+			return val;
+		}
+
+		@Override
+		public void close() {}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return BasicTypeInfo.INT_TYPE_INFO;
+		}
+	}
+
+	public static final class CustomInputSplit implements InputSplit {
+
+		private static final long serialVersionUID = 1L;
+
+		private int splitNumber;
+
+		public CustomInputSplit() {
+			this(-1);
+		}
+
+		public CustomInputSplit(int splitNumber) {
+			this.splitNumber = splitNumber;
+		}
+
+		@Override
+		public int getSplitNumber() {
+			return this.splitNumber;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(splitNumber);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			splitNumber = in.readInt();
+		}
+	}
+
+	public static final class CustomSplitAssigner implements InputSplitAssigner {
+
+		private final List<CustomInputSplit> remainingSplits;
+
+		public CustomSplitAssigner(CustomInputSplit[] splits) {
+			this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits));
+		}
+
+		@Override
+		public InputSplit getNextInputSplit(String host) {
+			synchronized (this) {
+				int size = remainingSplits.size();
+				if (size > 0) {
+					return remainingSplits.remove(size - 1);
+				} else {
+					return null;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index e7e3f95..430d817 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -27,10 +27,10 @@ import org.apache.flink.test.testdata.KMeansData;
 import org.junit.Assert;
 import org.junit.Test;
 
-// When the API changes KMeansForTest needs to be rebuilt and the KMeansForTest.jar in resources needs
-// to be replaced with the new one.
 
 public class PackagedProgramEndToEndITCase {
+	
+	private static final String JAR_PATH = "target/kmeans-test-jar.jar";
 
 	@Test
 	public void testEverything() {
@@ -56,7 +56,7 @@ public class PackagedProgramEndToEndITCase {
 			fwClusters.write(KMeansData.INITIAL_CENTERS);
 			fwClusters.close();
 
-			String jarPath = "target/maven-test-jar.jar";
+			
 
 			// run KMeans
 			cluster.setNumTaskManager(2);
@@ -65,7 +65,7 @@ public class PackagedProgramEndToEndITCase {
 
 			RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRpcPort());
 
-			ex.executeJar(jarPath,
+			ex.executeJar(JAR_PATH,
 					"org.apache.flink.test.util.testjar.KMeansForTest",
 					new String[] {
 							points.toURI().toString(),

http://git-wip-us.apache.org/repos/asf/flink/blob/689e26f7/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index a063957..2cae061 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -190,7 +190,7 @@ object TestObjectWithBogusReturns {
       nums.map { x => return 1; x * 2}.print()
     } catch {
       case inv: InvalidProgramException => // all good
-      case _ => fail("Bogus return statement not detected.")
+      case _: Throwable => fail("Bogus return statement not detected.")
     }
 
     nums.writeAsText(resultPath)


Mime
View raw message