flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/3] git commit: [FLINK-12] Clean up configuration object - Remove class loader (was inconsistently used and set) - Objects are stored in their type, rather than as a string
Date Fri, 03 Oct 2014 12:40:38 GMT
[FLINK-12] Clean up configuration object
  - Remove class loader (was inconsistently used and set)
  - Objects are stored in their type, rather than as a string


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/02314adc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/02314adc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/02314adc

Branch: refs/heads/master
Commit: 02314adca221982accb762f3bbe6c5a727e663a4
Parents: da3e507
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Sep 26 17:05:50 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Oct 3 14:21:54 2014 +0200

----------------------------------------------------------------------
 .../java/record/io/jdbc/JDBCOutputFormat.java   |  20 +-
 .../spargel/java/record/SpargelIteration.java   |  24 +-
 .../flink/streaming/api/StreamConfig.java       |  10 +-
 .../flink/client/CliFrontendTestUtils.java      |  12 +-
 .../org/apache/flink/compiler/PactCompiler.java |   6 +-
 .../flink/compiler/dag/DataSourceNode.java      |   1 -
 .../api/common/functions/RuntimeContext.java    |   8 +
 .../flink/configuration/Configuration.java      | 459 +++++++++++--------
 .../configuration/GlobalConfiguration.java      | 405 ++++------------
 .../flink/configuration/ConfigurationTest.java  | 153 ++++++-
 .../configuration/GlobalConfigurationTest.java  |  19 +-
 flink-core/src/test/resources/logback-test.xml  |   1 +
 .../operators/translation/WrappingFunction.java |   5 +
 .../api/java/record/io/CsvInputFormat.java      |  27 +-
 .../api/java/record/io/CsvOutputFormat.java     |  20 +-
 .../task/AbstractIterativePactTask.java         |   7 +-
 .../iterative/task/IterationHeadPactTask.java   |   2 +-
 .../task/IterationSynchronizationSinkTask.java  |   4 +-
 .../flink/runtime/operators/DataSinkTask.java   |   1 -
 .../flink/runtime/operators/DataSourceTask.java |   1 -
 .../runtime/operators/RegularPactTask.java      |   3 +-
 .../chaining/ChainedCollectorMapDriver.java     |   2 +-
 .../operators/chaining/ChainedDriver.java       |   3 +-
 .../operators/udf/RuntimeUDFContext.java        |  19 +-
 .../runtime/operators/util/TaskConfig.java      |  29 +-
 .../test/iterative/DanglingPageRankITCase.java  |  28 +-
 .../flink/test/iterative/PageRankITCase.java    |  27 +-
 .../ConnectedComponentsNepheleITCase.java       |   1 -
 .../nephele/DanglingPageRankNepheleITCase.java  |   1 -
 ...nglingPageRankWithCombinerNepheleITCase.java |   1 -
 .../ComputeEdgeDegreesITCase.java               |   4 +-
 .../EnumTrianglesOnEdgesWithDegreesITCase.java  |   5 +-
 .../recordJobTests/EnumTrianglesRDFITCase.java  |   5 +-
 .../recordJobTests/MergeOnlyJoinITCase.java     |   5 +-
 .../test/recordJobTests/PairwiseSPITCase.java   |   4 +-
 .../test/recordJobTests/TPCHQuery10ITCase.java  |   3 +-
 .../test/recordJobTests/TPCHQuery3ITCase.java   |   3 +-
 37 files changed, 641 insertions(+), 687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
