flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [03/34] git commit: Rework stratosphere-clients tests to not expect a binary blob in the test resources (build the jar file using maven) add a test for ensuring the presense of a custom classloader in the CliFrontend Add missing ClassLoader for the "info
Date Tue, 10 Jun 2014 19:35:00 GMT
Rework stratosphere-clients tests to not expect a binary blob in the test resources (build the jar file using maven)
add a test for ensuring the presense of a custom classloader in the CliFrontend
Add missing ClassLoader for the "info" command in the CliFrontend


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/6c4b85c9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/6c4b85c9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/6c4b85c9

Branch: refs/heads/release-0.5.1
Commit: 6c4b85c94472e6ab7ab1ac46ac462c3237bfc7c6
Parents: bce608c
Author: Robert Metzger <metzgerr@web.de>
Authored: Wed May 28 23:27:02 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Jun 10 21:24:32 2014 +0200

----------------------------------------------------------------------
 stratosphere-addons/avro/pom.xml                |  76 +++++
 .../avro/src/test/assembly/test-assembly.xml    |  30 ++
 .../api/avro/AvroExternalJarProgram.java        | 218 --------------
 .../api/avro/AvroExternalJarProgramITCase.java  |   4 +-
 .../avro/testjar/AvroExternalJarProgram.java    | 228 ++++++++++++++
 .../avro/src/test/resources/AvroTestProgram.jar | Bin 5713 -> 0 bytes
 stratosphere-clients/pom.xml                    |  86 +++++-
 .../src/main/assembly/test-assembly.xml         |  30 ++
 .../eu/stratosphere/client/program/Client.java  |   5 +
 .../client/program/PackagedProgram.java         |   9 +-
 .../client/CliFrontendPackageProgramTest.java   |  88 +++++-
 .../client/CliFrontendTestUtils.java            |  16 +-
 .../client/program/PackagedProgramTest.java     |   9 +-
 .../testjar/JobWithExternalDependency.java      |  30 ++
 .../stratosphere/client/testjar/WordCount.java  | 162 ++++++++++
 .../src/test/resources/test.jar                 | Bin 4929 -> 0 bytes
 stratosphere-tests/pom.xml                      |  67 +++++
 .../src/test/assembly/test-assembly.xml         |  30 ++
 .../PackagedProgramEndToEndITCase.java          |  12 +-
 .../test/util/testjar/KMeansForTest.java        | 294 +++++++++++++++++++
 .../src/test/resources/KMeansForTest.jar        | Bin 144530 -> 0 bytes
 21 files changed, 1154 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/pom.xml b/stratosphere-addons/avro/pom.xml
index f772362..e9da4e4 100644
--- a/stratosphere-addons/avro/pom.xml
+++ b/stratosphere-addons/avro/pom.xml
@@ -51,6 +51,82 @@
 		
 	</dependencies>
 
