flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-1438] [jobmanager] Fix class loading issue for messages with custom input splits
Date Fri, 06 Feb 2015 17:47:53 GMT
Repository: flink
Updated Branches:
  refs/heads/master f6c85bc59 -> a07d59d72


[FLINK-1438] [jobmanager] Fix class loading issue for messages with custom input splits


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a07d59d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a07d59d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a07d59d7

Branch: refs/heads/master
Commit: a07d59d72fc059a600a3eb1f479b02964ca256f5
Parents: f6c85bc
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Feb 6 16:24:01 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Feb 6 18:28:37 2015 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/RemoteExecutor.java |   2 +-
 .../taskmanager/TaskInputSplitProvider.java     |  17 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   8 +-
 .../runtime/messages/TaskManagerMessages.scala  |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   2 +-
 flink-tests/pom.xml                             |  45 ++++-
 flink-tests/src/test/assembly/test-assembly.xml |  37 ----
 .../test/assembly/test-custominput-assembly.xml |  37 ++++
 .../src/test/assembly/test-kmeans-assembly.xml  |  37 ++++
 .../InputSplitClassLoaderITCase.java            |  59 +++++++
 .../jar/CustomInputSpitProgram.java             | 172 +++++++++++++++++++
 .../PackagedProgramEndToEndITCase.java          |   8 +-
 .../flink/test/operators/ObjectReuseITCase.java |   1 +
 .../scala/functions/ClosureCleanerITCase.scala  |   2 +-
 14 files changed, 374 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
index 1eb6be1..af45a11 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java
@@ -90,7 +90,7 @@ public class RemoteExecutor extends PlanExecutor {
 		return c.run(p, -1, true);
 	}
 