index ef8323b..780001a 100644
--- a/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
+++ b/flink-addons/flink-jdbc/src/main/java/org/apache/flink/api/java/record/io/jdbc/JDBCOutputFormat.java
@@ -98,15 +98,21 @@ public class JDBCOutputFormat implements OutputFormat<Record> {
 		@SuppressWarnings("unchecked")
 		Class<Value>[] classes = new Class[this.fieldCount];
 		this.fieldClasses = classes;
+		
+		ClassLoader cl = getClass().getClassLoader();
 
-		for (int i = 0; i < this.fieldCount; i++) {
-			@SuppressWarnings("unchecked")
-			Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_KEY + i, null);
-			if (clazz == null) {
-				throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
-						+ "No type class for parameter " + i);
+		try {
+			for (int i = 0; i < this.fieldCount; i++) {
+				Class<? extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_KEY + i, null, cl);
+				if (clazz == null) {
+					throw new IllegalArgumentException("Invalid configuration for JDBCOutputFormat: "
+							+ "No type class for parameter " + i);
+				}
+				this.fieldClasses[i] = clazz;
 			}
-			this.fieldClasses[i] = clazz;
+		}
+		catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not load data type classes.", e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
index 38294f6..780bc94 100644
--- a/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
+++ b/flink-addons/flink-spargel/src/main/java/org/apache/flink/spargel/java/record/SpargelIteration.java
@@ -189,16 +189,20 @@ public class SpargelIteration {
 		public void open(Configuration parameters) throws Exception {
 			// instantiate only the first time
 			if (vertexUpdateFunction == null) {
-				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
-				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
-				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
+				ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
+				
+				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
+				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
+				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, cl);
 				
 				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
 				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
 				messageIter = new MessageIterator<M>(InstantiationUtil.instantiate(messageClass, Value.class));
 				
+				ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
+				
 				try {
-					this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
+					this.vertexUpdateFunction = (VertexUpdateFunction<K, V, M>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
 				} catch (Exception e) {
 					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
 					throw new Exception("Could not instantiate VertexUpdateFunction" + message, e);
@@ -248,10 +252,12 @@ public class SpargelIteration {
 		public void open(Configuration parameters) throws Exception {
 			// instantiate only the first time
 			if (messagingFunction == null) {
-				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, Key.class);
-				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, Value.class);
+				ClassLoader cl = getRuntimeContext().getUserCodeClassLoader();
+				
+				Class<K> vertexKeyClass = parameters.getClass(KEY_PARAM, null, cl);
+				Class<V> vertexValueClass = parameters.getClass(VALUE_PARAM, null, cl);
 //				Class<M> messageClass = parameters.getClass(MESSAGE_PARAM, null, Value.class);
-				Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, Value.class);
+				Class<E> edgeClass = parameters.getClass(EDGE_PARAM, null, cl);
 				
 				vertexKey = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
 				vertexValue = InstantiationUtil.instantiate(vertexValueClass, Value.class);
@@ -259,8 +265,10 @@ public class SpargelIteration {
 				K edgeKeyHolder = InstantiationUtil.instantiate(vertexKeyClass, Key.class);
 				E edgeValueHolder = InstantiationUtil.instantiate(edgeClass, Value.class);
 				
+				ClassLoader ucl = getRuntimeContext().getUserCodeClassLoader();
+				
 				try {
-					this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, parameters.getClassLoader());
+					this.messagingFunction = (MessagingFunction<K, V, M, E>) InstantiationUtil.readObjectFromConfig(parameters, UDF_PARAM, ucl);
 				} catch (Exception e) {
 					String message = e.getMessage() == null ? "." : ": " + e.getMessage();
 					throw new Exception("Could not instantiate MessagingFunction" + message, e);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 42c1adf..f1913d6 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -309,9 +309,13 @@ public class StreamConfig {
 		config.setClass("functionClass", functionClass);
 	}
 
-	@SuppressWarnings("unchecked")
-	public Class<? extends AbstractRichFunction> getFunctionClass() {
-		return (Class<? extends AbstractRichFunction>) config.getClass("functionClass", null);
+	public Class<? extends AbstractRichFunction> getFunctionClass(ClassLoader cl) {
+		try {
+			return config.getClass("functionClass", null, cl);
+		}
+		catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not load function class", e);
+		}
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index a680f4d..9d0d526 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -29,6 +29,7 @@ import java.net.MalformedURLException;
 import java.util.Map;
 
 import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 
 public class CliFrontendTestUtils {
@@ -86,16 +87,19 @@ public class CliFrontendTestUtils {
 	
 	public static void clearGlobalConfiguration() {
 		try {
-			Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("configuration");
-			Field confDataMapField = GlobalConfiguration.class.getDeclaredField("confData");
+			Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("SINGLETON");
+			Field conf = GlobalConfiguration.class.getDeclaredField("config");
+			Field map = Configuration.class.getDeclaredField("confData");
 			
 			singletonInstanceField.setAccessible(true);
-			confDataMapField.setAccessible(true);
+			conf.setAccessible(true);
+			map.setAccessible(true);
 			
 			GlobalConfiguration gconf = (GlobalConfiguration) singletonInstanceField.get(null);
 			if (gconf != null) {
+				Configuration confObject = (Configuration) conf.get(gconf);
 				@SuppressWarnings("unchecked")
-				Map<String, String> confData = (Map<String, String>) confDataMapField.get(gconf);
+				Map<String, Object> confData = (Map<String, Object>) map.get(confObject);
 				confData.clear();
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
index 7445759..0ff0aee 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/PactCompiler.java
@@ -94,7 +94,6 @@ import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
 import org.apache.flink.compiler.plan.WorksetPlanNode;
 import org.apache.flink.compiler.postpass.OptimizerPostPass;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -410,11 +409,10 @@ public class PactCompiler {
 		this.statistics = stats;
 		this.costEstimator = estimator;
 
-		Configuration config = GlobalConfiguration.getConfiguration();
-
 		// determine the default parallelization degree
-		this.defaultDegreeOfParallelism = config.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+		this.defaultDegreeOfParallelism = GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
 			ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE);
+		
 		if (defaultDegreeOfParallelism < 1) {
 			LOG.warn("Config value " + defaultDegreeOfParallelism + " for option "
 					+ ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE + " is invalid. Ignoring and using a value of 1.");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
index 6742581..3de4e03 100644
--- a/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
+++ b/flink-compiler/src/main/java/org/apache/flink/compiler/dag/DataSourceNode.java
@@ -109,7 +109,6 @@ public class DataSourceNode extends OptimizerNode {
 			try {
 				format = getPactContract().getFormatWrapper().getUserCodeObject();
 				Configuration config = getPactContract().getParameters();
-				config.setClassLoader(format.getClass().getClassLoader());
 				format.configure(config);
 			}
 			catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 63457fa..5f6aaa5 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -60,6 +60,14 @@ public interface RuntimeContext {
 	 */
 	int getIndexOfThisSubtask();
 	
+	/**
+	 * Gets the ClassLoader to load classes that were are not in system's classpath, but are part of the
+	 * jar file of a user job.
+	 * 
+	 * @return The ClassLoader for user code classes.
+	 */
+	ClassLoader getUserCodeClassLoader();
+	
 	// --------------------------------------------------------------------------------------------
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
index a6f2cca..e798609 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/Configuration.java
@@ -16,140 +16,86 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.configuration;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
-
-import com.google.common.io.BaseEncoding;
+import org.apache.flink.types.StringValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * Lightweight configuration object which can store key/value pairs. Configuration objects
- * can be extracted from or integrated into the {@link GlobalConfiguration} object. They can
- * be transported via Nephele's IPC system to distribute configuration data at runtime.
- * This class is thread-safe.
- * 
+ * Lightweight configuration object which can store key/value pairs.
  */
 public class Configuration implements IOReadableWritable, java.io.Serializable {
 
 	private static final long serialVersionUID = 1L;
-
-	/**
-	 * Stores the concrete key/value pairs of this configuration object.
-	 */
-	private final Map<String, String> confData = new HashMap<String, String>();
-
-	/**
-	 * The class loader to be used for the <code>getClass</code> method.
-	 */
-	private transient ClassLoader classLoader;
-
-	/**
-	 * Constructs a new configuration object.
-	 */
-	public Configuration() {
-		this.classLoader = this.getClass().getClassLoader();
-	}
-
-	/**
-	 * Constructs a new configuration object.
-	 * 
-	 * @param classLoader
-	 *        the class loader to be use for the <code>getClass</code> method
-	 */
-	public Configuration(final ClassLoader classLoader) {
-		this.classLoader = classLoader;
-	}
 	
+	private static final byte TYPE_STRING = 0;
+	private static final byte TYPE_INT = 1;
+	private static final byte TYPE_LONG = 2;
+	private static final byte TYPE_BOOLEAN = 3;
+	private static final byte TYPE_FLOAT = 4;
+	private static final byte TYPE_DOUBLE = 5;
+	private static final byte TYPE_BYTES = 6;
+	
+	/** The log object used for debugging. */
+	private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
 	
-	/**
-	 * @return the class loader that knows where to locate user classes
-	 */
-	public ClassLoader getClassLoader() {
-		return this.classLoader;
-	}
-
-	/**
-	 * Sets the class loader that knows where to locate user classes
-	 * 
-	 * @param classLoader
-	 *        the class loader to be use for the <code>getClass</code> method
-	 */
-	public void setClassLoader(ClassLoader classLoader) {
-		this.classLoader = classLoader;
-	}
 
+	/** Stores the concrete key/value pairs of this configuration object. */
+	private final Map<String, Object> confData = new HashMap<String, Object>();
+	
 	// --------------------------------------------------------------------------------------------
-
+	
+	public Configuration() {}
+	
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Returns the class associated with the given key as a string.
 	 * 
-	 * @param <T>
-	 *        the ancestor of both the default value and the potential value
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the optional default value returned if no entry exists
-	 * @param ancestor
-	 *        the ancestor of both the default value and the potential value
-	 * @return the (default) value associated with the given key
-	 * @throws IllegalStateException
-	 *         if the class identified by the associated value cannot be resolved
-	 * @see #setClass(String, Class)
+	 * @param <T> The type of the class to return.
+
+	 * @param key The key pointing to the associated value
+	 * @param defaultValue The optional default value returned if no entry exists
+	 * @param classLoader The class loader used to resolve the class.
+	 * 
+	 * @return The value associated with the given key, or the default value, if to entry for the key exists.
 	 */
 	@SuppressWarnings("unchecked")
-	public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, Class<? super T> ancestor) {
-		String className = getStringInternal(key);
-		if (className == null) {
+	public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
+		Object o = getRawValue(key);
+		if (o == null) {
 			return (Class<T>) defaultValue;
 		}
-
-		try {
-			return (Class<T>) Class.forName(className, true, this.classLoader);
-		} catch (ClassNotFoundException e) {
-			throw new IllegalStateException(e);
+		
+		if (o.getClass() == String.class) {
+			return (Class<T>) Class.forName((String) o, true, classLoader);
 		}
-	}
-
-	/**
-	 * Returns the class associated with the given key as a string.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 * @throws IllegalStateException
-	 *         if the class identified by the associated value cannot be resolved
-	 * @see #setClass(String, Class)
-	 */
-	public Class<?> getClass(String key, Class<?> defaultValue) {
-		return getClass(key, defaultValue, Object.class);
+		
+		LOG.warn("Configuration cannot evaluate value " + o + " as a class name");
+		return (Class<T>) defaultValue;
 	}
 
 	/**
 	 * Adds the given key/value pair to the configuration object. The class can be retrieved by invoking
-	 * {@link #getClass(String, Class, Class)} if it is in the scope of the class loader on the caller.
+	 * {@link #getClass(String, Class, ClassLoader)} if it is in the scope of the class loader on the caller.
 	 * 
-	 * @param key
-	 *        the key of the pair to be added
-	 * @param klazz
-	 *        the value of the pair to be added
-	 * @see #getClass(String, Class)
-	 * @see #getClass(String, Class, Class)
+	 * @param key The key of the pair to be added
+	 * @param klazz The value of the pair to be added
+	 * @see #getClass(String, Class, ClassLoader)
 	 */
 	public void setClass(String key, Class<?> klazz) {
-		setStringInternal(key, klazz.getName());
+		setValueInternal(key, klazz.getName());
 	}
 
 	/**
@@ -162,8 +108,12 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @return the (default) value associated with the given key
 	 */
 	public String getString(String key, String defaultValue) {
-		String val = getStringInternal(key);
-		return val == null ? defaultValue : val;
+		Object o = getRawValue(key);
+		if (o == null) {
+			return defaultValue;
+		} else {
+			return o.toString();
+		}
 	}
 	
 	/**
@@ -175,7 +125,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 *        the value of the key/value pair to be added
 	 */
 	public void setString(String key, String value) {
-		setStringInternal(key, value);
+		setValueInternal(key, value);
 	}
 
 	/**
@@ -188,11 +138,31 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @return the (default) value associated with the given key
 	 */
 	public int getInteger(String key, int defaultValue) {
-		String val = getStringInternal(key);
-		if (val == null) {
+		Object o = getRawValue(key);
+		if (o == null) {
 			return defaultValue;
-		} else {
-			return Integer.parseInt(val);
+		}
+		
+		if (o.getClass() == Integer.class) {
+			return (Integer) o;
+		}
+		else if (o.getClass() == Long.class) {
+			long value = ((Long) o).longValue();
+			if (value <= Integer.MAX_VALUE && value >= Integer.MIN_VALUE) {
+				return (int) value;
+			} else {
+				LOG.warn("Configuation value " + value + " overflows/underflows the integer type.");
+				return defaultValue;
+			}
+		}
+		else {
+			try {
+				return Integer.parseInt(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value " + o + " as an integer number");
+				return defaultValue;
+			}
 		}
 	}
 
@@ -205,7 +175,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 *        the value of the key/value pair to be added
 	 */
 	public void setInteger(String key, int value) {
-		setStringInternal(key, Integer.toString(value));
+		setValueInternal(key, value);
 	}
 
 	/**
@@ -218,11 +188,25 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @return the (default) value associated with the given key
 	 */
 	public long getLong(String key, long defaultValue) {
-		String val = getStringInternal(key);
-		if (val == null) {
+		Object o = getRawValue(key);
+		if (o == null) {
 			return defaultValue;
-		} else {
-			return Long.parseLong(val);
+		}
+		
+		if (o.getClass() == Long.class) {
+			return (Long) o;
+		}
+		else if (o.getClass() == Integer.class) {
+			return ((Integer) o).longValue();
+		}
+		else {
+			try {
+				return Long.parseLong(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value " + o + " as a long integer number");
+				return defaultValue;
+			}
 		}
 	}
 
@@ -235,7 +219,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 *        the value of the key/value pair to be added
 	 */
 	public void setLong(String key, long value) {
-		setStringInternal(key, Long.toString(value));
+		setValueInternal(key, value);
 	}
 
 	/**
@@ -248,11 +232,16 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @return the (default) value associated with the given key
 	 */
 	public boolean getBoolean(String key, boolean defaultValue) {
-		String val = getStringInternal(key);
-		if (val == null) {
+		Object o = getRawValue(key);
+		if (o == null) {
 			return defaultValue;
-		} else {
-			return Boolean.parseBoolean(val);
+		}
+		
+		if (o.getClass() == Boolean.class) {
+			return (Boolean) o;
+		}
+		else {
+			return Boolean.parseBoolean(o.toString());
 		}
 	}
 
@@ -264,8 +253,8 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @param value
 	 *        the value of the key/value pair to be added
 	 */
-	public void setBoolean(final String key, final boolean value) {
-		setStringInternal(key, Boolean.toString(value));
+	public void setBoolean(String key, boolean value) {
+		setValueInternal(key, value);
 	}
 
 	/**
@@ -277,12 +266,32 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 *        the default value which is returned in case there is no value associated with the given key
 	 * @return the (default) value associated with the given key
 	 */
-	public float getFloat(final String key, final float defaultValue) {
-		String val = getStringInternal(key);
-		if (val == null) {
+	public float getFloat(String key, float defaultValue) {
+		Object o = getRawValue(key);
+		if (o == null) {
 			return defaultValue;
-		} else {
-			return Float.parseFloat(val);
+		}
+		
+		if (o.getClass() == Float.class) {
+			return (Float) o;
+		}
+		else if (o.getClass() == Double.class) {
+			double value = ((Double) o);
+			if (value <= Float.MAX_VALUE && value >= Float.MIN_VALUE) {
+				return (float) value;
+			} else {
+				LOG.warn("Configuation value " + value + " overflows/underflows the float type.");
+				return defaultValue;
+			}
+		}
+		else {
+			try {
+				return Float.parseFloat(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value " + o + " as a float value");
+				return defaultValue;
+			}
 		}
 	}
 
@@ -295,7 +304,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 *        the value of the key/value pair to be added
 	 */
 	public void setFloat(String key, float value) {
-		setStringInternal(key, Float.toString(value));
+		setValueInternal(key, value);
 	}
 	
 	/**
@@ -308,11 +317,25 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @return the (default) value associated with the given key
 	 */
 	public double getDouble(String key, double defaultValue) {
-		String val = getStringInternal(key);
-		if (val == null) {
+		Object o = getRawValue(key);
+		if (o == null) {
 			return defaultValue;
-		} else {
-			return Double.parseDouble(val);
+		}
+		
+		if (o.getClass() == Double.class) {
+			return (Double) o;
+		}
+		else if (o.getClass() == Float.class) {
+			return ((Float) o).doubleValue();
+		}
+		else {
+			try {
+				return Double.parseDouble(o.toString());
+			}
+			catch (NumberFormatException e) {
+				LOG.warn("Configuration cannot evaluate value " + o + " as a double value");
+				return defaultValue;
+			}
 		}
 	}
 
@@ -325,7 +348,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 *        the value of the key/value pair to be added
 	 */
 	public void setDouble(String key, double value) {
-		setStringInternal(key, Double.toString(value));
+		setValueInternal(key, value);
 	}
 	
 	/**
@@ -338,11 +361,17 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @return the (default) value associated with the given key.
 	 */
 	public byte[] getBytes(String key, byte[] defaultValue) {
-		final String encoded = getStringInternal(key);
-		if (encoded == null) {
+		
+		Object o = getRawValue(key);
+		if (o == null) {
+			return defaultValue;
+		}
+		else if (o.getClass() == byte[].class) {
+			return (byte[]) o;
+		}
+		else {
+			LOG.warn("Configuration cannot evaluate value " + o + " as a byte[] value");
 			return defaultValue;
-		} else {
-			return BaseEncoding.base64().decode(encoded);
 		}
 	}
 	
@@ -355,10 +384,11 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 *        The bytes to be added.
 	 */
 	public void setBytes(String key, byte[] bytes) {
-		final String encoded = BaseEncoding.base64().encode(bytes);
-		setStringInternal(key, encoded);
+		setValueInternal(key, bytes);
 	}
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Returns the keys of all key/value pairs stored inside this
 	 * configuration object.
@@ -366,19 +396,9 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	 * @return the keys of all key/value pairs stored inside this configuration object
 	 */
 	public Set<String> keySet() {
-
-		// Copy key set, so return value is independent from the object's internal data structure
-		final Set<String> retVal = new HashSet<String>();
-
 		synchronized (this.confData) {
-
-			final Iterator<String> it = this.confData.keySet().iterator();
-			while (it.hasNext()) {
-				retVal.add(it.next());
-			}
+			return new HashSet<String>(this.confData.keySet());
 		}
-
-		return retVal;
 	}
 
 	public void addAll(Configuration other) {
@@ -405,7 +425,7 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 
 		synchronized (this.confData) {
 			synchronized (other.confData) {
-				for (Map.Entry<String, String> entry : other.confData.entrySet()) {
+				for (Map.Entry<String, Object> entry : other.confData.entrySet()) {
 					bld.setLength(pl);
 					bld.append(entry.getKey());
 					this.confData.put(bld.toString(), entry.getValue());
@@ -428,93 +448,164 @@ public class Configuration implements IOReadableWritable, java.io.Serializable {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private String getStringInternal(String key) {
+	private <T> void setValueInternal(String key, T value) {
 		if (key == null) {
 			throw new NullPointerException("Key must not be null.");
 		}
+		if (value == null) {
+			throw new NullPointerException("Value must not be null.");
+		}
 		
 		synchronized (this.confData) {
-			return this.confData.get(key);
+			this.confData.put(key, value);
 		}
 	}
 	
-	private void setStringInternal(String key, String value) {
+	private Object getRawValue(String key) {
 		if (key == null) {
 			throw new NullPointerException("Key must not be null.");
 		}
-		if (value == null) {
-			throw new NullPointerException("Value must not be null.");
-		}
-			
 		
 		synchronized (this.confData) {
-			this.confData.put(key, value);
+			return this.confData.get(key);
 		}
 	}
 	
 	// --------------------------------------------------------------------------------------------
 
 	@Override
-	public void read(final DataInputView in) throws IOException {
-
+	public void read(DataInputView in) throws IOException {
 		synchronized (this.confData) {
-
 			final int numberOfProperties = in.readInt();
 
 			for (int i = 0; i < numberOfProperties; i++) {
-				final String key = StringRecord.readString(in);
-				final String value = StringRecord.readString(in);
+				String key = StringValue.readString(in);
+				Object value;
+				
+				byte type = in.readByte();
+				switch (type) {
+					case TYPE_STRING:
+						value = StringValue.readString(in);
+						break;
+					case TYPE_INT:
+						value = in.readInt();
+						break;
+					case TYPE_LONG:
+						value = in.readLong();
+						break;
+					case TYPE_FLOAT:
+						value = in.readFloat();
+						break;
+					case TYPE_DOUBLE:
+						value = in.readDouble();
+						break;
+					case TYPE_BOOLEAN:
+						value = in.readBoolean();
+						break;
+					case TYPE_BYTES:
+						byte[] bytes = new byte[in.readInt()];
+						in.readFully(bytes);
+						value = bytes;
+						break;
+					default:
+						throw new IOException("Unrecognized type: " + type);
+				}
+				
 				this.confData.put(key, value);
 			}
 		}
 	}
 
-
 	@Override
 	public void write(final DataOutputView out) throws IOException {
-
 		synchronized (this.confData) {
-
 			out.writeInt(this.confData.size());
-
-			final Iterator<String> it = this.confData.keySet().iterator();
-			while (it.hasNext()) {
-				final String key = it.next();
-				final String value = this.confData.get(key);
-				StringRecord.writeString(out, key);
-				StringRecord.writeString(out, value);
+			
+			for (Map.Entry<String, Object> entry : this.confData.entrySet()) {
+				String key = entry.getKey();
+				Object val = entry.getValue();
+						
+				StringValue.writeString(key, out);
+				Class<?> clazz = val.getClass();
+				
+				if (clazz == String.class) {
+					out.write(TYPE_STRING);
+					StringValue.writeString((String) val, out);
+				}
+				else if (clazz == Integer.class) {
+					out.write(TYPE_INT);
+					out.writeInt((Integer) val);
+				}
+				else if (clazz == Long.class) {
+					out.write(TYPE_LONG);
+					out.writeLong((Long) val);
+				}
+				else if (clazz == Float.class) {
+					out.write(TYPE_FLOAT);
+					out.writeFloat((Float) val);
+				}
+				else if (clazz == Double.class) {
+					out.write(TYPE_DOUBLE);
+					out.writeDouble((Double) val);
+				}
+				else if (clazz == byte[].class) {
+					out.write(TYPE_BYTES);
+					byte[] bytes = (byte[]) val;
+					out.writeInt(bytes.length);
+					out.write(bytes);
+				}
+				else if (clazz == Boolean.class) {
+					out.write(TYPE_BOOLEAN);
+					out.writeBoolean((Boolean) val);
+				}
+				else {
+					throw new IllegalArgumentException("Unrecognized type");
+				}
 			}
 		}
 	}
 	
-	private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException {
-		s.defaultReadObject();
-		this.classLoader = getClass().getClassLoader();
-	}
-	
 	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + confData.hashCode();
-		return result;
+		int hash = 0;
+		for (String s : this.confData.keySet()) {
+			hash ^= s.hashCode();
+		}
+		return hash;
 	}
 
 	@Override
-	public boolean equals(final Object obj) {
+	public boolean equals(Object obj) {
 		if (this == obj) {
 			return true;
 		}
-		if (obj == null) {
-			return false;
+		else if (obj instanceof Configuration) {
+			Map<String, Object> otherConf = ((Configuration) obj).confData;
+			
+			for (Map.Entry<String, Object> e : this.confData.entrySet()) {
+				Object thisVal = e.getValue();
+				Object otherVal = otherConf.get(e.getKey());
+				
+				if (thisVal.getClass() != byte[].class) {
+					if (!thisVal.equals(otherVal)) {
+						return false;
+					}
+				} else if (otherVal.getClass() == byte[].class) {
+					if (!Arrays.equals((byte[]) thisVal, (byte[]) otherVal)) {
+						return false;
+					}
+				} else {
+					return false;
+				}
+			}
+			
+			return true;
 		}
-		if (getClass() != obj.getClass()) {
+		else {
 			return false;
 		}
-		final Configuration other = (Configuration) obj;
-		return confData.equals(other.confData);
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 23846ca..e57aa6c 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.configuration;
 
 import java.io.BufferedReader;
@@ -25,119 +24,68 @@ import java.io.FileInputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.flink.util.StringUtils;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 import org.w3c.dom.Text;
-import org.xml.sax.SAXException;
 
 /**
- * Global configuration object in Nephele. Similar to Java properties configuration
+ * Global configuration object for Flink. Similar to Java properties configuration
  * objects it includes key-value pairs which represent the framework's configuration.
- * <p>
- * This class is thread-safe.
  */
 public final class GlobalConfiguration {
 
-	/**
-	 * The log object used for debugging.
-	 */
+	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);
 
-	/**
-	 * The global configuration object accessible through a singleton pattern.
-	 */
-	private static GlobalConfiguration configuration = null;
-
-	/**
-	 * The key to the directory this configuration was read from.
-	 */
-	private static final String CONFIGDIRKEY = "config.dir";
+	/** The global configuration object accessible through a singleton pattern. */
+	private static GlobalConfiguration SINGLETON = null;
 
-	/**
-	 * The internal map holding the key-value pairs the configuration consists of.
-	 */
-	private final Map<String, String> confData = new HashMap<String, String>();
+	/** The internal map holding the key-value pairs the configuration consists of. */
+	private final Configuration config = new Configuration();
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Retrieves the singleton object of the global configuration.
 	 * 
 	 * @return the global configuration object
 	 */
-	private static synchronized GlobalConfiguration get() {
-
-		if (configuration == null) {
-			configuration = new GlobalConfiguration();
+	private static GlobalConfiguration get() {
+		// lazy initialization currently only for testibility
+		synchronized (GlobalConfiguration.class) {
+			if (SINGLETON == null) {
+				SINGLETON = new GlobalConfiguration();
+			}
+			return SINGLETON;
 		}
-
-		return configuration;
 	}
 
 	/**
 	 * The constructor used to construct the singleton instance of the global configuration.
 	 */
-	private GlobalConfiguration() {
-	}
-
-	/**
-	 * Returns the value associated with the given key as a string.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	public static String getString(final String key, final String defaultValue) {
-
-		return get().getStringInternal(key, defaultValue);
-	}
+	private GlobalConfiguration() {}
 
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Returns the value associated with the given key as a string.
 	 * 
 	 * @param key
-	 *        key the key pointing to the associated value
-	 * @param defaultValue
-	 *        defaultValue the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	private String getStringInternal(final String key, final String defaultValue) {
-
-		synchronized (this.confData) {
-
-			if (!this.confData.containsKey(key)) {
-				return defaultValue;
-			}
-
-			return this.confData.get(key);
-		}
-	}
-
-	/**
-	 * Returns the value associated with the given key as a long integer.
-	 * 
-	 * @param key
 	 *        the key pointing to the associated value
 	 * @param defaultValue
 	 *        the default value which is returned in case there is no value associated with the given key
 	 * @return the (default) value associated with the given key
 	 */
-	public static long getLong(final String key, final long defaultValue) {
-
-		return get().getLongInternal(key, defaultValue);
+	public static String getString(String key, String defaultValue) {
+		return get().config.getString(key, defaultValue);
 	}
 
 	/**
@@ -149,39 +97,8 @@ public final class GlobalConfiguration {
 	 *        the default value which is returned in case there is no value associated with the given key
 	 * @return the (default) value associated with the given key
 	 */
-	private long getLongInternal(final String key, final long defaultValue) {
-
-		long retVal = defaultValue;
-
-		try {
-			synchronized (this.confData) {
-
-				if (this.confData.containsKey(key)) {
-					retVal = Long.parseLong(this.confData.get(key));
-				}
-			}
-		} catch (NumberFormatException e) {
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(StringUtils.stringifyException(e));
-			}
-		}
-
-		return retVal;
-	}
-
-	/**
-	 * Returns the value associated with the given key as an integer.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	public static int getInteger(final String key, final int defaultValue) {
-
-		return get().getIntegerInternal(key, defaultValue);
+	public static long getLong(String key, long defaultValue) {
+		return get().config.getLong(key, defaultValue);
 	}
 
 	/**
@@ -193,25 +110,8 @@ public final class GlobalConfiguration {
 	 *        the default value which is returned in case there is no value associated with the given key
 	 * @return the (default) value associated with the given key
 	 */
-	private int getIntegerInternal(final String key, final int defaultValue) {
-
-		int retVal = defaultValue;
-
-		try {
-			synchronized (this.confData) {
-
-				if (this.confData.containsKey(key)) {
-					retVal = Integer.parseInt(this.confData.get(key));
-				}
-			}
-		} catch (NumberFormatException e) {
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(StringUtils.stringifyException(e));
-			}
-		}
-
-		return retVal;
+	public static int getInteger(String key, int defaultValue) {
+		return get().config.getInteger(key, defaultValue);
 	}
 	
 	/**
@@ -224,38 +124,7 @@ public final class GlobalConfiguration {
 	 * @return the (default) value associated with the given key
 	 */
 	public static float getFloat(String key, float defaultValue) {
-
-		return get().getFloatInternal(key, defaultValue);
-	}
-
-	/**
-	 * Returns the value associated with the given key as an integer.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	private float getFloatInternal(String key, float defaultValue) {
-
-		float retVal = defaultValue;
-
-		try {
-			synchronized (this.confData) {
-
-				if (this.confData.containsKey(key)) {
-					retVal = Float.parseFloat(this.confData.get(key));
-				}
-			}
-		} catch (NumberFormatException e) {
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug(StringUtils.stringifyException(e));
-			}
-		}
-
-		return retVal;
+		return get().config.getFloat(key, defaultValue);
 	}
 
 	/**
@@ -267,33 +136,8 @@ public final class GlobalConfiguration {
 	 *        the default value which is returned in case there is no value associated with the given key
 	 * @return the (default) value associated with the given key
 	 */
-	public static boolean getBoolean(final String key, final boolean defaultValue) {
-
-		return get().getBooleanInternal(key, defaultValue);
-	}
-
-	/**
-	 * Returns the value associated with the given key as a boolean.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	private boolean getBooleanInternal(final String key, final boolean defaultValue) {
-
-		boolean retVal = defaultValue;
-
-		synchronized (this.confData) {
-
-			final String value = this.confData.get(key);
-			if (value != null) {
-				retVal = Boolean.parseBoolean(value);
-			}
-		}
-
-		return retVal;
+	public static boolean getBoolean(String key, boolean defaultValue) {
+		return get().config.getBoolean(key, defaultValue);
 	}
 
 	/**
@@ -319,7 +163,7 @@ public final class GlobalConfiguration {
 			return;
 		}
 		
-		if(confDirFile.isFile()) {
+		if (confDirFile.isFile()) {
 			final File file = new File(configDir);
 			if(configDir.endsWith(".xml")) {
 				get().loadXMLResource( file );
@@ -329,7 +173,6 @@ public final class GlobalConfiguration {
 				LOG.warn("The given configuration has an unknown extension.");
 				return;
 			}
-			configuration.confData.put(CONFIGDIRKEY, file.getAbsolutePath() );
 			return;
 		}
 
@@ -352,11 +195,6 @@ public final class GlobalConfiguration {
 		for (File f : yamlFiles) {
 			get().loadYAMLResource(f);
 		}
-
-		// Store the path to the configuration directory itself
-		if (configuration != null) {
-			configuration.confData.put(CONFIGDIRKEY, configDir);
-		}
 	}
 
 	/**
@@ -379,52 +217,57 @@ public final class GlobalConfiguration {
 	 * @param file the YAML file to read from
 	 * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
 	 */
-	private void loadYAMLResource(final File file) {
-
-		BufferedReader reader = null;
-		try {
-			reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+	private void loadYAMLResource(File file) {
 
-			String line = null;
-			while ((line = reader.readLine()) != null) {
-
-				// 1. check for comments
-				String[] comments = line.split("#", 2);
-				String conf = comments[0];
-
-				// 2. get key and value
-				if (conf.length() > 0) {
-					String[] kv = conf.split(": ", 2);
-
-					// skip line with no valid key-value pair
-					if (kv.length == 1) {
-						LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
-						continue;
-					}
+		synchronized (getClass()) {
 
-					String key = kv[0].trim();
-					String value = kv[1].trim();
-					
-					// sanity check
-					if (key.length() == 0 || value.length() == 0) {
-						LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
-						continue;
+			BufferedReader reader = null;
+			try {
+				reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
+	
+				String line = null;
+				while ((line = reader.readLine()) != null) {
+	
+					// 1. check for comments
+					String[] comments = line.split("#", 2);
+					String conf = comments[0];
+	
+					// 2. get key and value
+					if (conf.length() > 0) {
+						String[] kv = conf.split(": ", 2);
+	
+						// skip line with no valid key-value pair
+						if (kv.length == 1) {
+							LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
+							continue;
+						}
+	
+						String key = kv[0].trim();
+						String value = kv[1].trim();
+						
+						// sanity check
+						if (key.length() == 0 || value.length() == 0) {
+							LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
+							continue;
+						}
+	
+						LOG.debug("Loading configuration property: {}, {}", key, value);
+	
+						this.config.setString(key, value);
 					}
-
-					LOG.debug("Loading configuration property: {}, {}", key, value);
-
-					this.confData.put(key, value);
 				}
 			}
-		} catch (IOException e) {
-			e.printStackTrace();
-		} finally {
-			try {
-				if(reader != null) {
-					reader.close();
+			catch (IOException e) {
+				LOG.error("Error parsing YAML configuration.", e);
+			}
+			finally {
+				try {
+					if(reader != null) {
+						reader.close();
+					}
+				} catch (IOException e) {
+					LOG.warn("Cannot to close reader with IOException.", e);
 				}
-			} catch (IOException e) {
-				LOG.warn("Cannot to close reader with IOException.", e);
 			}
 		}
 	}
@@ -435,7 +278,7 @@ public final class GlobalConfiguration {
 	 * @param file
 	 *        the XML document file
 	 */
-	private void loadXMLResource(final File file) {
+	private void loadXMLResource(File file) {
 
 		final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
 		// Ignore comments in the XML file
@@ -468,7 +311,7 @@ public final class GlobalConfiguration {
 			final NodeList props = root.getChildNodes();
 			int propNumber = -1;
 
-			synchronized (this.confData) {
+			synchronized (getClass()) {
 
 				for (int i = 0; i < props.getLength(); i++) {
 
@@ -528,85 +371,28 @@ public final class GlobalConfiguration {
 					if (key != null && value != null) {
 						// Put key, value pair into the map
 						LOG.debug("Loading configuration property: {}, {}", key, value);
-						this.confData.put(key, value);
+						this.config.setString(key, value);
 					} else {
 						LOG.warn("Error while reading configuration: Cannot read property " + propNumber);
 					}
 				}
 			}
 
-		} catch (ParserConfigurationException e) {
-			LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e));
-		} catch (IOException e) {
-			LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e));
-		} catch (SAXException e) {
-			LOG.warn("Cannot load configuration: " + StringUtils.stringifyException(e));
+		}
+		catch (Exception e) {
+			LOG.error("Cannot load configuration.", e);
 		}
 	}
 
 	/**
-	 * Copies the key/value pairs stored in the global configuration to
-	 * a {@link Configuration} object and returns it.
+	 * Gets a {@link Configuration} object with the values of this GlobalConfiguration
 	 * 
 	 * @return the {@link Configuration} object including the key/value pairs
 	 */
 	public static Configuration getConfiguration() {
-
-		return get().getConfigurationInternal(null);
-	}
-
-	/**
-	 * Copies a subset of the key/value pairs stored in the global configuration to
-	 * a {@link Configuration} object and returns it. The subset is defined by the
-	 * given array of keys. If <code>keys</code> is <code>null</code>, the entire
-	 * global configuration is copied.
-	 * 
-	 * @param keys
-	 *        array of keys specifying the subset of pairs to copy.
-	 * @return the {@link Configuration} object including the key/value pairs
-	 */
-	public static Configuration getConfiguration(final String[] keys) {
-
-		return get().getConfigurationInternal(keys);
-	}
-
-	/**
-	 * Internal non-static method to return configuration.
-	 * 
-	 * @param keys
-	 *        array of keys specifying the subset of pairs to copy.
-	 * @return the {@link Configuration} object including the key/value pairs
-	 */
-	private Configuration getConfigurationInternal(final String[] keys) {
-
-		Configuration conf = new Configuration();
-
-		synchronized (this.confData) {
-
-			final Iterator<String> it = this.confData.keySet().iterator();
-
-			while (it.hasNext()) {
-
-				final String key = it.next();
-				boolean found = false;
-				if (keys != null) {
-					for (int i = 0; i < keys.length; i++) {
-						if (key.equals(keys[i])) {
-							found = true;
-							break;
-						}
-					}
-
-					if (found) {
-						conf.setString(key, this.confData.get(key));
-					}
-				} else {
-					conf.setString(key, this.confData.get(key));
-				}
-			}
-		}
-
-		return conf;
+		Configuration copy = new Configuration();
+		copy.addAll(get().config);
+		return copy;
 	}
 
 	/**
@@ -618,8 +404,7 @@ public final class GlobalConfiguration {
 	 * @param conf
 	 *        the {@link Configuration} object to merge into the global configuration
 	 */
-	public static void includeConfiguration(final Configuration conf) {
-
+	public static void includeConfiguration(Configuration conf) {
 		get().includeConfigurationInternal(conf);
 	}
 
@@ -629,25 +414,15 @@ public final class GlobalConfiguration {
 	 * @param conf
 	 *        the {@link Configuration} object to merge into the global configuration
 	 */
-	private void includeConfigurationInternal(final Configuration conf) {
-
-		if (conf == null) {
-			LOG.error("Given configuration object is null, ignoring it...");
-			return;
-		}
-
-		synchronized (this.confData) {
-
-			final Iterator<String> it = conf.keySet().iterator();
-
-			while (it.hasNext()) {
-
-				final String key = it.next();
-				this.confData.put(key, conf.getString(key, ""));
-			}
+	private void includeConfigurationInternal(Configuration conf) {
+		// static synchronized
+		synchronized (getClass()) {
+			this.config.addAll(conf);
 		}
 	}
 	
+	// --------------------------------------------------------------------------------------------
+	
 	/**
 	 * Filters files in directory which have the specified suffix (e.g. ".xml").
 	 * 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
index c30e488..e131892 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java
@@ -16,15 +16,13 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.configuration;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
-import java.io.IOException;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
@@ -34,31 +32,146 @@ import org.junit.Test;
  * objects is tested.
  */
 public class ConfigurationTest {
+	
+	private static final byte[] EMPTY_BYTES = new byte[0];
+	private static final long TOO_LONG = Integer.MAX_VALUE + 10L;
+	private static final double TOO_LONG_DOUBLE = Double.MAX_VALUE;
+	
 
 	/**
 	 * This test checks the serialization/deserialization of configuration objects.
 	 */
 	@Test
-	public void testConfigurationSerialization() {
-
-		// First, create initial configuration object with some parameters
-		final Configuration orig = new Configuration();
-		orig.setString("mykey", "myvalue");
-		orig.setBoolean("shouldbetrue", true);
-		orig.setInteger("mynumber", 100);
-		orig.setClass("myclass", this.getClass());
-
+	public void testConfigurationSerializationAndGetters() {
 		try {
+			final Configuration orig = new Configuration();
+			orig.setString("mykey", "myvalue");
+			orig.setInteger("mynumber", 100);
+			orig.setLong("longvalue", 478236947162389746L);
+			orig.setFloat("PI", 3.1415926f);
+			orig.setDouble("E", Math.E);
+			orig.setBoolean("shouldbetrue", true);
+			orig.setBytes("bytes sequence", new byte[] { 1, 2, 3, 4, 5 } );
+			orig.setClass("myclass", this.getClass());
+	
 			final Configuration copy = (Configuration) CommonTestUtils.createCopy(orig);
+			assertEquals("myvalue", copy.getString("mykey", "null"));
+			assertEquals(100, copy.getInteger("mynumber", 0));
+			assertEquals(478236947162389746L, copy.getLong("longvalue", 0L));
+			assertEquals(3.1415926f, copy.getFloat("PI", 3.1415926f), 0.0);
+			assertEquals(Math.E, copy.getDouble("E", 0.0), 0.0);
+			assertEquals(true, copy.getBoolean("shouldbetrue", false));
+			assertArrayEquals(new byte[] { 1, 2, 3, 4, 5 }, copy.getBytes("bytes sequence", null));
+			assertEquals(getClass(), copy.getClass("myclass", null, getClass().getClassLoader()));
+			
+			assertEquals(orig, copy);
+			assertEquals(orig.keySet(), copy.keySet());
+			assertEquals(orig.hashCode(), copy.hashCode());
 
-			assertEquals(copy.getString("mykey", "null"), "myvalue");
-			assertEquals(copy.getBoolean("shouldbetrue", false), true);
-			assertEquals(copy.getInteger("mynumber", 0), 100);
-			assertEquals(copy.getClass("myclass", null).toString(), this.getClass().toString());
-			assertTrue(orig.equals(copy));
-			assertTrue(orig.keySet().equals(copy.keySet()));
-
-		} catch (IOException e) {
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testConversions() {
+		try {
+			Configuration pc = new Configuration();
+			
+			pc.setInteger("int", 5);
+			pc.setLong("long", 15);
+			pc.setLong("too_long", TOO_LONG);
+			pc.setFloat("float", 2.1456775f);
+			pc.setDouble("double", Math.PI);
+			pc.setDouble("too_long_double", TOO_LONG_DOUBLE);
+			pc.setString("string", "42");
+			pc.setString("non_convertible_string", "bcdefg&&");
+			pc.setBoolean("boolean", true);
+			
+			// as integer
+			assertEquals(5, pc.getInteger("int", 0));
+			assertEquals(5L, pc.getLong("int", 0));
+			assertEquals(5f, pc.getFloat("int", 0), 0.0);
+			assertEquals(5.0, pc.getDouble("int", 0), 0.0);
+			assertEquals(false, pc.getBoolean("int", true));
+			assertEquals("5", pc.getString("int", "0"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("int", EMPTY_BYTES));
+			
+			// as long
+			assertEquals(15, pc.getInteger("long", 0));
+			assertEquals(15L, pc.getLong("long", 0));
+			assertEquals(15f, pc.getFloat("long", 0), 0.0);
+			assertEquals(15.0, pc.getDouble("long", 0), 0.0);
+			assertEquals(false, pc.getBoolean("long", true));
+			assertEquals("15", pc.getString("long", "0"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("long", EMPTY_BYTES));
+			
+			// as too long
+			assertEquals(0, pc.getInteger("too_long", 0));
+			assertEquals(TOO_LONG, pc.getLong("too_long", 0));
+			assertEquals((float) TOO_LONG, pc.getFloat("too_long", 0), 10.0);
+			assertEquals((double) TOO_LONG, pc.getDouble("too_long", 0), 10.0);
+			assertEquals(false, pc.getBoolean("too_long", true));
+			assertEquals(String.valueOf(TOO_LONG), pc.getString("too_long", "0"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("too_long", EMPTY_BYTES));
+			
+			// as float
+			assertEquals(0, pc.getInteger("float", 0));
+			assertEquals(0L, pc.getLong("float", 0));
+			assertEquals(2.1456775f, pc.getFloat("float", 0), 0.0);
+			assertEquals(2.1456775, pc.getDouble("float", 0), 0.0000001);
+			assertEquals(false, pc.getBoolean("float", true));
+			assertTrue(pc.getString("float", "0").startsWith("2.145677"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("float", EMPTY_BYTES));
+			
+			// as double
+			assertEquals(0, pc.getInteger("double", 0));
+			assertEquals(0L, pc.getLong("double", 0));
+			assertEquals(3.141592f, pc.getFloat("double", 0), 0.000001);
+			assertEquals(Math.PI, pc.getDouble("double", 0), 0.0);
+			assertEquals(false, pc.getBoolean("double", true));
+			assertTrue(pc.getString("double", "0").startsWith("3.1415926535"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("double", EMPTY_BYTES));
+			
+			// as too long double
+			assertEquals(0, pc.getInteger("too_long_double", 0));
+			assertEquals(0L, pc.getLong("too_long_double", 0));
+			assertEquals(0f, pc.getFloat("too_long_double", 0f), 0.000001);
+			assertEquals(TOO_LONG_DOUBLE, pc.getDouble("too_long_double", 0), 0.0);
+			assertEquals(false, pc.getBoolean("too_long_double", true));
+			assertEquals(String.valueOf(TOO_LONG_DOUBLE), pc.getString("too_long_double", "0"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("too_long_double", EMPTY_BYTES));
+			
+			// as string
+			assertEquals(42, pc.getInteger("string", 0));
+			assertEquals(42L, pc.getLong("string", 0));
+			assertEquals(42f, pc.getFloat("string", 0f), 0.000001);
+			assertEquals(42.0, pc.getDouble("string", 0), 0.0);
+			assertEquals(false, pc.getBoolean("string", true));
+			assertEquals("42", pc.getString("string", "0"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("string", EMPTY_BYTES));
+			
+			// as non convertible string
+			assertEquals(0, pc.getInteger("non_convertible_string", 0));
+			assertEquals(0L, pc.getLong("non_convertible_string", 0));
+			assertEquals(0f, pc.getFloat("non_convertible_string", 0f), 0.000001);
+			assertEquals(0.0, pc.getDouble("non_convertible_string", 0), 0.0);
+			assertEquals(false, pc.getBoolean("non_convertible_string", true));
+			assertEquals("bcdefg&&", pc.getString("non_convertible_string", "0"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("non_convertible_string", EMPTY_BYTES));
+			
+			// as boolean
+			assertEquals(0, pc.getInteger("boolean", 0));
+			assertEquals(0L, pc.getLong("boolean", 0));
+			assertEquals(0f, pc.getFloat("boolean", 0f), 0.000001);
+			assertEquals(0.0, pc.getDouble("boolean", 0), 0.0);
+			assertEquals(true, pc.getBoolean("boolean", false));
+			assertEquals("true", pc.getString("boolean", "0"));
+			assertArrayEquals(EMPTY_BYTES, pc.getBytes("boolean", EMPTY_BYTES));
+		}
+		catch (Exception e) {
+			e.printStackTrace();
 			fail(e.getMessage());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index 18702c7..f207f78 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.configuration;
 
 import static org.junit.Assert.assertEquals;
@@ -40,7 +39,7 @@ public class GlobalConfigurationTest {
 	public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
 			IllegalAccessException {
 		// reset GlobalConfiguration between tests
-		Field instance = GlobalConfiguration.class.getDeclaredField("configuration");
+		Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
 		instance.setAccessible(true);
 		instance.set(null, null);
 	}
@@ -73,8 +72,8 @@ public class GlobalConfigurationTest {
 			GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
 			Configuration conf = GlobalConfiguration.getConfiguration();
 			
-			// all distinct keys from confFile1 + confFile2 + 'config.dir' key
-			assertEquals(3 + 1, conf.keySet().size());
+			// all distinct keys from confFile1 + confFile2key
+			assertEquals(3, conf.keySet().size());
 			
 			// keys 1, 2, 3 should be OK and match the expected values
 			// => configuration keys from YAML should overwrite keys from XML
@@ -126,8 +125,8 @@ public class GlobalConfigurationTest {
 			GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
 			Configuration conf = GlobalConfiguration.getConfiguration();
 
-			// all distinct keys from confFile1 + confFile2 + 'config.dir' key
-			assertEquals(6 + 1, conf.keySet().size());
+			// all distinct keys from confFile1 + confFile2 key
+			assertEquals(6, conf.keySet().size());
 
 			// keys 1, 2, 4, 5, 6, 7, 8 should be OK and match the expected values
 			assertEquals("myvalue1", conf.getString("mykey1", null));
@@ -202,14 +201,6 @@ public class GlobalConfigurationTest {
 			newconf.setInteger("mynewinteger", 1000);
 			GlobalConfiguration.includeConfiguration(newconf);
 			assertEquals(GlobalConfiguration.getInteger("mynewinteger", 0), 1000);
-
-			// Test local "sub" configuration
-			final String[] configparams = { "mykey1", "mykey2" };
-			Configuration newconf2 = GlobalConfiguration.getConfiguration(configparams);
-
-			assertEquals(newconf2.keySet().size(), 2);
-			assertEquals(newconf2.getString("mykey1", "null"), "myvalue1");
-			assertEquals(newconf2.getString("mykey2", "null"), "myvalue2");
 		} finally {
 			// Remove temporary files
 			confFile1.delete();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-core/src/test/resources/logback-test.xml b/flink-core/src/test/resources/logback-test.xml
index 4f484cb..1c4ea08 100644
--- a/flink-core/src/test/resources/logback-test.xml
+++ b/flink-core/src/test/resources/logback-test.xml
@@ -30,4 +30,5 @@
     <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/>
     <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/>
     <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/>
+    <logger name="org.apache.flink.configuration.Configuration" level="OFF"/>
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
index 3f42e87..391ef7f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/translation/WrappingFunction.java
@@ -146,6 +146,11 @@ public abstract class WrappingFunction<T extends Function> extends AbstractRichF
 		public DistributedCache getDistributedCache() {
 			return context.getDistributedCache();
 		}
+		
+		@Override
+		public ClassLoader getUserCodeClassLoader() {
+			return context.getUserCodeClassLoader();
+		}
 	}
 	
 	private static class WrappingIterationRuntimeContext extends WrappingRuntimeContext implements IterationRuntimeContext {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
index 8561133..c7f86af 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
@@ -192,18 +192,25 @@ public class CsvInputFormat extends GenericCsvInputFormat<Record> {
 			Class<? extends Value>[] types = (Class<? extends Value>[]) new Class[maxTextPos+1];
 			int[] targetPos = new int[maxTextPos+1];
 			
+			ClassLoader cl = Thread.currentThread().getContextClassLoader();
+			
 			// set the fields
-			for (int i = 0; i < numConfigFields; i++) {
-				int pos = textPosIdx[i];
-				
-				Class<? extends Value> clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null).asSubclass(Value.class);
-				if (clazz == null) {
-					throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " +
-						"No field parser class for parameter " + i);
+			try {
+				for (int i = 0; i < numConfigFields; i++) {
+					int pos = textPosIdx[i];
+					
+					Class<? extends Value> clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl).asSubclass(Value.class);
+					if (clazz == null) {
+						throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " +
+							"No field parser class for parameter " + i);
+					}
+					
+					types[pos] = clazz;
+					targetPos[pos] = i;
 				}
-				
-				types[pos] = clazz;
-				targetPos[pos] = i;
+			}
+			catch (ClassNotFoundException e) {
+				throw new RuntimeException("Could not resolve type classes", e);
 			}
 			
 			// update the field types

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
index 8760cd8..2c514fe 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
@@ -206,14 +206,20 @@ public class CsvOutputFormat extends FileOutputFormat {
 		Class<Value>[] arr = new Class[this.numFields];
 		this.classes = arr;
 
-		for (int i = 0; i < this.numFields; i++) {
-			@SuppressWarnings("unchecked")
-			Class<? extends Value> clazz = (Class<? extends Value>) parameters.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null);
-			if (clazz == null) {
-				throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i);
+		try {
+			ClassLoader cl = Thread.currentThread().getContextClassLoader();
+			
+			for (int i = 0; i < this.numFields; i++) {
+				Class<? extends Value> clazz =  parameters.<Value>getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl);
+				if (clazz == null) {
+					throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i);
+				}
+	
+				this.classes[i] = clazz;
 			}
-
-			this.classes[i] = clazz;
+		}
+		catch (ClassNotFoundException e) {
+			throw new RuntimeException("Could not resolve type classes", e);
 		}
 
 		this.recordPositions = new int[this.numFields];

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 6039654..b5cfb8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -158,7 +158,8 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 	@Override
 	public RuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
-		return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup());
+		return new IterativeRuntimeUdfContext(taskName, env.getCurrentNumberOfSubtasks(),
+				env.getIndexInSubtaskGroup(), userCodeClassLoader);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -347,8 +348,8 @@ public abstract class AbstractIterativePactTask<S extends Function, OT> extends
 
 	private class IterativeRuntimeUdfContext extends RuntimeUDFContext implements IterationRuntimeContext {
 
-		public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex) {
-			super(name, numParallelSubtasks, subtaskIndex);
+		public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
+			super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index cca4559..0c04538 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -296,7 +296,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			}
 
 			// instantiate all aggregators and register them at the iteration global registry
-			aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators());
+			aggregatorRegistry = new RuntimeAggregatorRegistry(config.getIterationAggregators(userCodeClassLoader));
 			IterationAggregatorBroker.instance().handIn(brokerKey, aggregatorRegistry);
 
 			DataInputView superstepResult = null;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 3ac0858..f75ba61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -86,13 +86,13 @@ public class IterationSynchronizationSinkTask extends AbstractInvokable implemen
 		
 		// store all aggregators
 		this.aggregators = new HashMap<String, Aggregator<?>>();
-		for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators()) {
+		for (AggregatorWithName<?> aggWithName : taskConfig.getIterationAggregators(userCodeClassLoader)) {
 			aggregators.put(aggWithName.getName(), aggWithName.getAggregator());
 		}
 		
 		// store the aggregator convergence criterion
 		if (taskConfig.usesConvergenceCriterion()) {
-			convergenceCriterion = taskConfig.getConvergenceCriterion();
+			convergenceCriterion = taskConfig.getConvergenceCriterion(userCodeClassLoader);
 			convergenceAggregatorName = taskConfig.getConvergenceCriterionAggregatorName();
 			Preconditions.checkNotNull(convergenceAggregatorName);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 1791857..2d7afc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -275,7 +275,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		}
 		// obtain task configuration (including stub parameters)
 		Configuration taskConf = getTaskConfiguration();
-		taskConf.setClassLoader(this.userCodeClassLoader);
 		this.config = new TaskConfig(taskConf);
 
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 5d5409e..af3eff3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -310,7 +310,6 @@ l	 *
 
 		// obtain task configuration (including stub parameters)
 		Configuration taskConf = getTaskConfiguration();
-		taskConf.setClassLoader(this.userCodeClassLoader);
 		this.config = new TaskConfig(taskConf);
 
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 72d0dc0..b7fa872 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -243,7 +243,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 		// obtain task configuration (including stub parameters)
 		Configuration taskConf = getTaskConfiguration();
-		taskConf.setClassLoader(this.userCodeClassLoader);
 		this.config = new TaskConfig(taskConf);
 
 		// now get the operator class which drives the operation
@@ -1066,7 +1065,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable i
 
 	public RuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
-		return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask());
+		return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask());
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
index 3ebf45c..482103c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedCollectorMapDriver.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.operators.chaining;
 
 import org.apache.flink.api.common.functions.RichFunction;
@@ -25,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.operators.RegularPactTask;
 
+@SuppressWarnings("deprecation")
 public class ChainedCollectorMapDriver<IT, OT> extends ChainedDriver<IT, OT> {
 
 	private GenericCollectorMap<IT, OT> mapper;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 4c0ac2d..679062a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -57,7 +57,8 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 			this.udfContext = ((RegularPactTask<?, ?>) parent).createRuntimeContext(taskName);
 		} else {
 			Environment env = parent.getEnvironment();
-			this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask());
+			this.udfContext = new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), 
+					env.getIndexInSubtaskGroup(), userCodeClassLoader, env.getCopyTask());
 		}
 
 		setup(parent);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
index a667844..540cc89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/udf/RuntimeUDFContext.java
@@ -44,22 +44,26 @@ public class RuntimeUDFContext implements RuntimeContext {
 
 	private final int subtaskIndex;
 
-	private DistributedCache distributedCache = new DistributedCache();
+	private final ClassLoader userCodeClassLoader;
+	
+	private final DistributedCache distributedCache = new DistributedCache();
 
-	private HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
+	private final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
 
-	private HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
+	private final HashMap<String, List<?>> broadcastVars = new HashMap<String, List<?>>();
 
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex) {
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader) {
 		this.name = name;
 		this.numParallelSubtasks = numParallelSubtasks;
 		this.subtaskIndex = subtaskIndex;
+		this.userCodeClassLoader = userCodeClassLoader;
 	}
 
-	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, Map<String, FutureTask<Path>> cpTasks) {
+	public RuntimeUDFContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader, Map<String, FutureTask<Path>> cpTasks) {
 		this.name = name;
 		this.numParallelSubtasks = numParallelSubtasks;
 		this.subtaskIndex = subtaskIndex;
+		this.userCodeClassLoader = userCodeClassLoader;
 		this.distributedCache.setCopyTasks(cpTasks);
 	}
 	@Override
@@ -158,4 +162,9 @@ public class RuntimeUDFContext implements RuntimeContext {
 	public DistributedCache getDistributedCache() {
 		return this.distributedCache;
 	}
+	
+	@Override
+	public ClassLoader getUserCodeClassLoader() {
+		return this.userCodeClassLoader;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/02314adc/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
index 003b872..9facb33 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java
@@ -916,7 +916,7 @@ public class TaskConfig {
 	}
 	
 	@SuppressWarnings("unchecked")
-	public Collection<AggregatorWithName<?>> getIterationAggregators() {
+	public Collection<AggregatorWithName<?>> getIterationAggregators(ClassLoader cl) {
 		final int numAggs = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0);
 		if (numAggs == 0) {
 			return Collections.emptyList();
@@ -927,7 +927,7 @@ public class TaskConfig {
 			Aggregator<Value> aggObj;
 			try {
 				aggObj = (Aggregator<Value>) InstantiationUtil.readObjectFromConfig(
-						this.config, ITERATION_AGGREGATOR_PREFIX + i, getConfiguration().getClassLoader());
+						this.config, ITERATION_AGGREGATOR_PREFIX + i, cl);
 			} catch (IOException e) {
 					throw new RuntimeException("Error while reading the aggregator object from the task configuration.");
 			} catch (ClassNotFoundException e) {
@@ -956,11 +956,11 @@ public class TaskConfig {
 	}
 
 	@SuppressWarnings("unchecked")
-	public <T extends Value> ConvergenceCriterion<T> getConvergenceCriterion() {
+	public <T extends Value> ConvergenceCriterion<T> getConvergenceCriterion(ClassLoader cl) {
 		ConvergenceCriterion<T> convCriterionObj = null;
 		try {
 			convCriterionObj = (ConvergenceCriterion<T>) InstantiationUtil.readObjectFromConfig(
-			this.config, ITERATION_CONVERGENCE_CRITERION, getConfiguration().getClassLoader());
+			this.config, ITERATION_CONVERGENCE_CRITERION, cl);
 		} catch (IOException e) {
 			throw new RuntimeException("Error while reading the covergence criterion object from the task configuration.");
 		} catch (ClassNotFoundException e) {
@@ -974,7 +974,7 @@ public class TaskConfig {
 	}
 
 	public boolean usesConvergenceCriterion() {
-		return config.getString(ITERATION_CONVERGENCE_CRITERION, null) != null;
+		return config.getBytes(ITERATION_CONVERGENCE_CRITERION, null) != null;
 	}
 	
 	public String getConvergenceCriterionAggregatorName() {
@@ -1174,18 +1174,8 @@ public class TaskConfig {
 		}
 
 		@Override
-		public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, Class<? super T> ancestor) {
-			return this.backingConfig.getClass(this.prefix + key, defaultValue, ancestor);
-		}
-
-		@Override
-		public ClassLoader getClassLoader() {
-			return this.backingConfig.getClassLoader();
-		}
-
-		@Override
-		public Class<?> getClass(String key, Class<?> defaultValue) {
-			return this.backingConfig.getClass(this.prefix + key, defaultValue);
+		public <T> Class<T> getClass(String key, Class<? extends T> defaultValue, ClassLoader classLoader) throws ClassNotFoundException {
+			return this.backingConfig.getClass(this.prefix + key, defaultValue, classLoader);
 		}
 
 		@Override
@@ -1269,11 +1259,6 @@ public class TaskConfig {
 		}
 		
 		@Override
-		public void setClassLoader(ClassLoader classLoader) {
-			backingConfig.setClassLoader(classLoader);
-		}
-		
-		@Override
 		public Set<String> keySet() {
 			final HashSet<String> set = new HashSet<String>();
 			final int prefixLen = this.prefix == null ? 0 : this.prefix.length();


Mime
View raw message