+	<build>
+		<plugins>
+		<!-- Exclude ExternalJar contents from regular build -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<excludes>
+						<exclude>**/src/test/java/eu/stratosphere/api/avro/testjar/*.java</exclude>
+					</excludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<execution>
+						<id>create-test-dependency</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>eu.stratosphere.api.avro.testjar.AvroExternalJarProgram</mainClass>
+								</manifest>
+							</archive>
+							<finalName>maven</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.apache.maven.plugins
+										</groupId>
+										<artifactId>
+											maven-assembly-plugin
+										</artifactId>
+										<versionRange>
+											[2.4,)
+										</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
+
 	<profiles>
 		<profile>
 			<!-- A bug with java6 is causing the javadoc generation

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/assembly/test-assembly.xml b/stratosphere-addons/avro/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..ebb1b07
--- /dev/null
+++ b/stratosphere-addons/avro/src/test/assembly/test-assembly.xml
@@ -0,0 +1,30 @@
+<!--
+ Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+	 http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations under the License.
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>eu/stratosphere/api/avro/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java
deleted file mode 100644
index 704cf87..0000000
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgram.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/***********************************************************************************************************************
- *
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- *
- **********************************************************************************************************************/
-
-// ================================================================================================
-//  This file defines the classes for the AvroExternalJarProgramITCase.
-//  The program is exported into src/test/resources/AvroTestProgram.jar.
-//
-//  THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
-//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
-//  NOT BE COVERED BY THIS TEST.
-// ================================================================================================
-//package eu.stratosphere.api.avro;
-//
-//import java.util.ArrayList;
-//import java.util.List;
-//
-//import eu.stratosphere.api.java.DataSet;
-//import eu.stratosphere.api.java.ExecutionEnvironment;
-//import eu.stratosphere.api.java.functions.MapFunction;
-//import eu.stratosphere.api.java.functions.ReduceFunction;
-//import eu.stratosphere.api.java.io.AvroInputFormat;
-//import eu.stratosphere.api.java.io.DiscardingOuputFormat;
-//import eu.stratosphere.api.java.tuple.Tuple2;
-//import eu.stratosphere.core.fs.Path;
-//
-//public class AvroExternalJarProgram  {
-//
-//	public static final class Color {
-//		
-//		private String name;
-//		private double saturation;
-//		
-//		public Color() {
-//			name = "";
-//			saturation = 1.0;
-//		}
-//		
-//		public Color(String name, double saturation) {
-//			this.name = name;
-//			this.saturation = saturation;
-//		}
-//		
-//		public String getName() {
-//			return name;
-//		}
-//		
-//		public void setName(String name) {
-//			this.name = name;
-//		}
-//		
-//		public double getSaturation() {
-//			return saturation;
-//		}
-//		
-//		public void setSaturation(double saturation) {
-//			this.saturation = saturation;
-//		}
-//		
-//		@Override
-//		public String toString() {
-//			return name + '(' + saturation + ')';
-//		}
-//	}
-//	
-//	public static final class MyUser {
-//		
-//		private String name;
-//		private List<Color> colors;
-//		
-//		public MyUser() {
-//			name = "unknown";
-//			colors = new ArrayList<Color>();
-//		}
-//		
-//		public MyUser(String name, List<Color> colors) {
-//			this.name = name;
-//			this.colors = colors;
-//		}
-//		
-//		public String getName() {
-//			return name;
-//		}
-//		
-//		public List<Color> getColors() {
-//			return colors;
-//		}
-//		
-//		public void setName(String name) {
-//			this.name = name;
-//		}
-//		
-//		public void setColors(List<Color> colors) {
-//			this.colors = colors;
-//		}
-//		
-//		@Override
-//		public String toString() {
-//			return name + " : " + colors;
-//		}
-//	}
-//	
-//	
-//	public static final class SUser extends AvroBaseValue<MyUser> {
-//		
-//		static final long serialVersionUID = 1L;
-//
-//		public SUser() {}
-//	
-//		public SUser(MyUser u) {
-//			super(u);
-//		}
-//	}
-//	
-//	// --------------------------------------------------------------------------------------------
-//	
-//	// --------------------------------------------------------------------------------------------
-//	
-//	public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public Tuple2<String, MyUser> map(MyUser u) {
-//			String namePrefix = u.getName().substring(0, 1);
-//			return new Tuple2<String, MyUser>(namePrefix, u);
-//		}
-//	}
-//	
-//	public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
-//		private static final long serialVersionUID = 1L;
-//
-//		@Override
-//		public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
-//			return val1;
-//		}
-//	}
-//
-//	// --------------------------------------------------------------------------------------------
-//	//  Test Data
-//	// --------------------------------------------------------------------------------------------
-//	
-//	public static final class Generator {
-//		
-//		private final Random rnd = new Random(2389756789345689276L);
-//		
-//		public MyUser nextUser() {
-//			return randomUser();
-//		}
-//		
-//		private MyUser randomUser() {
-//			
-//			int numColors = rnd.nextInt(5);
-//			ArrayList<Color> colors = new ArrayList<Color>(numColors);
-//			for (int i = 0; i < numColors; i++) {
-//				colors.add(new Color(randomString(), rnd.nextDouble()));
-//			}
-//			
-//			return new MyUser(randomString(), colors);
-//		}
-//		
-//		private String randomString() {
-//			char[] c = new char[this.rnd.nextInt(20) + 5];
-//			
-//			for (int i = 0; i < c.length; i++) {
-//				c[i] = (char) (this.rnd.nextInt(150) + 40);
-//			}
-//			
-//			return new String(c);
-//		}
-//	}
-//	
-//	public static void writeTestData(File testFile, int numRecords) throws IOException {
-//		
-//		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
-//		DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
-//		
-//		dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
-//		
-//		
-//		Generator generator = new Generator();
-//		
-//		for (int i = 0; i < numRecords; i++) {
-//			MyUser user = generator.nextUser();
-//			dataFileWriter.append(user);
-//		}
-//		
-//		dataFileWriter.close();
-//	}
-//
-//	public static void main(String[] args) throws Exception {
-//		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
-//		writeTestData(new File(testDataFile), 50);
-//	}
-//	
-//	public static void main(String[] args) throws Exception {
-//		String inputPath = args[0];
-//		
-//		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-//		
-//		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
-//	
-//		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
-//		
-//		result.output(new DiscardingOuputFormat<Tuple2<String,MyUser>>());
-//		env.execute();
-//	}
-//}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
index 3140be4..a766fcb 100644
--- a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/AvroExternalJarProgramITCase.java
@@ -31,7 +31,7 @@ public class AvroExternalJarProgramITCase {
 
 	private static final int TEST_JM_PORT = 43191;
 	
-	private static final String JAR_FILE = "/AvroTestProgram.jar";
+	private static final String JAR_FILE = "target/maven-test-jar.jar";
 	
 	private static final String TEST_DATA_FILE = "/testdata.avro";
 
@@ -49,7 +49,7 @@ public class AvroExternalJarProgramITCase {
 			testMiniCluster.setJobManagerRpcPort(TEST_JM_PORT);
 			testMiniCluster.start();
 			
-			String jarFile = getClass().getResource(JAR_FILE).getFile();
+			String jarFile = JAR_FILE;
 			String testData = getClass().getResource(TEST_DATA_FILE).toString();
 			
 			PackagedProgram program = new PackagedProgram(new File(jarFile), new String[] { testData });

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java
new file mode 100644
index 0000000..1db21b3
--- /dev/null
+++ b/stratosphere-addons/avro/src/test/java/eu/stratosphere/api/avro/testjar/AvroExternalJarProgram.java
@@ -0,0 +1,228 @@
+package eu.stratosphere.api.avro.testjar;
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+// ================================================================================================
+//  This file defines the classes for the AvroExternalJarProgramITCase.
+//  The program is exported into src/test/resources/AvroTestProgram.jar.
+//
+//  THIS FILE MUST STAY FULLY COMMENTED SUCH THAT THE HERE DEFINED CLASSES ARE NOT COMPILED
+//  AND ADDED TO THE test-classes DIRECTORY. OTHERWISE, THE EXTERNAL CLASS LOADING WILL
+//  NOT BE COVERED BY THIS TEST.
+// ================================================================================================
+
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumWriter;
+
+import eu.stratosphere.api.avro.AvroBaseValue;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.functions.ReduceFunction;
+import eu.stratosphere.api.java.io.AvroInputFormat;
+import eu.stratosphere.api.java.io.DiscardingOuputFormat;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.core.fs.Path;
+
+public class AvroExternalJarProgram  {
+
+	public static final class Color {
+		
+		private String name;
+		private double saturation;
+		
+		public Color() {
+			name = "";
+			saturation = 1.0;
+		}
+		
+		public Color(String name, double saturation) {
+			this.name = name;
+			this.saturation = saturation;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public double getSaturation() {
+			return saturation;
+		}
+		
+		public void setSaturation(double saturation) {
+			this.saturation = saturation;
+		}
+		
+		@Override
+		public String toString() {
+			return name + '(' + saturation + ')';
+		}
+	}
+	
+	public static final class MyUser {
+		
+		private String name;
+		private List<Color> colors;
+		
+		public MyUser() {
+			name = "unknown";
+			colors = new ArrayList<Color>();
+		}
+		
+		public MyUser(String name, List<Color> colors) {
+			this.name = name;
+			this.colors = colors;
+		}
+		
+		public String getName() {
+			return name;
+		}
+		
+		public List<Color> getColors() {
+			return colors;
+		}
+		
+		public void setName(String name) {
+			this.name = name;
+		}
+		
+		public void setColors(List<Color> colors) {
+			this.colors = colors;
+		}
+		
+		@Override
+		public String toString() {
+			return name + " : " + colors;
+		}
+	}
+	
+	
+	public static final class SUser extends AvroBaseValue<MyUser> {
+		
+		static final long serialVersionUID = 1L;
+
+		public SUser() {}
+	
+		public SUser(MyUser u) {
+			super(u);
+		}
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class NameExtractor extends MapFunction<MyUser, Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, MyUser> map(MyUser u) {
+			String namePrefix = u.getName().substring(0, 1);
+			return new Tuple2<String, MyUser>(namePrefix, u);
+		}
+	}
+	
+	public static final class NameGrouper extends ReduceFunction<Tuple2<String, MyUser>> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple2<String, MyUser> reduce(Tuple2<String, MyUser> val1, Tuple2<String, MyUser> val2) {
+			return val1;
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Test Data
+	// --------------------------------------------------------------------------------------------
+	
+	public static final class Generator {
+		
+		private final Random rnd = new Random(2389756789345689276L);
+		
+		public MyUser nextUser() {
+			return randomUser();
+		}
+		
+		private MyUser randomUser() {
+			
+			int numColors = rnd.nextInt(5);
+			ArrayList<Color> colors = new ArrayList<Color>(numColors);
+			for (int i = 0; i < numColors; i++) {
+				colors.add(new Color(randomString(), rnd.nextDouble()));
+			}
+			
+			return new MyUser(randomString(), colors);
+		}
+		
+		private String randomString() {
+			char[] c = new char[this.rnd.nextInt(20) + 5];
+			
+			for (int i = 0; i < c.length; i++) {
+				c[i] = (char) (this.rnd.nextInt(150) + 40);
+			}
+			
+			return new String(c);
+		}
+	}
+	
+	public static void writeTestData(File testFile, int numRecords) throws IOException {
+		
+		DatumWriter<MyUser> userDatumWriter = new ReflectDatumWriter<MyUser>(MyUser.class);
+		DataFileWriter<MyUser> dataFileWriter = new DataFileWriter<MyUser>(userDatumWriter);
+		
+		dataFileWriter.create(ReflectData.get().getSchema(MyUser.class), testFile);
+		
+		
+		Generator generator = new Generator();
+		
+		for (int i = 0; i < numRecords; i++) {
+			MyUser user = generator.nextUser();
+			dataFileWriter.append(user);
+		}
+		
+		dataFileWriter.close();
+	}
+
+//	public static void main(String[] args) throws Exception {
+//		String testDataFile = new File("src/test/resources/testdata.avro").getAbsolutePath();
+//		writeTestData(new File(testDataFile), 50);
+//	}
+	
+	public static void main(String[] args) throws Exception {
+		String inputPath = args[0];
+		
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		DataSet<MyUser> input = env.createInput(new AvroInputFormat<MyUser>(new Path(inputPath), MyUser.class));
+	
+		DataSet<Tuple2<String, MyUser>> result = input.map(new NameExtractor()).groupBy(0).reduce(new NameGrouper());
+		
+		result.output(new DiscardingOuputFormat<Tuple2<String,MyUser>>());
+		env.execute();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar
----------------------------------------------------------------------
diff --git a/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar b/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar
deleted file mode 100644
index eb56b62..0000000
Binary files a/stratosphere-addons/avro/src/test/resources/AvroTestProgram.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-clients/pom.xml b/stratosphere-clients/pom.xml
index 4daf320..d812d3c 100644
--- a/stratosphere-clients/pom.xml
+++ b/stratosphere-clients/pom.xml
@@ -35,6 +35,12 @@
 			<artifactId>stratosphere-compiler</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		
+		<dependency>
+			<groupId>eu.stratosphere</groupId>
+			<artifactId>stratosphere-java</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 
 		<dependency>
 			<groupId>org.eclipse.jetty</groupId>
@@ -62,8 +68,7 @@
 			<scope>compile</scope>
 		</dependency>
 
-		<!-- commons-io is required by commons-fileupload
-			See http://commons.apache.org/proper/commons-fileupload/dependencies.html
+		<!-- commons-io is required by commons-fileupload See http://commons.apache.org/proper/commons-fileupload/dependencies.html 
 			and https://github.com/dimalabs/ozone/pull/157 -->
 		<dependency>
 			<groupId>commons-io</groupId>
@@ -73,4 +78,81 @@
 		</dependency>
 	</dependencies>
 
+	<!-- More information on this:
+		http://stackoverflow.com/questions/1401857/using-maven-to-build-separate-jar-files-for-unit-testing-a-custom-class-loader
+	 -->
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<excludes>
+						<exclude>**/src/test/java/eu/stratosphere/client/testjar/*.java</exclude>
+					</excludes>
+				</configuration>
+			</plugin>
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<execution>
+						<id>create-test-dependency</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>eu.stratosphere.client.testjar.WordCount</mainClass>
+								</manifest>
+							</archive>
+							<finalName>maven</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/main/assembly/test-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.apache.maven.plugins
+										</groupId>
+										<artifactId>
+											maven-assembly-plugin
+										</artifactId>
+										<versionRange>
+											[2.4,)
+										</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/main/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/assembly/test-assembly.xml b/stratosphere-clients/src/main/assembly/test-assembly.xml
new file mode 100644
index 0000000..c45d378
--- /dev/null
+++ b/stratosphere-clients/src/main/assembly/test-assembly.xml
@@ -0,0 +1,30 @@
+<!--
+ Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+	 http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations under the License.
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>eu/stratosphere/client/testjar**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
index 632c854..dfe9fdd 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/Client.java
@@ -21,6 +21,8 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import com.google.common.base.Preconditions;
+
 import eu.stratosphere.api.common.JobExecutionResult;
 import eu.stratosphere.api.common.Plan;
 import eu.stratosphere.api.java.ExecutionEnvironment;
@@ -67,6 +69,7 @@ public class Client {
 	 * @param jobManagerAddress Address and port of the job-manager.
 	 */
 	public Client(InetSocketAddress jobManagerAddress, Configuration config) {
+		Preconditions.checkNotNull(config, "Configuration is null");
 		this.configuration = config;
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerAddress.getAddress().getHostAddress());
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerAddress.getPort());
@@ -84,6 +87,7 @@ public class Client {
 	 * @param config The config used to obtain the job-manager's address.
 	 */
 	public Client(Configuration config) {
+		Preconditions.checkNotNull(config, "Configuration is null");
 		this.configuration = config;
 		
 		// instantiate the address to the job manager
@@ -126,6 +130,7 @@ public class Client {
 	}
 	
 	public OptimizedPlan getOptimizedPlan(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException {
+		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
 			return getOptimizedPlan(prog.getPlanWithJars(), parallelism);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
index 2ee1765..a0fc1b7 100644
--- a/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
+++ b/stratosphere-clients/src/main/java/eu/stratosphere/client/program/PackagedProgram.java
@@ -73,7 +73,7 @@ public class PackagedProgram {
 	
 	private final List<File> extractedTempLibraries;
 	
-	private final ClassLoader userCodeClassLoader;
+	private ClassLoader userCodeClassLoader;
 	
 	private Plan plan;
 
@@ -204,6 +204,7 @@ public class PackagedProgram {
 	 *         missing parameters for generation.
 	 */
 	public String getPreviewPlan() throws ProgramInvocationException {
+		Thread.currentThread().setContextClassLoader(this.getUserCodeClassLoader());
 		List<DataSinkNode> previewPlan;
 		
 		if (isUsingProgramEntryPoint()) {
@@ -312,6 +313,12 @@ public class PackagedProgram {
 	public ClassLoader getUserCodeClassLoader() {
 		return this.userCodeClassLoader;
 	}
+
+	public void setUserCodeClassLoader(ClassLoader cl) {
+		this.userCodeClassLoader = cl;
+	}
+	
+	
 	
 	public List<File> getAllLibraries() {
 		List<File> libs = new ArrayList<File>(this.extractedTempLibraries.size() + 1);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
index 3ff5663..00cda03 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendPackageProgramTest.java
@@ -15,9 +15,13 @@
 
 package eu.stratosphere.client;
 
+import static eu.stratosphere.client.CliFrontendTestUtils.TEST_JAR_CLASSLOADERTEST_CLASS;
+import static eu.stratosphere.client.CliFrontendTestUtils.TEST_JAR_MAIN_CLASS;
+import static eu.stratosphere.client.CliFrontendTestUtils.getNonJarFilePath;
+import static eu.stratosphere.client.CliFrontendTestUtils.getTestJarPath;
+import static eu.stratosphere.client.CliFrontendTestUtils.pipeSystemOutToNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static eu.stratosphere.client.CliFrontendTestUtils.*;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -26,7 +30,12 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import eu.stratosphere.client.program.Client;
 import eu.stratosphere.client.program.PackagedProgram;
+import eu.stratosphere.client.program.ProgramInvocationException;
+import eu.stratosphere.compiler.CompilerException;
+import eu.stratosphere.configuration.ConfigConstants;
+import eu.stratosphere.configuration.Configuration;
 
 
 public class CliFrontendPackageProgramTest {
@@ -186,4 +195,81 @@ public class CliFrontendPackageProgramTest {
 			fail("Program caused an exception: " + e.getMessage());
 		}
 	}
+	
+	/**
+	 * Ensure that we will never have the following error.
+	 * 
+	 * The test works as follows:
+	 * - Use the CliFrontend to invoke a jar file that loads a class which is only available
+	 * 	in the jarfile itself (via a custom classloader)
+	 * - Change the Usercode classloader of the PackagedProgram to a special classloader for this test
+	 * - the classloader will accept the special class (and return a String.class)
+	 * 
+	 * 	eu.stratosphere.client.program.ProgramInvocationException: The main method caused an error.
+		at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:398)
+		at eu.stratosphere.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:301)
+		at eu.stratosphere.client.program.Client.getOptimizedPlan(Client.java:140)
+		at eu.stratosphere.client.program.Client.getOptimizedPlanAsJson(Client.java:125)
+		at eu.stratosphere.client.CliFrontend.info(CliFrontend.java:439)
+		at eu.stratosphere.client.CliFrontend.parseParameters(CliFrontend.java:931)
+		at eu.stratosphere.client.CliFrontend.main(CliFrontend.java:951)
+	Caused by: java.io.IOException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.RCFileInputFormat
+		at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:102)
+		at org.apache.hcatalog.mapreduce.HCatInputFormat.setInput(HCatInputFormat.java:54)
+		at tlabs.CDR_In_Report.createHCatInputFormat(CDR_In_Report.java:322)
+		at tlabs.CDR_Out_Report.main(CDR_Out_Report.java:380)
+		at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
+		at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
+		at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
+		at java.lang.reflect.Method.invoke(Method.java:622)
+		at eu.stratosphere.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:383)
+
+	 */
+	@Test
+	public void testPlanWithExternalClass() throws CompilerException, ProgramInvocationException {
+		final Boolean callme[] = { false }; // create a final object reference, to be able to change its val later
+		try {
+			String[] parameters = {getTestJarPath(), "-c", TEST_JAR_CLASSLOADERTEST_CLASS , "some", "program"};
+			CommandLine line = new PosixParser().parse(CliFrontend.getProgramSpecificOptions(new Options()), parameters, false);
+			
+			CliFrontend frontend = new CliFrontend();
+			Object result = frontend.buildProgram(line);
+			assertTrue(result instanceof PackagedProgram);
+			
+			PackagedProgram prog = (PackagedProgram) result;
+			
+			Assert.assertArrayEquals(new String[] {"some", "program"}, prog.getArguments());
+			Assert.assertEquals(TEST_JAR_CLASSLOADERTEST_CLASS, prog.getMainClassName());
+			prog.setUserCodeClassLoader(new ClassLoader(prog.getUserCodeClassLoader()) {
+				@Override
+				public Class<?> loadClass(String name) throws ClassNotFoundException {
+					assertTrue(name.equals("org.apache.hadoop.hive.ql.io.RCFileInputFormat"));
+					callme[0] = true;
+					return String.class; // Intentionally return the wrong class.
+				}
+			});
+			
+			Configuration c = new Configuration();
+			c.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "devil");
+			Client cli = new Client(c);
+			
+			cli.getOptimizedPlanAsJson(prog, 666);
+		} catch(ProgramInvocationException pie) {
+			assertTrue("Classloader was not called", callme[0]);
+			// class not found exception is expected as some point
+			if( ! ( pie.getCause() instanceof ClassNotFoundException ) ) {
+				System.err.println(pie.getMessage());
+				pie.printStackTrace();
+				fail("Program caused an exception: " + pie.getMessage());
+			}
+		}
+		catch (Exception e) {
+			assertTrue("Classloader was not called", callme[0]);
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail("Program caused an exception: " + e.getMessage());
+		}
+		
+		
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
index fa2c300..342b711 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/CliFrontendTestUtils.java
@@ -18,15 +18,20 @@ package eu.stratosphere.client;
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.PrintStream;
 import java.lang.reflect.Field;
+import java.net.MalformedURLException;
 import java.util.Map;
 
 import eu.stratosphere.configuration.GlobalConfiguration;
 
 public class CliFrontendTestUtils {
 	
-	public static final String TEST_JAR_MAIN_CLASS = "eu.stratosphere.example.java.wordcount.WordCount";
+	public static final String TEST_JAR_MAIN_CLASS = "eu.stratosphere.client.testjar.WordCount";
+	
+	public static final String TEST_JAR_CLASSLOADERTEST_CLASS = "eu.stratosphere.client.testjar.JobWithExternalDependency";
+	
 	
 	public static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
 	
@@ -37,8 +42,13 @@ public class CliFrontendTestUtils {
 	public static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
 	
 	
-	public static String getTestJarPath() {
-		return CliFrontendRunTest.class.getResource("/test.jar").getFile();
+	public static String getTestJarPath() throws FileNotFoundException, MalformedURLException {
+		File f = new File("target/maven-test-jar.jar");
+		if(!f.exists()) {
+			throw new FileNotFoundException("Test jar not present. Invoke tests using maven "
+					+ "or build the jar using 'mvn process-test-classes' in stratosphere-clients");
+		}
+		return f.getAbsolutePath();
 	}
 	
 	public static String getNonJarFilePath() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
index 9bd9b43..856d0a1 100644
--- a/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/program/PackagedProgramTest.java
@@ -15,23 +15,20 @@
 package eu.stratosphere.client.program;
 
 import java.io.File;
-import java.net.URL;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import eu.stratosphere.client.CliFrontendTestUtils;
+
 
 public class PackagedProgramTest {
 
-	private static final String TEST_PROG_FILE_PATH = "/test.jar";
-	
 	@Test
 	public void testGetPreviewPlan() {
 		try {
 			
-			URL jarFileURL = getClass().getResource(TEST_PROG_FILE_PATH);
-			
-			PackagedProgram prog = new PackagedProgram(new File(jarFileURL.getFile()));
+			PackagedProgram prog = new PackagedProgram(new File(CliFrontendTestUtils.getTestJarPath()));
 			Assert.assertNotNull(prog.getPreviewPlan());
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java
new file mode 100644
index 0000000..0387e65
--- /dev/null
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/JobWithExternalDependency.java
@@ -0,0 +1,30 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.client.testjar;
+
+/**
+ * Simulate a class that requires an external dependency
+ *
+ */
+public class JobWithExternalDependency {
+	
+	public static final String EXTERNAL_CLASS = "org.apache.hadoop.hive.ql.io.RCFileInputFormat";
+
+	public static void main(String[] args) throws ClassNotFoundException {
+		ClassLoader cl = Thread.currentThread().getContextClassLoader();
+		Class.forName(EXTERNAL_CLASS, false, cl);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
new file mode 100644
index 0000000..458c848
--- /dev/null
+++ b/stratosphere-clients/src/test/java/eu/stratosphere/client/testjar/WordCount.java
@@ -0,0 +1,162 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.client.testjar;
+
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.aggregation.Aggregations;
+import eu.stratosphere.api.java.functions.FlatMapFunction;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.util.Collector;
+
+/**
+ * Wordcount for placing at least something into the jar file.
+ * 
+ */
+public class WordCount {
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	public static void main(String[] args) throws Exception {
+		
+		if(!parseParameters(args)) {
+			return;
+		}
+		
+		// set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		// get input data
+		DataSet<String> text = getTextDataSet(env);
+		
+		DataSet<Tuple2<String, Integer>> counts = 
+				// split up the lines in pairs (2-tuples) containing: (word,1)
+				text.flatMap(new Tokenizer())
+				// group by the tuple field "0" and sum up tuple field "1"
+				.groupBy(0)
+				.aggregate(Aggregations.SUM, 1);
+
+		// emit result
+		if(fileOutput) {
+			counts.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			counts.print();
+		}
+		
+		// execute program
+		env.execute("WordCount Example");
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/**
+	 * Implements the string tokenizer that splits sentences into words as a user-defined
+	 * FlatMapFunction. The function takes a line (String) and splits it into 
+	 * multiple pairs in the form of "(word,1)" (Tuple2<String, Integer>).
+	 */
+	public static final class Tokenizer extends FlatMapFunction<String, Tuple2<String, Integer>> {
+
+		@Override
+		public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
+			// normalize and split the line
+			String[] tokens = value.toLowerCase().split("\\W+");
+			
+			// emit the pairs
+			for (String token : tokens) {
+				if (token.length() > 0) {
+					out.collect(new Tuple2<String, Integer>(token, 1));
+				}
+			}
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String textPath;
+	private static String outputPath;
+	
+	private static boolean parseParameters(String[] args) {
+		
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(args.length == 2) {
+				textPath = args[0];
+				outputPath = args[1];
+			} else {
+				System.err.println("Usage: WordCount <text path> <result path>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing WordCount example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from a file.");
+			System.out.println("  Usage: WordCount <text path> <result path>");
+		}
+		return true;
+	}
+	
+	private static DataSet<String> getTextDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read the text file from given input path
+			return env.readTextFile(textPath);
+		} else {
+			// get default test text data
+			return env.fromElements(
+					"To be, or not to be,--that is the question:--",
+					"Whether 'tis nobler in the mind to suffer",
+					"The slings and arrows of outrageous fortune",
+					"Or to take arms against a sea of troubles,",
+					"And by opposing end them?--To die,--to sleep,--",
+					"No more; and by a sleep to say we end",
+					"The heartache, and the thousand natural shocks",
+					"That flesh is heir to,--'tis a consummation",
+					"Devoutly to be wish'd. To die,--to sleep;--",
+					"To sleep! perchance to dream:--ay, there's the rub;",
+					"For in that sleep of death what dreams may come,",
+					"When we have shuffled off this mortal coil,",
+					"Must give us pause: there's the respect",
+					"That makes calamity of so long life;",
+					"For who would bear the whips and scorns of time,",
+					"The oppressor's wrong, the proud man's contumely,",
+					"The pangs of despis'd love, the law's delay,",
+					"The insolence of office, and the spurns",
+					"That patient merit of the unworthy takes,",
+					"When he himself might his quietus make",
+					"With a bare bodkin? who would these fardels bear,",
+					"To grunt and sweat under a weary life,",
+					"But that the dread of something after death,--",
+					"The undiscover'd country, from whose bourn",
+					"No traveller returns,--puzzles the will,",
+					"And makes us rather bear those ills we have",
+					"Than fly to others that we know not of?",
+					"Thus conscience does make cowards of us all;",
+					"And thus the native hue of resolution",
+					"Is sicklied o'er with the pale cast of thought;",
+					"And enterprises of great pith and moment,",
+					"With this regard, their currents turn awry,",
+					"And lose the name of action.--Soft you now!",
+					"The fair Ophelia!--Nymph, in thy orisons",
+					"Be all my sins remember'd."
+					);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-clients/src/test/resources/test.jar
----------------------------------------------------------------------
diff --git a/stratosphere-clients/src/test/resources/test.jar b/stratosphere-clients/src/test/resources/test.jar
deleted file mode 100644
index d0ce39b..0000000
Binary files a/stratosphere-clients/src/test/resources/test.jar and /dev/null differ

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/pom.xml
----------------------------------------------------------------------
diff --git a/stratosphere-tests/pom.xml b/stratosphere-tests/pom.xml
index 6029ec3..7dec412 100644
--- a/stratosphere-tests/pom.xml
+++ b/stratosphere-tests/pom.xml
@@ -86,6 +86,11 @@
 						</goals>
 					</execution>
 				</executions>
+				<configuration>
+					<excludes>
+						<exclude>**/src/test/java/eu/stratosphere/test/util/testjar/*.java</exclude>
+					</excludes>
+				</configuration>
 			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
@@ -114,6 +119,68 @@
 					<perCoreThreadCount>false</perCoreThreadCount>
 				</configuration>
 			</plugin>
+			
+			<plugin>
+				<artifactId>maven-assembly-plugin</artifactId>
+				<version>2.4</version>
+				<executions>
+					<execution>
+						<id>create-test-dependency</id>
+						<phase>process-test-classes</phase>
+						<goals>
+							<goal>single</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<manifest>
+									<mainClass>eu.stratosphere.test.util.testjar.KMeansForTest</mainClass>
+								</manifest>
+							</archive>
+							<finalName>maven</finalName>
+							<attach>false</attach>
+							<descriptors>
+								<descriptor>src/test/assembly/test-assembly.xml</descriptor>
+							</descriptors>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
+		
+		<pluginManagement>
+			<plugins>
+				<!--This plugin's configuration is used to store Eclipse m2e settings only. It has no influence on the Maven build itself.-->
+				<plugin>
+					<groupId>org.eclipse.m2e</groupId>
+					<artifactId>lifecycle-mapping</artifactId>
+					<version>1.0.0</version>
+					<configuration>
+						<lifecycleMappingMetadata>
+							<pluginExecutions>
+								<pluginExecution>
+									<pluginExecutionFilter>
+										<groupId>
+											org.apache.maven.plugins
+										</groupId>
+										<artifactId>
+											maven-assembly-plugin
+										</artifactId>
+										<versionRange>
+											[2.4,)
+										</versionRange>
+										<goals>
+											<goal>single</goal>
+										</goals>
+									</pluginExecutionFilter>
+									<action>
+										<ignore></ignore>
+									</action>
+								</pluginExecution>
+							</pluginExecutions>
+						</lifecycleMappingMetadata>
+					</configuration>
+				</plugin>
+			</plugins>
+		</pluginManagement>
 	</build>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/assembly/test-assembly.xml
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/assembly/test-assembly.xml b/stratosphere-tests/src/test/assembly/test-assembly.xml
new file mode 100644
index 0000000..2b7f111
--- /dev/null
+++ b/stratosphere-tests/src/test/assembly/test-assembly.xml
@@ -0,0 +1,30 @@
+<!--
+ Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+	 http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations under the License.
+-->
+
+<assembly>
+	<id>test-jar</id>
+	<formats>
+		<format>jar</format>
+	</formats>
+	<includeBaseDirectory>false</includeBaseDirectory>
+	<fileSets>
+		<fileSet>
+			<directory>${project.build.testOutputDirectory}</directory>
+			<outputDirectory>/</outputDirectory>
+			<!--modify/add include to match your package(s) -->
+			<includes>
+				<include>eu/stratosphere/test/util/testjar/**</include>
+			</includes>
+		</fileSet>
+	</fileSets>
+</assembly>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
index ba86bde..3626ba7 100644
--- a/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/localDistributed/PackagedProgramEndToEndITCase.java
@@ -14,7 +14,6 @@ package eu.stratosphere.test.localDistributed;
 
 import java.io.File;
 import java.io.FileWriter;
-import java.net.URL;
 
 import eu.stratosphere.client.minicluster.NepheleMiniCluster;
 import org.junit.Assert;
@@ -54,21 +53,20 @@ public class PackagedProgramEndToEndITCase {
 			fwClusters.write(KMeansData.INITIAL_CENTERS);
 			fwClusters.close();
 
-			URL jarFileURL = getClass().getResource("/KMeansForTest.jar");
-			String jarPath = jarFileURL.getFile();
+			String jarPath = "target/maven-test-jar.jar";
 
 			// run KMeans
 			cluster.setNumTaskManager(2);
 			cluster.start();
 			RemoteExecutor ex = new RemoteExecutor("localhost", 6498);
-
+			
 			ex.executeJar(jarPath,
-					"eu.stratosphere.examples.scala.testing.KMeansForTest",
-					new String[] {"4",
+					"eu.stratosphere.test.util.testjar.KMeansForTest",
+					new String[] {
 							points.toURI().toString(),
 							clusters.toURI().toString(),
 							outFile.toURI().toString(),
-							"1"});
+							"25"});
 
 			points.delete();
 			clusters.delete();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
new file mode 100644
index 0000000..c13ddc8
--- /dev/null
+++ b/stratosphere-tests/src/test/java/eu/stratosphere/test/util/testjar/KMeansForTest.java
@@ -0,0 +1,294 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+
+package eu.stratosphere.test.util.testjar;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import eu.stratosphere.api.common.Plan;
+import eu.stratosphere.api.common.Program;
+import eu.stratosphere.api.java.DataSet;
+import eu.stratosphere.api.java.ExecutionEnvironment;
+import eu.stratosphere.api.java.IterativeDataSet;
+import eu.stratosphere.api.java.RemoteEnvironment;
+import eu.stratosphere.api.java.functions.MapFunction;
+import eu.stratosphere.api.java.functions.ReduceFunction;
+import eu.stratosphere.api.java.tuple.Tuple2;
+import eu.stratosphere.api.java.tuple.Tuple3;
+import eu.stratosphere.configuration.Configuration;
+
+@SuppressWarnings("serial")
+public class KMeansForTest implements Program{
+	
+	// *************************************************************************
+	//     PROGRAM
+	// *************************************************************************
+	
+	
+	@Override
+	public Plan getPlan(String... args) {
+		if(!parseParameters(args)) {
+			throw new RuntimeException("Unable to parse the arguments");
+		}
+	
+		// set up execution environment
+		ExecutionEnvironment env = new RemoteEnvironment("localhost", 1, null);
+		
+		// get input data
+		DataSet<Point> points = getPointDataSet(env);
+		DataSet<Centroid> centroids = getCentroidDataSet(env);
+		
+		// set number of bulk iterations for KMeans algorithm
+		IterativeDataSet<Centroid> loop = centroids.iterate(numIterations);
+		
+		DataSet<Centroid> newCentroids = points
+			// compute closest centroid for each point
+			.map(new SelectNearestCenter()).withBroadcastSet(loop, "centroids")
+			// count and sum point coordinates for each centroid
+			.map(new CountAppender())
+			.groupBy(0).reduce(new CentroidAccumulator())
+			// compute new centroids from point counts and coordinate sums
+			.map(new CentroidAverager());
+		
+		// feed new centroids back into next iteration
+		DataSet<Centroid> finalCentroids = loop.closeWith(newCentroids);
+		
+		DataSet<Tuple2<Integer, Point>> clusteredPoints = points
+				// assign points to final clusters
+				.map(new SelectNearestCenter()).withBroadcastSet(finalCentroids, "centroids");
+		
+		// emit result
+		if(fileOutput) {
+			clusteredPoints.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			clusteredPoints.print();
+		}
+		return env.createProgramPlan();
+	}
+	
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+	
+	/**
+	 * A simple two-dimensional point.
+	 */
+	public static class Point implements Serializable {
+		
+		public double x, y;
+		
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+		
+		public Point add(Point other) {
+			x += other.x;
+			y += other.y;
+			return this;
+		}
+		
+		public Point div(long val) {
+			x /= val;
+			y /= val;
+			return this;
+		}
+		
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+		}
+		
+		public void clear() {
+			x = y = 0.0;
+		}
+		
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+	
+	/**
+	 * A simple two-dimensional centroid, basically a point with an ID. 
+	 */
+	public static class Centroid extends Point {
+		
+		public int id;
+		
+		public Centroid() {}
+		
+		public Centroid(int id, double x, double y) {
+			super(x,y);
+			this.id = id;
+		}
+		
+		public Centroid(int id, Point p) {
+			super(p.x, p.y);
+			this.id = id;
+		}
+		
+		@Override
+		public String toString() {
+			return id + " " + super.toString();
+		}
+	}
+	
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+	
+	/** Converts a Tuple2<Double,Double> into a Point. */
+	public static final class TuplePointConverter extends MapFunction<Tuple2<Double, Double>, Point> {
+
+		@Override
+		public Point map(Tuple2<Double, Double> t) throws Exception {
+			return new Point(t.f0, t.f1);
+		}
+	}
+	
+	/** Converts a Tuple3<Integer, Double,Double> into a Centroid. */
+	public static final class TupleCentroidConverter extends MapFunction<Tuple3<Integer, Double, Double>, Centroid> {
+
+		@Override
+		public Centroid map(Tuple3<Integer, Double, Double> t) throws Exception {
+			return new Centroid(t.f0, t.f1, t.f2);
+		}
+	}
+	
+	/** Determines the closest cluster center for a data point. */
+	public static final class SelectNearestCenter extends MapFunction<Point, Tuple2<Integer, Point>> {
+		private Collection<Centroid> centroids;
+
+		/** Reads the centroid values from a broadcast variable into a collection. */
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			this.centroids = getRuntimeContext().getBroadcastVariable("centroids");
+		}
+		
+		@Override
+		public Tuple2<Integer, Point> map(Point p) throws Exception {
+			
+			double minDistance = Double.MAX_VALUE;
+			int closestCentroidId = -1;
+			
+			// check all cluster centers
+			for (Centroid centroid : centroids) {
+				// compute distance
+				double distance = p.euclideanDistance(centroid);
+				
+				// update nearest cluster if necessary 
+				if (distance < minDistance) {
+					minDistance = distance;
+					closestCentroidId = centroid.id;
+				}
+			}
+
+			// emit a new record with the center id and the data point.
+			return new Tuple2<Integer, Point>(closestCentroidId, p);
+		}
+	}
+	
+	/** Appends a count variable to the tuple. */ 
+	public static final class CountAppender extends MapFunction<Tuple2<Integer, Point>, Tuple3<Integer, Point, Long>> {
+
+		@Override
+		public Tuple3<Integer, Point, Long> map(Tuple2<Integer, Point> t) {
+			return new Tuple3<Integer, Point, Long>(t.f0, t.f1, 1L);
+		} 
+	}
+	
+	/** Sums and counts point coordinates. */
+	public static final class CentroidAccumulator extends ReduceFunction<Tuple3<Integer, Point, Long>> {
+
+		@Override
+		public Tuple3<Integer, Point, Long> reduce(Tuple3<Integer, Point, Long> val1, Tuple3<Integer, Point, Long> val2) {
+			return new Tuple3<Integer, Point, Long>(val1.f0, val1.f1.add(val2.f1), val1.f2 + val2.f2);
+		}
+	}
+	
+	/** Computes new centroid from coordinate sum and count of points. */
+	public static final class CentroidAverager extends MapFunction<Tuple3<Integer, Point, Long>, Centroid> {
+
+		@Override
+		public Centroid map(Tuple3<Integer, Point, Long> value) {
+			return new Centroid(value.f0, value.f1.div(value.f2));
+		}
+	}
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+	
+	private static boolean fileOutput = false;
+	private static String pointsPath = null;
+	private static String centersPath = null;
+	private static String outputPath = null;
+	private static int numIterations = 10;
+	
+	private static boolean parseParameters(String[] programArguments) {
+		
+		if(programArguments.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+			if(programArguments.length == 4) {
+				pointsPath = programArguments[0];
+				centersPath = programArguments[1];
+				outputPath = programArguments[2];
+				numIterations = Integer.parseInt(programArguments[3]);
+			} else {
+				System.err.println("Usage: KMeans <points path> <centers path> <result path> <num iterations>");
+				return false;
+			}
+		} else {
+			System.out.println("Executing K-Means example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  We provide a data generator to create synthetic input files for this program.");
+			System.out.println("  Usage: KMeans <points path> <centers path> <result path> <num iterations>");
+		}
+		return true;
+	}
+	
+	private static DataSet<Point> getPointDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			// read points from CSV file
+			return env.readCsvFile(pointsPath)
+						.fieldDelimiter('|')
+						.includeFields(true, true)
+						.types(Double.class, Double.class)
+						.map(new TuplePointConverter());
+		} else {
+			throw new UnsupportedOperationException("Use file output");
+		}
+	}
+	
+	private static DataSet<Centroid> getCentroidDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(centersPath)
+						.fieldDelimiter('|')
+						.includeFields(true, true, true)
+						.types(Integer.class, Double.class, Double.class)
+						.map(new TupleCentroidConverter());
+		} else {
+			throw new UnsupportedOperationException("Use file output");
+		}
+	}
+
+	
+		
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/6c4b85c9/stratosphere-tests/src/test/resources/KMeansForTest.jar
----------------------------------------------------------------------
diff --git a/stratosphere-tests/src/test/resources/KMeansForTest.jar b/stratosphere-tests/src/test/resources/KMeansForTest.jar
deleted file mode 100644
index 34683ad..0000000
Binary files a/stratosphere-tests/src/test/resources/KMeansForTest.jar and /dev/null differ


Mime
View raw message