-	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String[] args)
throws Exception {
+	public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args)
throws Exception {
 		File jarFile = new File(jarPath);
 		PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
index 3303c72..fdd2636 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskInputSplitProvider.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.taskmanager;
 
 import akka.actor.ActorRef;
+
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -27,6 +28,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
+import org.apache.flink.util.InstantiationUtil;
+
 import scala.concurrent.duration.FiniteDuration;
 
 public class TaskInputSplitProvider implements InputSplitProvider {
@@ -39,14 +42,19 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 
 	private final ExecutionAttemptID executionID;
 
+	private final ClassLoader usercodeClassLoader;
+	
 	private final FiniteDuration timeout;
 	
 	public TaskInputSplitProvider(ActorRef jobManager, JobID jobId, JobVertexID vertexId,
-								ExecutionAttemptID executionID, FiniteDuration timeout) {
+								ExecutionAttemptID executionID, ClassLoader userCodeClassLoader,
+								FiniteDuration timeout)
+	{
 		this.jobManager = jobManager;
 		this.jobId = jobId;
 		this.vertexId = vertexId;
 		this.executionID = executionID;
+		this.usercodeClassLoader = userCodeClassLoader;
 		this.timeout = timeout;
 	}
 
@@ -54,10 +62,11 @@ public class TaskInputSplitProvider implements InputSplitProvider {
 	public InputSplit getNextInputSplit() {
 		try {
 			TaskManagerMessages.NextInputSplit nextInputSplit = AkkaUtils.ask(jobManager,
-					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID),
-					timeout);
+					new JobManagerMessages.RequestNextInputSplit(jobId, vertexId, executionID), timeout);
 
-			return nextInputSplit.inputSplit();
+			byte[] serializedData = nextInputSplit.splitData();
+			Object deserialized = InstantiationUtil.deserializeObject(serializedData, usercodeClassLoader);
+			return (InputSplit) deserialized;
 		}
 		catch (Exception e) {
 			throw new RuntimeException("Requesting the next InputSplit failed.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 4ee16da..756cb4b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.jobmanager
 
 import java.io.{IOException, File}
 import java.net.InetSocketAddress
-
 import akka.actor._
 import akka.pattern.ask
 import org.apache.flink.configuration.{ConfigConstants, GlobalConfiguration, Configuration}
@@ -43,10 +42,10 @@ import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{NextInputSplit, Heartbeat}
 import org.apache.flink.runtime.profiling.ProfilingUtils
 import org.slf4j.LoggerFactory
-
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.language.postfixOps
+import org.apache.flink.util.InstantiationUtil
 
 /**
  * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering
the
@@ -242,7 +241,10 @@ Actor with ActorLogMessages with ActorLogging {
       if(log.isDebugEnabled) {
         log.debug("Send next input split {}.", nextInputSplit)
       }
-      sender ! NextInputSplit(nextInputSplit)
+      
+      val serializedData = InstantiationUtil.serializeObject(nextInputSplit)
+      
+      sender ! NextInputSplit(serializedData)
 
     case JobStatusChanged(jobID, newJobStatus, timeStamp, optionalMessage) =>
       currentJobs.get(jobID) match {

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
index 968dc46..6e3a3b1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskManagerMessages.scala
@@ -46,9 +46,9 @@ object TaskManagerMessages {
    * Contains the next input split for a task. This message is a response to
    * [[org.apache.flink.runtime.messages.JobManagerMessages.RequestNextInputSplit]].
    *
-   * @param inputSplit
+   * @param splitData
    */
-  case class NextInputSplit(inputSplit: InputSplit)
+  case class NextInputSplit(splitData: Array[Byte])
 
   /**
    * Unregisters the task identified by [[executionID]] from the task manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 91eac35..de3fa7a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -372,7 +372,7 @@ import scala.collection.JavaConverters._
       }
 
       val splitProvider = new TaskInputSplitProvider(currentJobManager, jobID, vertexID,
-        executionID, timeout)
+        executionID, userCodeClassLoader, timeout)
 
       val env = new RuntimeEnvironment(currentJobManager, task, tdd, userCodeClassLoader,
         memoryManager, ioManager, splitProvider, bcVarManager, networkEnvironment.get)

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-tests/pom.xml b/flink-tests/pom.xml
index 6e10510..bcb9764 100644
--- a/flink-tests/pom.xml
+++ b/flink-tests/pom.xml
@@ -303,7 +303,7 @@ under the License.
 				<version>2.4</version><!--$NO-MVN-MAN-VER$-->
 				<executions>
 					<execution>
-						<id>create-test-dependency</id>
+						<id>create-kmeans-jar</id>
 						<phase>process-test-classes</phase>
 						<goals>
 							<goal>single</goal>
@@ -314,10 +314,29 @@ under the License.
 									<mainClass>org.apache.flink.test.util.testjar.KMeansForTest</mainClass>
 								</manifest>
 							</archive>
-							<finalName>maven</finalName>
+							<finalName>kmeans</finalName>
 							<attach>false</attach>
 							<descriptors>
-								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+								<descriptor>src/test/assembly/test-kmeans-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+					<execution>
+						<id>create-custominputsplit-jar</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>org.apache.flink.test.classloading.jar.CustomInputSpitProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>customsplit</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-custominput-assembly.xml</descriptor>
 							</descriptors>
 						</configuration>
 					</execution>
@@ -332,7 +351,7 @@ under the License.
 				<version>2.5</version><!--$NO-MVN-MAN-VER$-->
 				<executions>
 					<execution>
-						<id>remove-kmeansfortest</id>
+						<id>remove-kmeans-test-dependencies</id>
 						<phase>process-test-classes</phase>
 						<goals>
 							<goal>clean</goal>
@@ -349,6 +368,24 @@ under the License.
 							</filesets>
 						</configuration>
 					</execution>
+					<execution>
+						<id>remove-custominputformat-test-dependencies</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>clean</goal>
+						</goals>
+						<configuration>
+							<excludeDefaultDirectories>true</excludeDefaultDirectories>
+							<filesets>
+								<fileset>
+									<directory>${project.build.testOutputDirectory}</directory>
+									<includes>
+										<include>**/classloading/jar/*.class</include>
+									</includes>
+								</fileset>
+							</filesets>
+						</configuration>
+					</execution>
 				</executions>
 			</plugin>
 		</plugins>

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-assembly.xml b/flink-tests/src/test/assembly/test-assembly.xml
deleted file mode 100644
index bad0e38..0000000
--- a/flink-tests/src/test/assembly/test-assembly.xml
+++ /dev/null
@@ -1,37 +0,0 @@
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-
--->
-
-<assembly>
-	<id>test-jar</id>
-	<formats>
-		<format>jar</format>
-	</formats>
-	<includeBaseDirectory>false</includeBaseDirectory>
-	<fileSets>
-		<fileSet>
-			<directory>${project.build.testOutputDirectory}</directory>
-			<outputDirectory>/</outputDirectory>
-			<!--modify/add include to match your package(s) -->
-			<includes>
-				<include>org/apache/flink/test/util/testjar/**</include>
-			</includes>
-		</fileSet>
-	</fileSets>
-</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/assembly/test-custominput-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-custominput-assembly.xml b/flink-tests/src/test/assembly/test-custominput-assembly.xml
new file mode 100644
index 0000000..9d4800b
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-custominput-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/classloading/jar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/assembly/test-kmeans-assembly.xml
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/assembly/test-kmeans-assembly.xml b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
new file mode 100644
index 0000000..bad0e38
--- /dev/null
+++ b/flink-tests/src/test/assembly/test-kmeans-assembly.xml
@@ -0,0 +1,37 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>org/apache/flink/test/util/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
new file mode 100644
index 0000000..24cce08
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/InputSplitClassLoaderITCase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading;
+
+import java.io.File;
+
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class InputSplitClassLoaderITCase {
+	
+	private static final String JAR_FILE = "target/customsplit-test-jar.jar";
+	
+	@Test
+	public void testJobWithCustomInputFormat() {
+		try {
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+			
+			ForkableFlinkMiniCluster testCluster = new ForkableFlinkMiniCluster(config, false);
+			try {
+				int port = testCluster.getJobManagerRPCPort();
+				
+				PackagedProgram prog = new PackagedProgram(new File(JAR_FILE),
+						new String[] { JAR_FILE, "localhost", String.valueOf(port) } );
+				prog.invokeInteractiveModeForExecution();
+			}
+			finally {
+				testCluster.shutdown();
+			}
+		}
+		catch (Throwable t) {
+			t.printStackTrace();
+			Assert.fail(t.getMessage());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
new file mode 100644
index 0000000..52cfa02
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSpitProgram.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.classloading.jar;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.InputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+@SuppressWarnings("serial")
+public class CustomInputSpitProgram {
+	
+	public static void main(String[] args) throws Exception {
+		
+		final String jarFile = args[0];
+		final String host = args[1];
+		final int port = Integer.parseInt(args[2]);
+		
+		ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(host, port, jarFile);
+
+		DataSet<Integer> data = env.createInput(new CustomInputFormat());
+
+		data
+			.map(new MapFunction<Integer, Tuple2<Integer, Double>>() {
+				@Override
+				public Tuple2<Integer, Double> map(Integer value) {
+					return new Tuple2<Integer, Double>(value, value * 0.5);
+				}
+			})
+			.output(new DiscardingOutputFormat<Tuple2<Integer,Double>>());
+
+		env.execute();
+	}
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class CustomInputFormat implements InputFormat<Integer, CustomInputSplit>,
ResultTypeQueryable<Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private Integer value;
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+			return null;
+		}
+
+		@Override
+		public CustomInputSplit[] createInputSplits(int minNumSplits) {
+			CustomInputSplit[] splits = new CustomInputSplit[minNumSplits];
+			for (int i = 0; i < minNumSplits; i++) {
+				splits[i] = new CustomInputSplit(i);
+			}
+			return splits;
+		}
+
+		@Override
+		public InputSplitAssigner getInputSplitAssigner(CustomInputSplit[] inputSplits) {
+			return new CustomSplitAssigner(inputSplits);
+		}
+
+		@Override
+		public void open(CustomInputSplit split) {
+			this.value = split.getSplitNumber();
+		}
+
+		@Override
+		public boolean reachedEnd() {
+			return this.value == null;
+		}
+
+		@Override
+		public Integer nextRecord(Integer reuse) {
+			Integer val = this.value;
+			this.value = null;
+			return val;
+		}
+
+		@Override
+		public void close() {}
+
+		@Override
+		public TypeInformation<Integer> getProducedType() {
+			return BasicTypeInfo.INT_TYPE_INFO;
+		}
+	}
+
+	public static final class CustomInputSplit implements InputSplit {
+
+		private static final long serialVersionUID = 1L;
+
+		private int splitNumber;
+
+		public CustomInputSplit() {
+			this(-1);
+		}
+
+		public CustomInputSplit(int splitNumber) {
+			this.splitNumber = splitNumber;
+		}
+
+		@Override
+		public int getSplitNumber() {
+			return this.splitNumber;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeInt(splitNumber);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			splitNumber = in.readInt();
+		}
+	}
+
+	public static final class CustomSplitAssigner implements InputSplitAssigner {
+
+		private final List<CustomInputSplit> remainingSplits;
+
+		public CustomSplitAssigner(CustomInputSplit[] splits) {
+			this.remainingSplits = new ArrayList<CustomInputSplit>(Arrays.asList(splits));
+		}
+
+		@Override
+		public InputSplit getNextInputSplit(String host, int taskId) {
+			synchronized (this) {
+				int size = remainingSplits.size();
+				if (size > 0) {
+					return remainingSplits.remove(size - 1);
+				} else {
+					return null;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
index 6928963..40d3d96 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -29,10 +29,10 @@ import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 import org.junit.Assert;
 import org.junit.Test;
 
-// When the API changes KMeansForTest needs to be rebuilt and the KMeansForTest.jar in resources
needs
-// to be replaced with the new one.
 
 public class PackagedProgramEndToEndITCase {
+	
+	private static final String JAR_PATH = "target/kmeans-test-jar.jar";
 
 	@Test
 	public void testEverything() {
@@ -58,7 +58,7 @@ public class PackagedProgramEndToEndITCase {
 			fwClusters.write(KMeansData.INITIAL_CENTERS);
 			fwClusters.close();
 
-			String jarPath = "target/maven-test-jar.jar";
+			
 
 			// run KMeans
 			Configuration config = new Configuration();
@@ -68,7 +68,7 @@ public class PackagedProgramEndToEndITCase {
 
 			RemoteExecutor ex = new RemoteExecutor("localhost", cluster.getJobManagerRPCPort());
 
-			ex.executeJar(jarPath,
+			ex.executeJar(JAR_PATH,
 					"org.apache.flink.test.util.testjar.KMeansForTest",
 					new String[] {
 							points.toURI().toString(),

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
index c840dc0..fa1d58a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/operators/ObjectReuseITCase.java
@@ -88,6 +88,7 @@ public class ObjectReuseITCase extends JavaProgramTestBase {
 		return toParameterList(tConfigs);
 	}
 	
+	@SuppressWarnings({"unchecked", "serial"})
 	private static class Progs {
 		
 		public static String runProgram(int progId, String resultPath) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/a07d59d7/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
index f0d6b9f..5056b82 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/functions/ClosureCleanerITCase.scala
@@ -185,7 +185,7 @@ object TestObjectWithBogusReturns {
       nums.map { x => return 1; x * 2}.print()
     } catch {
       case inv: InvalidProgramException => // all good
-      case _ => fail("Bogus return statement not detected.")
+      case _: Throwable => fail("Bogus return statement not detected.")
     }
 
     nums.writeAsText(resultPath, WriteMode.OVERWRITE)


Mime
View raw message