flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/2] flink git commit: [FLINK-3904] enhancements to GlobalConfiguration
Date Wed, 27 Jul 2016 14:42:16 GMT
[FLINK-3904] enhancements to GlobalConfiguration

- fail if config couldn't be loaded
- remove duplicate api methods
- remove undocumented XML loading feature
- generate yaml conf in tests instead of xml conf
- only load one config file instead of all xml or yaml files (flink-conf.yaml)
- make globalconfiguration non-global and remove static SINGLETON
- fix test cases
- add test cases

This closes #2123


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5eb0e38f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5eb0e38f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5eb0e38f

Branch: refs/heads/master
Commit: 5eb0e38fbb346fea05932c3afc24a2d071e8974f
Parents: 12bf7c1
Author: Maximilian Michels <mxm@apache.org>
Authored: Wed Jul 27 17:06:06 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Wed Jul 27 16:40:09 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    |   4 +-
 .../CliFrontendAddressConfigurationTest.java    |   5 -
 .../flink/client/CliFrontendListCancelTest.java |   1 -
 .../apache/flink/client/CliFrontendRunTest.java |   1 -
 .../flink/client/CliFrontendStopTest.java       |   2 -
 .../flink/client/CliFrontendTestUtils.java      |  28 +-
 .../org/apache/flink/storm/api/FlinkClient.java |   8 +-
 .../apache/flink/storm/api/FlinkSubmitter.java  |   2 +-
 .../flink/api/common/io/BinaryOutputFormat.java |   2 +-
 .../api/common/io/DelimitedInputFormat.java     |  30 +-
 .../flink/api/common/io/FileInputFormat.java    |  25 +-
 .../flink/api/common/io/FileOutputFormat.java   |  34 +-
 .../api/common/io/GenericCsvInputFormat.java    |   8 +-
 .../apache/flink/api/common/io/InputFormat.java |   2 +-
 .../api/common/io/SerializedOutputFormat.java   |   2 +-
 .../configuration/GlobalConfiguration.java      | 395 +++----------------
 .../api/common/io/BinaryInputFormatTest.java    |   3 +-
 .../io/DelimitedInputFormatSamplingTest.java    |  44 ++-
 .../api/common/io/DelimitedInputFormatTest.java |   6 +-
 .../api/common/io/EnumerateNestedFilesTest.java |   4 +-
 .../api/common/io/FileOutputFormatTest.java     |  12 +-
 .../api/common/io/RichInputFormatTest.java      |   1 +
 .../api/common/io/RichOutputFormatTest.java     |   1 +
 .../FilesystemSchemeConfigTest.java             |  20 +-
 .../configuration/GlobalConfigurationTest.java  | 201 +++-------
 .../apache/flink/testutils/TestConfigUtils.java |  29 +-
 .../java/hadoop/mapred/utils/HadoopUtils.java   |  10 +-
 .../hadoop/mapreduce/utils/HadoopUtils.java     |  12 +-
 .../flink/api/java/io/PrimitiveInputFormat.java |   4 +-
 .../flink/api/java/io/TextInputFormat.java      |   2 +-
 .../flink/api/java/io/TextValueInputFormat.java |   2 +-
 .../flink/python/api/PythonPlanBinder.java      |  11 +-
 .../plantranslate/JobGraphGenerator.java        |  10 +-
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   3 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   1 -
 .../BlobLibraryCacheManagerTest.java            |   4 -
 .../org/apache/flink/api/scala/FlinkShell.scala |   4 +-
 .../flink/api/scala/ScalaShellITCase.scala      |   4 +-
 .../environment/StreamContextEnvironment.java   |   2 +-
 .../api/environment/StreamPlanEnvironment.java  |   2 +-
 ...CliFrontendYarnAddressConfigurationTest.java |   8 -
 .../flink/yarn/YARNHighAvailabilityITCase.java  |   2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |   2 +-
 .../yarn/AbstractYarnClusterDescriptor.java     |   7 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 46 files changed, 273 insertions(+), 700 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index a888841..7c2ee2e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -153,9 +153,7 @@ public class CliFrontend {
 
 		// load the configuration
 		LOG.info("Trying to load configuration file");
-		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-		System.setProperty(ConfigConstants.ENV_FLINK_CONF_DIR, configDirectory.getAbsolutePath());
-		this.config = GlobalConfiguration.getConfiguration();
+		this.config = GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
 
 		try {
 			FileSystem.setDefaultScheme(config);

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
index 41d8622..8320e04 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendAddressConfigurationTest.java
@@ -48,11 +48,6 @@ public class CliFrontendAddressConfigurationTest {
 		CliFrontendTestUtils.pipeSystemOutToNull();
 	}
 
-	@Before
-	public void clearConfig() {
-		CliFrontendTestUtils.clearGlobalConfiguration();
-	}
-
 	@Test
 	public void testValidConfig() {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 736d859..524e7e7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -55,7 +55,6 @@ public class CliFrontendListCancelTest {
 	@BeforeClass
 	public static void init() {
 		CliFrontendTestUtils.pipeSystemOutToNull();
-		CliFrontendTestUtils.clearGlobalConfiguration();
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index f710d8e..0326eab 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -36,7 +36,6 @@ public class CliFrontendRunTest {
 	@BeforeClass
 	public static void init() {
 		CliFrontendTestUtils.pipeSystemOutToNull();
-		CliFrontendTestUtils.clearGlobalConfiguration();
 	}
 	
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
index 7c34c75..9522ac7 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java
@@ -35,7 +35,6 @@ import org.junit.Test;
 import java.util.UUID;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.apache.flink.client.CliFrontendTestUtils.clearGlobalConfiguration;
 import static org.junit.Assert.*;
 
 public class CliFrontendStopTest extends TestLogger {
@@ -45,7 +44,6 @@ public class CliFrontendStopTest extends TestLogger {
 	@BeforeClass
 	public static void setup() {
 		pipeSystemOutToNull();
-		clearGlobalConfiguration();
 		actorSystem = ActorSystem.create("TestingActorSystem");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
index 1872133..c411a7b 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendTestUtils.java
@@ -71,33 +71,7 @@ public class CliFrontendTestUtils {
 		System.setOut(new PrintStream(new BlackholeOutputSteam()));
 		System.setErr(new PrintStream(new BlackholeOutputSteam()));
 	}
-	
-	public static void clearGlobalConfiguration() {
-		try {
-			Field singletonInstanceField = GlobalConfiguration.class.getDeclaredField("SINGLETON");
-			Field conf = GlobalConfiguration.class.getDeclaredField("config");
-			Field map = Configuration.class.getDeclaredField("confData");
-			
-			singletonInstanceField.setAccessible(true);
-			conf.setAccessible(true);
-			map.setAccessible(true);
-			
-			GlobalConfiguration gconf = (GlobalConfiguration) singletonInstanceField.get(null);
-			if (gconf != null) {
-				Configuration confObject = (Configuration) conf.get(gconf);
-				@SuppressWarnings("unchecked")
-				Map<String, Object> confData = (Map<String, Object>) map.get(confObject);
-				confData.clear();
-			}
-		}
-		catch (Exception e) {
-			System.err.println(e.getMessage());
-			e.printStackTrace();
-			fail("Test initialization caused an exception: " + e.getMessage());
-		}
-		
-	}
-	
+
 	private static final class BlackholeOutputSteam extends java.io.OutputStream {
 		@Override
 		public void write(int b){}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
index 6ad250d..9628bb7 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkClient.java
@@ -242,7 +242,7 @@ public class FlinkClient {
 			}
 		}
 
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		final Configuration configuration = GlobalConfiguration.loadConfiguration();
 		configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, this.jobManagerHost);
 		configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, this.jobManagerPort);
 
@@ -271,7 +271,7 @@ public class FlinkClient {
 	 * @return Flink's internally used {@link JobID}.
 	 */
 	JobID getTopologyJobId(final String id) {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		final Configuration configuration = GlobalConfiguration.loadConfiguration();
 		if (this.timeout != null) {
 			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
 		}
@@ -311,7 +311,7 @@ public class FlinkClient {
 	}
 
 	private FiniteDuration getTimeout() {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		final Configuration configuration = GlobalConfiguration.loadConfiguration();
 		if (this.timeout != null) {
 			configuration.setString(ConfigConstants.AKKA_ASK_TIMEOUT, this.timeout);
 		}
@@ -320,7 +320,7 @@ public class FlinkClient {
 	}
 
 	private ActorRef getJobManager() throws IOException {
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
+		final Configuration configuration = GlobalConfiguration.loadConfiguration();
 
 		ActorSystem actorSystem;
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index 13a39ef..f8932b1 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -87,7 +87,7 @@ public class FlinkSubmitter {
 			throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
 		}
 
-		final Configuration flinkConfig = GlobalConfiguration.getConfiguration();
+		final Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
 		if (!stormConf.containsKey(Config.NIMBUS_HOST)) {
 			stormConf.put(Config.NIMBUS_HOST,
 					flinkConfig.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, "localhost"));

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index a89e73e..059198c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -45,7 +45,7 @@ public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
 	
 	private transient DataOutputViewStreamWrapper outView;
 
-	
+
 	@Override
 	public void close() throws IOException {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 99aa022..fd02c82 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -20,13 +20,13 @@ package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
@@ -84,12 +84,18 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	 */
 	private static int MAX_SAMPLE_LEN;
 
-	static { loadGlobalConfigParams(); }
-	
+	/**
+	 * @Deprecated Please use {@code loadConfigParameters(Configuration config}
+	 */
+	@Deprecated
 	protected static void loadGlobalConfigParams() {
-		int maxSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
+		loadConfigParameters(GlobalConfiguration.loadConfiguration());
+	}
+
+	protected static void loadConfigParameters(Configuration parameters) {
+		int maxSamples = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY,
 				ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_LINE_SAMPLES);
-		int minSamples = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
+		int minSamples = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
 			ConfigConstants.DEFAULT_DELIMITED_FORMAT_MIN_LINE_SAMPLES);
 		
 		if (maxSamples < 0) {
@@ -113,7 +119,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 			DEFAULT_MIN_NUM_SAMPLES = minSamples;
 		}
 		
-		int maxLen = GlobalConfiguration.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY,
+		int maxLen = parameters.getInteger(ConfigConstants.DELIMITED_FORMAT_MAX_SAMPLE_LENGTH_KEY,
 				ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN);
 		if (maxLen <= 0) {
 			maxLen = ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN;
@@ -164,13 +170,17 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
 	// --------------------------------------------------------------------------------------------
 	//  Constructors & Getters/setters for the configurable parameters
 	// --------------------------------------------------------------------------------------------
-	
+
 	public DelimitedInputFormat() {
-		super();
+		this(null, null);
 	}
-	
-	protected DelimitedInputFormat(Path filePath) {
+
+	protected DelimitedInputFormat(Path filePath, Configuration configuration) {
 		super(filePath);
+		if (configuration == null) {
+			configuration = GlobalConfiguration.loadConfiguration();
+		}
+		loadConfigParameters(configuration);
 	}
 	
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 95a1ffa..72d6061 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -87,15 +87,19 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 	 * The splitLength is set to -1L for reading the whole split.
 	 */
 	protected static final long READ_WHOLE_SPLIT_FLAG = -1L;
-	
+
 	static {
-		initDefaultsFromConfiguration();
+		initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
 		initDefaultInflaterInputStreamFactories();
 	}
-	
-	private static void initDefaultsFromConfiguration() {
-		
-		final long to = GlobalConfiguration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
+
+	/**
+	 * Initialize defaults for input format. Needs to be a static method because it is configured for local
+	 * cluster execution, see LocalFlinkMiniCluster.
+	 * @param configuration The configuration to load defaults from
+	 */
+	private static void initDefaultsFromConfiguration(Configuration configuration) {
+		final long to = configuration.getLong(ConfigConstants.FS_STREAM_OPENING_TIMEOUT_KEY,
 			ConfigConstants.DEFAULT_FS_STREAM_OPENING_TIMEOUT);
 		if (to < 0) {
 			LOG.error("Invalid timeout value for filesystem stream opening: " + to + ". Using default value of " +
@@ -154,10 +158,6 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 		}
 	}
 	
-	static long getDefaultOpeningTimeout() {
-		return DEFAULT_OPENING_TIMEOUT;
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	//  Variables for internal operation.
 	//  They are all transient, because we do not want them so be serialized 
@@ -224,11 +224,8 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
 	// --------------------------------------------------------------------------------------------	
 
 	public FileInputFormat() {}
-	
+
 	protected FileInputFormat(Path filePath) {
-		if (filePath == null) {
-			throw new IllegalArgumentException("The file path must not be null.");
-		}
 		this.filePath = filePath;
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
index 557c342..7530ba1 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileOutputFormat.java
@@ -22,11 +22,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
@@ -62,26 +62,30 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
 	private static WriteMode DEFAULT_WRITE_MODE;
 	
 	private static OutputDirectoryMode DEFAULT_OUTPUT_DIRECTORY_MODE;
-	
-	
-	private static final void initDefaultsFromConfiguration(Configuration configuration) {
-		final boolean overwrite = configuration.getBoolean(ConfigConstants
-						.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
+
+	static {
+		initDefaultsFromConfiguration(GlobalConfiguration.loadConfiguration());
+	}
+
+	/**
+	 * Initialize defaults for output format. Needs to be a static method because it is configured for local
+	 * cluster execution, see LocalFlinkMiniCluster.
+	 * @param configuration The configuration to load defaults from
+	 */
+	private static void initDefaultsFromConfiguration(Configuration configuration) {
+		final boolean overwrite = configuration.getBoolean(
+				ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY,
 				ConfigConstants.DEFAULT_FILESYSTEM_OVERWRITE);
 	
 		DEFAULT_WRITE_MODE = overwrite ? WriteMode.OVERWRITE : WriteMode.NO_OVERWRITE;
 		
-		final boolean alwaysCreateDirectory = configuration.getBoolean(ConfigConstants
-						.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
+		final boolean alwaysCreateDirectory = configuration.getBoolean(
+			ConfigConstants.FILESYSTEM_OUTPUT_ALWAYS_CREATE_DIRECTORY_KEY,
 			ConfigConstants.DEFAULT_FILESYSTEM_ALWAYS_CREATE_DIRECTORY);
 	
 		DEFAULT_OUTPUT_DIRECTORY_MODE = alwaysCreateDirectory ? OutputDirectoryMode.ALWAYS : OutputDirectoryMode.PARONLY;
 	}
-	
-	static {
-		initDefaultsFromConfiguration(GlobalConfiguration.getConfiguration());
-	}
-	
+
 	// --------------------------------------------------------------------------------------------	
 	
 	/**
@@ -121,9 +125,9 @@ public abstract class FileOutputFormat<IT> extends RichOutputFormat<IT> implemen
 	private transient boolean fileCreated;
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	public FileOutputFormat() {}
-	
+
 	public FileOutputFormat(Path outputPath) {
 		this.outputFilePath = outputPath;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index e2c54ad..85d9cd8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -98,15 +98,15 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
 	// --------------------------------------------------------------------------------------------
 	//  Constructors and getters/setters for the configurable parameters
 	// --------------------------------------------------------------------------------------------
-	
+
 	protected GenericCsvInputFormat() {
 		super();
 	}
-	
+
 	protected GenericCsvInputFormat(Path filePath) {
-		super(filePath);
+		super(filePath, null);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	public int getNumberOfFieldsTotal() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
index 0e978b9..300c237 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java
@@ -70,7 +70,7 @@ public interface InputFormat<OT, T extends InputSplit> extends InputSplitSource<
 	 * <p>
 	 * This method is always called first on a newly instantiated input format. 
 	 *  
-	 * @param parameters The configuration with all parameters.
+	 * @param parameters The configuration with all parameters (note: not the Flink config but the TaskConfig).
 	 */
 	void configure(Configuration parameters);
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
index 0ef160e..edbe1a0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/SerializedOutputFormat.java
@@ -33,7 +33,7 @@ import org.apache.flink.core.memory.DataOutputView;
 public class SerializedOutputFormat<T extends IOReadableWritable> extends BinaryOutputFormat<T> {
 	
 	private static final long serialVersionUID = 1L;
-	
+
 	@Override
 	protected void serialize(T record, DataOutputView dataOutputView) throws IOException {
 		record.write(dataOutputView);

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
index 7e50486..8d550d7 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java
@@ -21,21 +21,12 @@ package org.apache.flink.configuration;
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStreamReader;
 
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
 import org.apache.flink.annotation.Internal;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
 
 /**
  * Global configuration object for Flink. Similar to Java properties configuration
@@ -44,159 +35,62 @@ import org.w3c.dom.Text;
 @Internal
 public final class GlobalConfiguration {
 
-	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);
 
-	/** The global configuration object accessible through a singleton pattern. */
-	private static GlobalConfiguration SINGLETON = null;
-
-	/** The internal map holding the key-value pairs the configuration consists of. */
-	private final Configuration config = new Configuration();
+	public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";
 
 	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Retrieves the singleton object of the global configuration.
-	 * 
-	 * @return the global configuration object
-	 */
-	private static GlobalConfiguration get() {
-		// lazy initialization currently only for testibility
-		synchronized (GlobalConfiguration.class) {
-			if (SINGLETON == null) {
-				SINGLETON = new GlobalConfiguration();
-			}
-			return SINGLETON;
-		}
-	}
 
-	/**
-	 * The constructor used to construct the singleton instance of the global configuration.
-	 */
 	private GlobalConfiguration() {}
 
 	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Returns the value associated with the given key as a string.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	public static String getString(String key, String defaultValue) {
-		return get().config.getString(key, defaultValue);
-	}
-
-	/**
-	 * Returns the value associated with the given key as a long integer.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	public static long getLong(String key, long defaultValue) {
-		return get().config.getLong(key, defaultValue);
-	}
 
 	/**
-	 * Returns the value associated with the given key as an integer.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	public static int getInteger(String key, int defaultValue) {
-		return get().config.getInteger(key, defaultValue);
-	}
-	
-	/**
-	 * Returns the value associated with the given key as a float.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
+	 * Loads the global configuration from the environment. Fails if an error occurs during loading. Returns an
+	 * empty configuration object if the environment variable is not set. In production this variable is set but
+	 * tests and local execution/debugging don't have this environment variable set. That's why we should fail
+	 * if it is not set.
+	 * @return Returns the Configuration
 	 */
-	public static float getFloat(String key, float defaultValue) {
-		return get().config.getFloat(key, defaultValue);
-	}
-
-	/**
-	 * Returns the value associated with the given key as a boolean.
-	 * 
-	 * @param key
-	 *        the key pointing to the associated value
-	 * @param defaultValue
-	 *        the default value which is returned in case there is no value associated with the given key
-	 * @return the (default) value associated with the given key
-	 */
-	public static boolean getBoolean(String key, boolean defaultValue) {
-		return get().config.getBoolean(key, defaultValue);
+	public static Configuration loadConfiguration() {
+		final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
+		if (configDir == null) {
+			return new Configuration();
+		}
+		return loadConfiguration(configDir);
 	}
 
 	/**
 	 * Loads the configuration files from the specified directory.
 	 * <p>
-	 * XML and YAML are supported as configuration files. If both XML and YAML files exist in the configuration
-	 * directory, keys from YAML will overwrite keys from XML.
+	 * YAML files are supported as configuration files.
 	 * 
 	 * @param configDir
 	 *        the directory which contains the configuration files
 	 */
-	public static void loadConfiguration(final String configDir) {
+	public static Configuration loadConfiguration(final String configDir) {
 
 		if (configDir == null) {
-			LOG.warn("Given configuration directory is null, cannot load configuration");
-			return;
+			throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
 		}
 
 		final File confDirFile = new File(configDir);
 		if (!(confDirFile.exists())) {
-			LOG.warn("The given configuration directory name '" + configDir + "' (" + confDirFile.getAbsolutePath()
-				+ ") does not describe an existing directory.");
-			return;
+			throw new IllegalConfigurationException(
+				"The given configuration directory name '" + configDir +
+					"' (" + confDirFile.getAbsolutePath() + ") does not describe an existing directory.");
 		}
-		
-		if (confDirFile.isFile()) {
-			final File file = new File(configDir);
-			if(configDir.endsWith(".xml")) {
-				get().loadXMLResource( file );
-			} else if(configDir.endsWith(".yaml")) {
-				get().loadYAMLResource(file);
-			} else {
-				LOG.warn("The given configuration has an unknown extension.");
-				return;
-			}
-			return;
-		}
-
-		// get all XML and YAML files in the directory
-		final File[] xmlFiles = filterFilesBySuffix(confDirFile, ".xml");
-		final File[] yamlFiles = filterFilesBySuffix(confDirFile, new String[] { ".yaml", ".yml" });
 
-		if ((xmlFiles == null || xmlFiles.length == 0) && (yamlFiles == null || yamlFiles.length == 0)) {
-			LOG.warn("Unable to get the contents of the config directory '" + configDir + "' ("
-				+ confDirFile.getAbsolutePath() + ").");
-			return;
-		}
+		// get Flink yaml configuration file
+		final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);
 
-		// load config files and write into config map
-		for (File f : xmlFiles) {
-			get().loadXMLResource(f);
+		if (!yamlConfigFile.exists()) {
+			throw new IllegalConfigurationException(
+				"The Flink config file '" + yamlConfigFile +
+					"' (" + confDirFile.getAbsolutePath() + ") does not exist.");
 		}
 
-		// => if both XML and YAML files exist, the YAML config keys overwrite XML settings
-		for (File f : yamlFiles) {
-			get().loadYAMLResource(f);
-		}
+		return loadYAMLResource(yamlConfigFile);
 	}
 
 	/**
@@ -219,237 +113,46 @@ public final class GlobalConfiguration {
 	 * @param file the YAML file to read from
 	 * @see <a href="http://www.yaml.org/spec/1.2/spec.html">YAML 1.2 specification</a>
 	 */
-	private void loadYAMLResource(File file) {
-
-		synchronized (getClass()) {
-
-			BufferedReader reader = null;
-			try {
-				reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)));
-	
-				String line = null;
-				while ((line = reader.readLine()) != null) {
-	
-					// 1. check for comments
-					String[] comments = line.split("#", 2);
-					String conf = comments[0];
-	
-					// 2. get key and value
-					if (conf.length() > 0) {
-						String[] kv = conf.split(": ", 2);
-	
-						// skip line with no valid key-value pair
-						if (kv.length == 1) {
-							LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
-							continue;
-						}
-	
-						String key = kv[0].trim();
-						String value = kv[1].trim();
-						
-						// sanity check
-						if (key.length() == 0 || value.length() == 0) {
-							LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
-							continue;
-						}
-	
-						LOG.debug("Loading configuration property: {}, {}", key, value);
-	
-						this.config.setString(key, value);
-					}
-				}
-			}
-			catch (IOException e) {
-				LOG.error("Error parsing YAML configuration.", e);
-			}
-			finally {
-				try {
-					if(reader != null) {
-						reader.close();
-					}
-				} catch (IOException e) {
-					LOG.warn("Cannot to close reader with IOException.", e);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Loads an XML document of key-values pairs.
-	 * 
-	 * @param file
-	 *        the XML document file
-	 */
-	private void loadXMLResource(File file) {
+	private static Configuration loadYAMLResource(File file) {
+		final Configuration config = new Configuration();
 
-		final DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
-		// Ignore comments in the XML file
-		docBuilderFactory.setIgnoringComments(true);
-		docBuilderFactory.setNamespaceAware(true);
+		try (BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))){
 
-		try {
+			String line;
+			while ((line = reader.readLine()) != null) {
 
-			final DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-			Document doc;
-			Element root;
+				// 1. check for comments
+				String[] comments = line.split("#", 2);
+				String conf = comments[0];
 
-			doc = builder.parse(file);
-
-			if (doc == null) {
-				LOG.warn("Cannot load configuration: doc is null");
-				return;
-			}
+				// 2. get key and value
+				if (conf.length() > 0) {
+					String[] kv = conf.split(": ", 2);
 
-			root = doc.getDocumentElement();
-			if (root == null) {
-				LOG.warn("Cannot load configuration: root is null");
-				return;
-			}
-
-			if (!"configuration".equals(root.getNodeName())) {
-				return;
-			}
-
-			final NodeList props = root.getChildNodes();
-			int propNumber = -1;
-
-			synchronized (getClass()) {
-
-				for (int i = 0; i < props.getLength(); i++) {
-
-					final Node propNode = props.item(i);
-					String key = null;
-					String value = null;
-
-					// Ignore text at this point
-					if (propNode instanceof Text) {
+					// skip line with no valid key-value pair
+					if (kv.length == 1) {
+						LOG.warn("Error while trying to split key and value in configuration file " + file + ": " + line);
 						continue;
 					}
 
-					if (!(propNode instanceof Element)) {
-						continue;
-					}
-
-					Element property = (Element) propNode;
-					if (!"property".equals(property.getNodeName())) {
-						continue;
-					}
+					String key = kv[0].trim();
+					String value = kv[1].trim();
 
-					propNumber++;
-					final NodeList propChildren = property.getChildNodes();
-					if (propChildren == null) {
-						LOG.warn("Error while reading configuration: property has no children, skipping...");
+					// sanity check
+					if (key.length() == 0 || value.length() == 0) {
+						LOG.warn("Error after splitting key and value in configuration file " + file + ": " + line);
 						continue;
 					}
 
-					for (int j = 0; j < propChildren.getLength(); j++) {
-
-						final Node propChild = propChildren.item(j);
-						if (propChild instanceof Element) {
-							if ("key".equals(propChild.getNodeName())) {
-								if (propChild.getChildNodes() != null) {
-									if (propChild.getChildNodes().getLength() == 1) {
-										if (propChild.getChildNodes().item(0) instanceof Text) {
-											final Text t = (Text) propChild.getChildNodes().item(0);
-											key = t.getTextContent();
-										}
-									}
-								}
-							}
-
-							if ("value".equals(propChild.getNodeName())) {
-								if (propChild.getChildNodes() != null) {
-									if (propChild.getChildNodes().getLength() == 1) {
-										if (propChild.getChildNodes().item(0) instanceof Text) {
-											final Text t = (Text) propChild.getChildNodes().item(0);
-											value = t.getTextContent();
-										}
-									}
-								}
-							}
-						}
-					}
-
-					if (key != null && value != null) {
-						// Put key, value pair into the map
-						LOG.debug("Loading configuration property: {}, {}", key, value);
-						this.config.setString(key, value);
-					} else {
-						LOG.warn("Error while reading configuration: Cannot read property " + propNumber);
-					}
+					LOG.debug("Loading configuration property: {}, {}", key, value);
+					config.setString(key, value);
 				}
 			}
-
-		}
-		catch (Exception e) {
-			LOG.error("Cannot load configuration.", e);
+		} catch (IOException e) {
+			throw new RuntimeException("Error parsing YAML configuration.", e);
 		}
-	}
 
-	/**
-	 * Gets a {@link Configuration} object with the values of this GlobalConfiguration
-	 * 
-	 * @return the {@link Configuration} object including the key/value pairs
-	 */
-	public static Configuration getConfiguration() {
-		Configuration copy = new Configuration();
-		copy.addAll(get().config);
-		return copy;
-	}
-
-	/**
-	 * Merges the given {@link Configuration} object into the global
-	 * configuration. If a key/value pair with an identical already
-	 * exists in the global configuration, it is overwritten by the
-	 * pair of the {@link Configuration} object.
-	 * 
-	 * @param conf
-	 *        the {@link Configuration} object to merge into the global configuration
-	 */
-	public static void includeConfiguration(Configuration conf) {
-		get().includeConfigurationInternal(conf);
-	}
-
-	/**
-	 * Internal non-static method to include configuration.
-	 * 
-	 * @param conf
-	 *        the {@link Configuration} object to merge into the global configuration
-	 */
-	private void includeConfigurationInternal(Configuration conf) {
-		// static synchronized
-		synchronized (getClass()) {
-			this.config.addAll(conf);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Filters files in directory which have the specified suffix (e.g. ".xml").
-	 * 
-	 * @param dirToFilter
-	 *        directory to filter
-	 * @param suffix
-	 *        suffix to filter files by (e.g. ".xml")
-	 * @return files with given ending in directory
-	 */
-	private static File[] filterFilesBySuffix(final File dirToFilter, final String suffix) {
-		return filterFilesBySuffix(dirToFilter, new String[] { suffix });
+		return config;
 	}
 
-	private static File[] filterFilesBySuffix(final File dirToFilter, final String[] suffixes) {
-		return dirToFilter.listFiles(new FilenameFilter() {
-			@Override
-			public boolean accept(final File dir, final String name) {
-				for (String suffix : suffixes) {
-					if (dir.equals(dirToFilter) && name != null && name.endsWith(suffix)) {
-						return true;
-					}
-				}
-
-				return false;
-			}
-		});
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index 90b366c..a7374e3 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -41,7 +41,8 @@ public class BinaryInputFormatTest {
 			return record;
 		}
 	}
-	
+
+
 	@Test
 	public void testCreateInputSplitsWithOneFile() throws IOException {
 		// create temporary file with 3 blocks

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
index fac979e..be73798 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatSamplingTest.java
@@ -30,6 +30,7 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+
 public class DelimitedInputFormatSamplingTest {
 	
 	private static final String TEST_DATA1 = 
@@ -66,6 +67,8 @@ public class DelimitedInputFormatSamplingTest {
 	
 	private static final int DEFAULT_NUM_SAMPLES = 4;
 	
+	private static Configuration CONFIG;
+	
 	// ========================================================================
 	//  Setup
 	// ========================================================================
@@ -80,16 +83,17 @@ public class DelimitedInputFormatSamplingTest {
 		
 		try {
 			// make sure we do 4 samples
-			TestConfigUtils.loadGlobalConf(
+			CONFIG = TestConfigUtils.loadGlobalConf(
 				new String[] { ConfigConstants.DELIMITED_FORMAT_MIN_LINE_SAMPLES_KEY,
 								ConfigConstants.DELIMITED_FORMAT_MAX_LINE_SAMPLES_KEY },
 				new String[] { "4", "4" });
-			
-			TestDelimitedInputFormat.prepare();
+
+
 		} catch (Throwable t) {
 			Assert.fail("Could not load the global configuration.");
 		}
 	}
+
 	
 	// ========================================================================
 	//  Tests
@@ -101,7 +105,7 @@ public class DelimitedInputFormatSamplingTest {
 			final String tempFile = TestFileUtils.createTempFile(TEST_DATA1);
 			final Configuration conf = new Configuration();
 			
-			final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
 			format.setFilePath(tempFile.replace("file", "test"));
 			format.configure(conf);
 			
@@ -109,7 +113,7 @@ public class DelimitedInputFormatSamplingTest {
 			format.getStatistics(null);
 			Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
 			
-			TestDelimitedInputFormat format2 = new TestDelimitedInputFormat();
+			TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG);
 			format2.setFilePath(tempFile.replace("file", "test"));
 			format2.setNumLineSamples(8);
 			format2.configure(conf);
@@ -130,7 +134,7 @@ public class DelimitedInputFormatSamplingTest {
 			final String tempFile = TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA1, TEST_DATA1, TEST_DATA1);
 			final Configuration conf = new Configuration();
 			
-			final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
 			format.setFilePath(tempFile.replace("file", "test"));
 			format.configure(conf);
 			
@@ -138,7 +142,7 @@ public class DelimitedInputFormatSamplingTest {
 			format.getStatistics(null);
 			Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
 			
-			TestDelimitedInputFormat format2 = new TestDelimitedInputFormat();
+			TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG);
 			format2.setFilePath(tempFile.replace("file", "test"));
 			format2.setNumLineSamples(8);
 			format2.configure(conf);
@@ -159,7 +163,7 @@ public class DelimitedInputFormatSamplingTest {
 			final String tempFile = TestFileUtils.createTempFile(TEST_DATA1);
 			final Configuration conf = new Configuration();
 			
-			final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
 			format.setFilePath(tempFile);
 			format.configure(conf);
 			BaseStatistics stats = format.getStatistics(null);
@@ -180,7 +184,7 @@ public class DelimitedInputFormatSamplingTest {
 			final String tempFile = TestFileUtils.createTempFileDir(TEST_DATA1, TEST_DATA2);
 			final Configuration conf = new Configuration();
 			
-			final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
 			format.setFilePath(tempFile);
 			format.configure(conf);
 			BaseStatistics stats = format.getStatistics(null);
@@ -212,7 +216,7 @@ public class DelimitedInputFormatSamplingTest {
 			final String tempFile = TestFileUtils.createTempFile(testData);
 			final Configuration conf = new Configuration();
 			
-			final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
 			format.setFilePath(tempFile);
 			format.setDelimiter(DELIMITER);
 			format.configure(conf);
@@ -235,7 +239,7 @@ public class DelimitedInputFormatSamplingTest {
 			final String tempFile = TestFileUtils.createTempFile(2 * ConfigConstants.DEFAULT_DELIMITED_FORMAT_MAX_SAMPLE_LEN);
 			final Configuration conf = new Configuration();
 			
-			final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
 			format.setFilePath(tempFile);
 			format.configure(conf);
 			
@@ -252,7 +256,7 @@ public class DelimitedInputFormatSamplingTest {
 			final String tempFile = TestFileUtils.createTempFile(TEST_DATA1);
 			final Configuration conf = new Configuration();
 			
-			final TestDelimitedInputFormat format = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format = new TestDelimitedInputFormat(CONFIG);
 			format.setFilePath("test://" + tempFile);
 			format.configure(conf);
 			
@@ -260,7 +264,7 @@ public class DelimitedInputFormatSamplingTest {
 			BaseStatistics stats = format.getStatistics(null);
 			Assert.assertEquals("Wrong number of samples taken.", DEFAULT_NUM_SAMPLES, TestFileSystem.getNumtimeStreamOpened());
 			
-			final TestDelimitedInputFormat format2 = new TestDelimitedInputFormat();
+			final TestDelimitedInputFormat format2 = new TestDelimitedInputFormat(CONFIG);
 			format2.setFilePath("test://" + tempFile);
 			format2.configure(conf);
 			
@@ -274,21 +278,21 @@ public class DelimitedInputFormatSamplingTest {
 			Assert.fail(e.getMessage());
 		}
 	}
-	
+
 	// ========================================================================
 	//  Mocks
 	// ========================================================================
-	
+
 	private static final class TestDelimitedInputFormat extends DelimitedInputFormat<IntValue> {
 		private static final long serialVersionUID = 1L;
-		
+
+		TestDelimitedInputFormat(Configuration configuration) {
+			super(null, configuration);
+		}
+
 		@Override
 		public IntValue readRecord(IntValue reuse, byte[] bytes, int offset, int numBytes) {
 			throw new UnsupportedOperationException();
 		}
-		
-		public static void prepare() {
-			DelimitedInputFormat.loadGlobalConfigParams();
-		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
index 599a640..8a31099 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/DelimitedInputFormatTest.java
@@ -38,6 +38,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 
@@ -47,17 +48,18 @@ import org.junit.Test;
 
 public class DelimitedInputFormatTest {
 	
-	private final DelimitedInputFormat<String> format = new MyTextInputFormat();
+	private DelimitedInputFormat<String> format;
 	
 	// --------------------------------------------------------------------------------------------
 
 	@Before
 	public void setup() {
+		format = new MyTextInputFormat();
 		this.format.setFilePath(new Path("file:///some/file/that/will/not/be/read"));
 	}
 	
 	@After
-	public void setdown() throws Exception {
+	public void shutdown() throws Exception {
 		if (this.format != null) {
 			this.format.close();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
index 68465a3..1076338 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.testutils.TestFileUtils;
@@ -37,11 +38,12 @@ public class EnumerateNestedFilesTest {
 	protected Configuration config;
 	final String tempPath = System.getProperty("java.io.tmpdir");
 
-	private final DummyFileInputFormat format = new DummyFileInputFormat();
+	private DummyFileInputFormat format;
 
 	@Before
 	public void setup() {
 		this.config = new Configuration();
+		format = new DummyFileInputFormat();
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
index cc040b6..4a598f2 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileOutputFormatTest.java
@@ -22,13 +22,12 @@ package org.apache.flink.api.common.io;
 import java.io.File;
 import java.io.IOException;
 
-import org.junit.Assert;
-
 import org.apache.flink.api.common.io.FileOutputFormat.OutputDirectoryMode;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.types.IntValue;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.fail;
@@ -38,11 +37,8 @@ public class FileOutputFormatTest {
 	@Test
 	public void testCreateNonParallelLocalFS() throws IOException {
 
-		File tmpOutPath = null;
-		File tmpOutFile = null;
-
-		tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
-		tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
+		File tmpOutPath = File.createTempFile("fileOutputFormatTest", "Test1");
+		File tmpOutFile = new File(tmpOutPath.getAbsolutePath()+"/1");
 
 		String tmpFilePath = tmpOutPath.toURI().toString();
 
@@ -652,8 +648,10 @@ public class FileOutputFormatTest {
 	// -------------------------------------------------------------------------------------------
 	
 	public static class DummyFileOutputFormat extends FileOutputFormat<IntValue> {
+
 		private static final long serialVersionUID = 1L;
 		public boolean testFileName = false;
+
 		@Override
 		public void writeRecord(IntValue record) throws IOException {
 			// DO NOTHING

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
index ae0f8e5..c3cbb58 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
@@ -52,4 +52,5 @@ public class RichInputFormatTest {
 		assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
 		assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
index 296af11..4c303a6 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
@@ -52,4 +52,5 @@ public class RichOutputFormatTest {
 		assertEquals(inputFormat.getRuntimeContext().getIndexOfThisSubtask(), 1);
 		assertEquals(inputFormat.getRuntimeContext().getNumberOfParallelSubtasks(),3);
 	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
index 3e1a723..0cf5e32 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/FilesystemSchemeConfigTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.configuration;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.File;
@@ -38,15 +37,6 @@ import static org.junit.Assert.fail;
 
 public class FilesystemSchemeConfigTest {
 
-	@Before
-	public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
-		IllegalAccessException {
-		// reset GlobalConfiguration between tests
-		Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
-		instance.setAccessible(true);
-		instance.set(null, null);
-	}
-	
 	@Test
 	public void testExplicitFilesystemScheme() {
 		testSettingFilesystemScheme(false, "fs.default-scheme: otherFS://localhost:1234/", true);
@@ -65,7 +55,12 @@ public class FilesystemSchemeConfigTest {
 	private void testSettingFilesystemScheme(boolean useDefaultScheme,
 											String configFileScheme, boolean useExplicitScheme) {
 		final File tmpDir = getTmpDir();
-		final File confFile = createRandomFile(tmpDir, ".yaml");
+		final File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME);
+		try {
+			confFile.createNewFile();
+		} catch (IOException e) {
+			throw new RuntimeException("Couldn't create file", e);
+		}
 		final File testFile = new File(tmpDir.getAbsolutePath() + File.separator + "testing.txt");
 
 		try {
@@ -83,8 +78,7 @@ public class FilesystemSchemeConfigTest {
 				fail(e.getMessage());
 			}
 
-			GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-			Configuration conf = GlobalConfiguration.getConfiguration();
+			Configuration conf = GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
 
 			try {
 				FileSystem.setDefaultScheme(conf);

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
index ce55d2e..6336a73 100644
--- a/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
+++ b/flink-core/src/test/java/org/apache/flink/configuration/GlobalConfigurationTest.java
@@ -19,114 +19,58 @@
 package org.apache.flink.configuration;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.PrintWriter;
-import java.lang.reflect.Field;
 import java.util.UUID;
 
-import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.util.TestLogger;
-import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 /**
  * This class contains tests for the global configuration (parsing configuration directory information).
  */
 public class GlobalConfigurationTest extends TestLogger {
 
-	@Before
-	public void resetSingleton() throws SecurityException, NoSuchFieldException, IllegalArgumentException,
-			IllegalAccessException {
-		// reset GlobalConfiguration between tests
-		Field instance = GlobalConfiguration.class.getDeclaredField("SINGLETON");
-		instance.setAccessible(true);
-		instance.set(null, null);
-	}
-	
-	@Test
-	public void testConfigurationMixed() {
-		File tmpDir = getTmpDir();
-		File confFile1 = createRandomFile(tmpDir, ".yaml");
-		File confFile2 = createRandomFile(tmpDir, ".xml");
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
 
-		try {
-			try {
-				PrintWriter pw1 = new PrintWriter(confFile1);
-				PrintWriter pw2 = new PrintWriter(confFile2);
-				
-				pw1.println("mykey1: myvalue1_YAML");
-				pw1.println("mykey2: myvalue2");
-				
-				pw2.println("<configuration>");
-				pw2.println("<property><key>mykey1</key><value>myvalue1_XML</value></property>");
-				pw2.println("<property><key>mykey3</key><value>myvalue3</value></property>");
-				pw2.println("</configuration>");
-				
-				pw1.close();
-				pw2.close();
-			} catch (FileNotFoundException e) {
-				e.printStackTrace();
-			}
-			
-			GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-			Configuration conf = GlobalConfiguration.getConfiguration();
-			
-			// all distinct keys from confFile1 + confFile2key
-			assertEquals(3, conf.keySet().size());
-			
-			// keys 1, 2, 3 should be OK and match the expected values
-			// => configuration keys from YAML should overwrite keys from XML
-			assertEquals("myvalue1_YAML", conf.getString("mykey1", null));
-			assertEquals("myvalue2", conf.getString("mykey2", null));
-			assertEquals("myvalue3", conf.getString("mykey3", null));
-		} finally {
-			confFile1.delete();
-			confFile2.delete();
-			tmpDir.delete();
-		}
-	}
-	
 	@Test
 	public void testConfigurationYAML() {
-		File tmpDir = getTmpDir();
-		File confFile1 = createRandomFile(tmpDir, ".yaml");
-		File confFile2 = createRandomFile(tmpDir, ".yml");
+		File tmpDir = tempFolder.getRoot();
+		File confFile = new File(tmpDir, GlobalConfiguration.FLINK_CONF_FILENAME);
 
 		try {
-			try {
-				PrintWriter pw1 = new PrintWriter(confFile1);
-				PrintWriter pw2 = new PrintWriter(confFile2);
-
-				pw1.println("###########################"); // should be skipped
-				pw1.println("# Some : comments : to skip"); // should be skipped
-				pw1.println("###########################"); // should be skipped
-				pw1.println("mykey1: myvalue1"); // OK, simple correct case
-				pw1.println("mykey2       : myvalue2"); // OK, whitespace before colon is correct
-				pw1.println("mykey3:myvalue3"); // SKIP, missing white space after colon
-				pw1.println(" some nonsense without colon and whitespace separator"); // SKIP
-				pw1.println(" :  "); // SKIP
-				pw1.println("   "); // SKIP
-				pw1.println("mykey4: myvalue4# some comments"); // OK, skip comments only
-				pw1.println("   mykey5    :    myvalue5    "); // OK, trim unnecessary whitespace
-				pw1.println("mykey6: my: value6"); // OK, only use first ': ' as separator
-				pw1.println("mykey7: "); // SKIP, no value provided
-				pw1.println(": myvalue8"); // SKIP, no key provided
-
-				pw2.println("mykey9: myvalue9"); // OK
-				pw2.println("mykey9: myvalue10"); // OK, overwrite last value
-
-				pw1.close();
-				pw2.close();
+			try (final PrintWriter pw = new PrintWriter(confFile)) {
+
+				pw.println("###########################"); // should be skipped
+				pw.println("# Some : comments : to skip"); // should be skipped
+				pw.println("###########################"); // should be skipped
+				pw.println("mykey1: myvalue1"); // OK, simple correct case
+				pw.println("mykey2       : myvalue2"); // OK, whitespace before colon is correct
+				pw.println("mykey3:myvalue3"); // SKIP, missing white space after colon
+				pw.println(" some nonsense without colon and whitespace separator"); // SKIP
+				pw.println(" :  "); // SKIP
+				pw.println("   "); // SKIP
+				pw.println("mykey4: myvalue4# some comments"); // OK, skip comments only
+				pw.println("   mykey5    :    myvalue5    "); // OK, trim unnecessary whitespace
+				pw.println("mykey6: my: value6"); // OK, only use first ': ' as separator
+				pw.println("mykey7: "); // SKIP, no value provided
+				pw.println(": myvalue8"); // SKIP, no key provided
+
+				pw.println("mykey9: myvalue9"); // OK
+				pw.println("mykey9: myvalue10"); // OK, overwrite last value
+
 			} catch (FileNotFoundException e) {
 				e.printStackTrace();
 			}
 
-			GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
-			Configuration conf = GlobalConfiguration.getConfiguration();
+			Configuration conf = GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
 
 			// all distinct keys from confFile1 + confFile2 key
 			assertEquals(6, conf.keySet().size());
@@ -142,83 +86,36 @@ public class GlobalConfigurationTest extends TestLogger {
 			assertEquals("null", conf.getString("mykey8", "null"));
 			assertEquals("myvalue10", conf.getString("mykey9", null));
 		} finally {
-			confFile1.delete();
-			confFile2.delete();
+			confFile.delete();
 			tmpDir.delete();
 		}
 	}
 
-	/**
-	 * This test creates several configuration files with values and cross-checks the resulting
-	 * {@link GlobalConfiguration} object.
-	 */
-	@Test
-	public void testConfigurationXML() {
-
-		// Create temporary directory for configuration files
-		final File tmpDir = getTmpDir();
-		final File confFile1 = createRandomFile(tmpDir, ".xml");
-		final File confFile2 = createRandomFile(tmpDir, ".xml");
-
-		try {
-			try {
-				final PrintWriter pw1 = new PrintWriter(confFile1);
-				final PrintWriter pw2 = new PrintWriter(confFile2);
-
-				pw1.append("<configuration>");
-				pw2.append("<configuration>");
-
-				pw1.append("<property><key>mykey1</key><value>myvalue1</value></property>");
-				pw1.append("<property></property>");
-				pw1.append("<property><key></key><value></value></property>");
-				pw1.append("<property><key>hello</key><value></value></property>");
-				pw1.append("<property><key>mykey2</key><value>myvalue2</value></property>");
-				pw2.append("<property><key>mykey3</key><value>myvalue3</value></property>");
-				pw2.append("<property><key>mykey4</key><value>myvalue4</value></property>");
-
-				pw1.append("</configuration>");
-				pw2.append("</configuration>");
-				pw1.close();
-				pw2.close();
-			} catch (FileNotFoundException e) {
-				fail(e.getMessage());
-			}
-
-			GlobalConfiguration.loadConfiguration(tmpDir.getAbsolutePath());
+	@Test(expected = IllegalArgumentException.class)
+	public void testFailIfNull() {
+		GlobalConfiguration.loadConfiguration(null);
+	}
 
-			final Configuration co = GlobalConfiguration.getConfiguration();
+	@Test(expected = IllegalConfigurationException.class)
+	public void testFailIfNotLoaded() {
+		GlobalConfiguration.loadConfiguration("/some/path/" + UUID.randomUUID());
+	}
 
-			assertEquals(co.getString("mykey1", "null"), "myvalue1");
-			assertEquals(co.getString("mykey2", "null"), "myvalue2");
-			assertEquals(co.getString("mykey3", "null"), "myvalue3");
-			assertEquals(co.getString("mykey4", "null"), "myvalue4");
+	@Test(expected = IllegalConfigurationException.class)
+	public void testInvalidConfiguration() throws IOException {
+		GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath());
+	}
 
-			// // Test (wrong) string-to integer conversion. should return default value.
-			// semantics are changed to throw an exception upon invalid parsing!
-			// assertEquals(co.getInteger("mykey1", 500), 500);
-			// assertEquals(co.getInteger("anything", 500), 500);
-			// assertEquals(co.getBoolean("notexistent", true), true);
+	@Test
+	// We allow malformed YAML files
+	public void testInvalidYamlFile() throws IOException {
+		final File confFile = tempFolder.newFile(GlobalConfiguration.FLINK_CONF_FILENAME);
 
-			// Test include local configuration
-			final Configuration newconf = new Configuration();
-			newconf.setInteger("mynewinteger", 1000);
-			GlobalConfiguration.includeConfiguration(newconf);
-			assertEquals(GlobalConfiguration.getInteger("mynewinteger", 0), 1000);
-		} finally {
-			// Remove temporary files
-			confFile1.delete();
-			confFile2.delete();
-			tmpDir.delete();
+		try (PrintWriter pw = new PrintWriter(confFile);) {
+			pw.append("invalid");
 		}
-	}
 
-	private File getTmpDir() {
-		File tmpDir = new File(CommonTestUtils.getTempDir(), UUID.randomUUID().toString());
-		assertTrue(tmpDir.mkdirs());
-		return tmpDir;
+		assertNotNull(GlobalConfiguration.loadConfiguration(tempFolder.getRoot().getAbsolutePath()));
 	}
 
-	private File createRandomFile(File path, String suffix) {
-		return new File(path, UUID.randomUUID().toString() + suffix);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
index 6096f69..d34f20a 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestConfigUtils.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 
 /**
@@ -30,20 +31,20 @@ import org.apache.flink.configuration.GlobalConfiguration;
  */
 public final class TestConfigUtils {
 	
-	public static void loadGlobalConf(String[] keys, String[] values) throws IOException {
-		loadGlobalConf(getConfAsString(keys, values));
+	public static Configuration loadGlobalConf(String[] keys, String[] values) throws IOException {
+		return loadGlobalConf(getConfAsString(keys, values));
 	}
 	
-	public static void loadGlobalConf(String contents) throws IOException {
+	public static Configuration loadGlobalConf(String contents) throws IOException {
 		final File tempDir = new File(System.getProperty("java.io.tmpdir"));
-		File confDir = null;
+		File confDir;
 		do {
 			confDir = new File(tempDir, TestFileUtils.randomFileName());
 		} while (confDir.exists());
 		
 		try {
 			confDir.mkdirs();
-			final File confFile = new File(confDir, "tempConfig.xml");
+			final File confFile = new File(confDir, GlobalConfiguration.FLINK_CONF_FILENAME);
 		
 			try {
 				BufferedWriter writer = new BufferedWriter(new FileWriter(confFile));
@@ -52,7 +53,7 @@ public final class TestConfigUtils {
 				} finally {
 					writer.close();
 				}
-				GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath());
+				return GlobalConfiguration.loadConfiguration(confDir.getAbsolutePath());
 			} finally {
 				confFile.delete();
 			}
@@ -61,25 +62,25 @@ public final class TestConfigUtils {
 			confDir.delete();
 		}
 	}
-	
+
 	public static String getConfAsString(String[] keys, String[] values) {
 		if (keys == null || values == null || keys.length != values.length) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		StringBuilder bld = new StringBuilder();
-		bld.append("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n<configuration>\n");
-		
+
 		for (int i = 0; i < keys.length; i++) {
-			bld.append("<property>\n<key>").append(keys[i]).append("</key>\n");
-			bld.append("<value>").append(values[i]).append("</value>\n</property>\n");
+			bld.append(keys[i]);
+			bld.append(": ");
+			bld.append(values[i]);
+			bld.append(System.lineSeparator());
 		}
-		bld.append("</configuration>\n");
 		return bld.toString();
 	}
 
 	// ------------------------------------------------------------------------
-	
+
 	private TestConfigUtils() {}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
index ab4e993..7c41eaf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/utils/HadoopUtils.java
@@ -104,13 +104,17 @@ public final class HadoopUtils {
 	 * This method is public because its being used in the HadoopDataSource.
 	 */
 	public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+
+		org.apache.flink.configuration.Configuration flinkConfiguration =
+			GlobalConfiguration.loadConfiguration();
+
 		Configuration retConf = new org.apache.hadoop.conf.Configuration();
 
 		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
 		// the hdfs configuration
 		// Try to load HDFS configuration from Hadoop's own configuration files
 		// 1. approach: Flink configuration
-		final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants
+		final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants
 				.HDFS_DEFAULT_CONFIG, null);
 		if (hdfsDefaultPath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
@@ -118,7 +122,7 @@ public final class HadoopUtils {
 			LOG.debug("Cannot find hdfs-default configuration file");
 		}
 
-		final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
 		if (hdfsSitePath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
 		} else {
@@ -127,7 +131,7 @@ public final class HadoopUtils {
 
 		// 2. Approach environment variables
 		String[] possibleHadoopConfPaths = new String[4];
-		possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
 		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
 
 		if (System.getenv("HADOOP_HOME") != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
index b219de4..52fd734 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/utils/HadoopUtils.java
@@ -37,12 +37,14 @@ public final class HadoopUtils {
 	/**
 	 * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration.
 	 */
-	public static void mergeHadoopConf(Configuration configuration) {
-		Configuration hadoopConf = org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
-		
+	public static void mergeHadoopConf(Configuration hadoopConfig) {
+
+		Configuration hadoopConf =
+			org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils.getHadoopConfiguration();
+
 		for (Map.Entry<String, String> e : hadoopConf) {
-			if (configuration.get(e.getKey()) == null) {
-				configuration.set(e.getKey(), e.getValue());
+			if (hadoopConfig.get(e.getKey()) == null) {
+				hadoopConfig.set(e.getKey(), e.getValue());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 75b82cd..05ed6fa 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -46,12 +46,12 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
 
 
 	public PrimitiveInputFormat(Path filePath, Class<OT> primitiveClass) {
-		super(filePath);
+		super(filePath, null);
 		this.primitiveClass = primitiveClass;
 	}
 
 	public PrimitiveInputFormat(Path filePath, String delimiter, Class<OT> primitiveClass) {
-		super(filePath);
+		super(filePath, null);
 		this.primitiveClass = primitiveClass;
 		this.setDelimiter(delimiter);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
index d6a02f1..b2554bf 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextInputFormat.java
@@ -50,7 +50,7 @@ public class TextInputFormat extends DelimitedInputFormat<String> {
 	// --------------------------------------------------------------------------------------------
 	
 	public TextInputFormat(Path filePath) {
-		super(filePath);
+		super(filePath, null);
 	}
 	
 	// --------------------------------------------------------------------------------------------	

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
index a0d20d6..45a2e3e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/TextValueInputFormat.java
@@ -50,7 +50,7 @@ public class TextValueInputFormat extends DelimitedInputFormat<StringValue> {
 	// --------------------------------------------------------------------------------------------
 	
 	public TextValueInputFormat(Path filePath) {
-		super(filePath);
+		super(filePath, null);
 	}
 	
 	// --------------------------------------------------------------------------------------------	

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
index a6cbfa8..d55b9d4 100644
--- a/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
+++ b/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/PythonPlanBinder.java
@@ -73,8 +73,10 @@ public class PythonPlanBinder {
 	public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
 	public static final String PLANBINDER_CONFIG_BCVAR_COUNT = "PLANBINDER_BCVAR_COUNT";
 	public static final String PLANBINDER_CONFIG_BCVAR_NAME_PREFIX = "PLANBINDER_BCVAR_";
-	public static String FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-	public static String FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+	public static String FLINK_PYTHON2_BINARY_PATH =
+		GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON2_BINARY_KEY, "python");
+	public static String FLINK_PYTHON3_BINARY_PATH =
+		GlobalConfiguration.loadConfiguration().getString(FLINK_PYTHON3_BINARY_KEY, "python3");
 
 	private static final Random r = new Random();
 
@@ -113,8 +115,9 @@ public class PythonPlanBinder {
 	}
 
 	public PythonPlanBinder() throws IOException {
-		FLINK_PYTHON2_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON2_BINARY_KEY, "python");
-		FLINK_PYTHON3_BINARY_PATH = GlobalConfiguration.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
+		Configuration conf = GlobalConfiguration.loadConfiguration();
+		FLINK_PYTHON2_BINARY_PATH = conf.getString(FLINK_PYTHON2_BINARY_KEY, "python");
+		FLINK_PYTHON3_BINARY_PATH = conf.getString(FLINK_PYTHON3_BINARY_KEY, "python3");
 		FULL_PATH = FLINK_DIR != null
 				//command-line
 				? FLINK_DIR + FLINK_PYTHON_REL_LOCAL_PATH

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 12c5dfc..5ab1fbf 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
 import org.apache.flink.api.common.distributions.DataDistribution;
 import org.apache.flink.api.common.operators.util.UserCodeWrapper;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.optimizer.dag.TempMode;
 import org.apache.flink.optimizer.plan.BulkIterationPlanNode;
@@ -49,7 +50,6 @@ import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
 import org.apache.flink.optimizer.plan.WorksetPlanNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -107,10 +107,11 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 	
 	public static final String MERGE_ITERATION_AUX_TASKS_KEY = "compiler.merge-iteration-aux";
 	
-	private static final boolean mergeIterationAuxTasks = GlobalConfiguration.getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
-	
+	private static final boolean mergeIterationAuxTasks =
+		GlobalConfiguration.loadConfiguration().getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false);
+
 	private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null, null);
-	
+
 	// ------------------------------------------------------------------------
 
 	private Map<PlanNode, JobVertex> vertices; // a map from optimizer nodes to job vertices
@@ -156,7 +157,6 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		this.useLargeRecordHandler = config.getBoolean(
 				ConfigConstants.USE_LARGE_RECORD_HANDLER_KEY,
 				ConfigConstants.DEFAULT_USE_LARGE_RECORD_HANDLER);
-
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
index 4e05ebe..5d7173b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopFileSystem.java
@@ -176,20 +176,24 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 	 * This method is public because its being used in the HadoopDataSource.
 	 */
 	public static org.apache.hadoop.conf.Configuration getHadoopConfiguration() {
+
+		org.apache.flink.configuration.Configuration flinkConfiguration =
+			GlobalConfiguration.loadConfiguration();
+
 		Configuration retConf = new org.apache.hadoop.conf.Configuration();
 
 		// We need to load both core-site.xml and hdfs-site.xml to determine the default fs path and
 		// the hdfs configuration
 		// Try to load HDFS configuration from Hadoop's own configuration files
 		// 1. approach: Flink configuration
-		final String hdfsDefaultPath = GlobalConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
+		final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);
 		if (hdfsDefaultPath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
 		} else {
 			LOG.debug("Cannot find hdfs-default configuration file");
 		}
 
-		final String hdfsSitePath = GlobalConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
+		final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);
 		if (hdfsSitePath != null) {
 			retConf.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
 		} else {
@@ -198,7 +202,7 @@ public final class HadoopFileSystem extends FileSystem implements HadoopFileSyst
 		
 		// 2. Approach environment variables
 		String[] possibleHadoopConfPaths = new String[4]; 
-		possibleHadoopConfPaths[0] = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
 		possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");
 		
 		if (System.getenv("HADOOP_HOME") != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index f14a37f..84d38c1 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2350,8 +2350,7 @@ object JobManager {
     }
 
     LOG.info("Loading configuration from " + configDir)
-    GlobalConfiguration.loadConfiguration(configDir)
-    val configuration = GlobalConfiguration.getConfiguration()
+    val configuration = GlobalConfiguration.loadConfiguration(configDir)
 
     try {
       FileSystem.setDefaultScheme(configuration)

http://git-wip-us.apache.org/repos/asf/flink/blob/5eb0e38f/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a7dd789..226fa75 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1526,7 +1526,6 @@ object TaskManager {
     val conf: Configuration = try {
       LOG.info("Loading configuration from " + cliConfig.getConfigDir())
       GlobalConfiguration.loadConfiguration(cliConfig.getConfigDir())
-      GlobalConfiguration.getConfiguration()
     }
     catch {
       case e: Exception => throw new Exception("Could not load configuration", e)


Mime
View raw message