flink-commits mailing list archives

From aljos...@apache.org
Subject [2/3] flink git commit: [FLINK-3352] Use HDFS Config in RocksDB Copy Utilities
Date Fri, 12 Feb 2016 21:59:40 GMT
[FLINK-3352] Use HDFS Config in RocksDB Copy Utilities

This also moves the utilities (HDFSCopyFromLocal and HDFSCopyToLocal) to
the RocksDB package because we would need an HDFS dependency in
flink-core otherwise.
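
The mechanism behind the fix: org.apache.hadoop.conf.Configuration implements
Writable, so the state backend can serialize the effective Hadoop configuration
to a file and the external copy process can deserialize it before touching the
file system. A minimal sketch of that round-trip follows; the class name and
temp-file location are illustrative assumptions, not part of the commit:

import org.apache.hadoop.conf.Configuration;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;

public class ConfigurationRoundTripSketch {
	public static void main(String[] args) throws Exception {
		// Illustrative location; the backend writes "hadoop-conf.binary" under its base path.
		File confFile = File.createTempFile("hadoop-conf", ".binary");

		// Writing side (runs in the task's JVM): Configuration implements Writable.
		Configuration conf = new Configuration();
		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(confFile))) {
			conf.write(out);
		}

		// Reading side (runs in the external copy process).
		Configuration restored = new Configuration();
		try (DataInputStream in = new DataInputStream(new FileInputStream(confFile))) {
			restored.readFields(in);
		}

		System.out.println(restored.get("fs.default.name"));
	}
}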


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f5d71909
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f5d71909
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f5d71909

Branch: refs/heads/master
Commit: f5d719096083504a0b5827e35c1d28e1180e5e1d
Parents: 31310cd
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Thu Feb 11 19:07:18 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Fri Feb 12 22:59:14 2016 +0100

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |  63 ++++++---
 .../streaming/state/HDFSCopyFromLocal.java      |  57 ++++++++
 .../streaming/state/HDFSCopyToLocal.java        |  58 ++++++++
 .../streaming/state/RocksDBListState.java       |   4 +-
 .../streaming/state/RocksDBReducingState.java   |   6 +-
 .../streaming/state/RocksDBValueState.java      |   4 +-
 .../streaming/state/HDFSCopyUtilitiesTest.java  | 140 +++++++++++++++++++
 .../apache/flink/util/HDFSCopyFromLocal.java    |  48 -------
 .../org/apache/flink/util/HDFSCopyToLocal.java  |  49 -------
 9 files changed, 309 insertions(+), 120 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index 6e4adf5..76f05d6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
 import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 
-import org.apache.flink.util.HDFSCopyFromLocal;
-import org.apache.flink.util.HDFSCopyToLocal;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +45,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.util.UUID;
@@ -73,6 +74,8 @@ import static java.util.Objects.requireNonNull;
 public abstract class AbstractRocksDBState<K, N, S extends State, SD extends StateDescriptor<S, ?>>
 	implements KvState<K, N, S, SD, RocksDBStateBackend>, State {
 
+	private static final String HADOOP_CONF_NAME = "hadoop-conf.binary";
+
 	private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBState.class);
 
 	/** Serializer for the keys */
@@ -96,10 +99,15 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 	/** Directory in "basePath" where the actual RocksDB data base instance stores its files */
 	protected final File rocksDbPath;
 
+	/**
+	 * File where we store a serialized Hadoop Configuration for use by the external process
+	 * HDFS copy utilities.
+	 */
+	protected File hadoopConfPath;
+
 	/** Our RocksDB instance */
 	protected final RocksDB db;
 
-
 	/**
 	 * Creates a new RocksDB backed state.
 	 *
@@ -115,6 +123,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 			Options options) {
 
 		rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
+		hadoopConfPath = new File(basePath, HADOOP_CONF_NAME);
 
 		this.keySerializer = requireNonNull(keySerializer);
 		this.namespaceSerializer = namespaceSerializer;
@@ -144,6 +153,8 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
+
+		writeHadoopConfig(hadoopConfPath);
 	}
 
 	/**
@@ -164,6 +175,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 			Options options) {
 
 		rocksDbPath = new File(basePath, "db" + UUID.randomUUID().toString());
+		hadoopConfPath = new File(basePath, HADOOP_CONF_NAME);
 
 		RocksDB.loadLibrary();
 
@@ -205,6 +217,23 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		} catch (RocksDBException e) {
 			throw new RuntimeException("Error while opening RocksDB instance.", e);
 		}
+
+		writeHadoopConfig(hadoopConfPath);
+	}
+
+	private static void writeHadoopConfig(File hadoopConfPath) {
+		Configuration conf = HadoopFileSystem.getHadoopConfiguration();
+
+		if (hadoopConfPath.exists()) {
+			if (!hadoopConfPath.delete()) {
+				throw new RuntimeException("Error deleting existing Hadoop configuration: " + hadoopConfPath);
+			}
+		}
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(hadoopConfPath))) {
+			conf.write(out);
+		} catch (IOException e) {
+			LOG.error("Error writing Hadoop Configuration.", e);
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -285,7 +314,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		// ------------------------------------------------------------------------
 
 		/** Store it so that we can clean up in dispose() */
-		protected final File dbPath;
+		protected final File basePath;
 
 		/** Where we should put RocksDB backups */
 		protected final String checkpointPath;
@@ -314,7 +343,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		/**
 		 * Creates a new snapshot from the given state parameters.
 		 */
-		public AbstractRocksDBSnapshot(File dbPath,
+		public AbstractRocksDBSnapshot(File basePath,
 				String checkpointPath,
 				URI backupUri,
 				long checkpointId,
@@ -322,7 +351,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 				TypeSerializer<N> namespaceSerializer,
 				SD stateDesc) {
 			
-			this.dbPath = dbPath;
+			this.basePath = basePath;
 			this.checkpointPath = checkpointPath;
 			this.backupUri = backupUri;
 			this.checkpointId = checkpointId;
@@ -339,7 +368,7 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 				TypeSerializer<K> keySerializer,
 				TypeSerializer<N> namespaceSerializer,
 				SD stateDesc,
-				File dbPath,
+				File basePath,
 				String backupPath,
 				String restorePath,
 				Options options) throws Exception;
@@ -360,13 +389,13 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 						"now is (" + keySerializer + ")");
 			}
 
-			if (!dbPath.exists()) {
-				if (!dbPath.mkdirs()) {
-					throw new RuntimeException("Could not create RocksDB base path " + dbPath);
+			if (!basePath.exists()) {
+				if (!basePath.mkdirs()) {
+					throw new RuntimeException("Could not create RocksDB base path " + basePath);
 				}
 			}
 
-			final File localBackupPath = new File(dbPath, "chk-" + checkpointId);
+			final File localBackupPath = new File(basePath, "chk-" + checkpointId);
 
 			if (localBackupPath.exists()) {
 				try {
@@ -377,14 +406,16 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 				}
 			}
 
-			HDFSCopyToLocal.copyToLocal(backupUri, dbPath);
-			return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, dbPath, 
+			writeHadoopConfig(new File(basePath, HADOOP_CONF_NAME));
+
+			HDFSCopyToLocal.copyToLocal(new File(basePath, HADOOP_CONF_NAME), backupUri, basePath);
+			return createRocksDBState(keySerializer, namespaceSerializer, stateDesc, basePath,
 					checkpointPath, localBackupPath.getAbsolutePath(), stateBackend.getRocksDBOptions());
 		}
 
 		@Override
 		public final void discardState() throws Exception {
-			FileSystem fs = FileSystem.get(backupUri, new Configuration());
+			FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
 			fs.delete(new Path(backupUri), true);
 		}
 
@@ -419,10 +450,10 @@ public abstract class AbstractRocksDBState<K, N, S extends State, SD extends Sta
 		@Override
 		public KvStateSnapshot<K, N, S, SD, RocksDBStateBackend> materialize() throws Exception {
 			try {
-				HDFSCopyFromLocal.copyFromLocal(localBackupPath, backupUri);
+				HDFSCopyFromLocal.copyFromLocal(state.hadoopConfPath, localBackupPath, backupUri);
 				return state.createRocksDBSnapshot(backupUri, checkpointId);
 			} catch (Exception e) {
-				FileSystem fs = FileSystem.get(backupUri, new Configuration());
+				FileSystem fs = FileSystem.get(backupUri, HadoopFileSystem.getHadoopConfiguration());
 				fs.delete(new Path(backupUri), true);
 				throw e;
 			} finally {

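Aside from threading the serialized configuration through, the hunks above also
swap "new Configuration()" for HadoopFileSystem.getHadoopConfiguration() in
discardState() and materialize(). A hedged sketch of the difference; the URI is
illustrative, and the comments summarize the helper's lookup behavior rather
than quote its source:

import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class HadoopConfLookupSketch {
	public static void main(String[] args) throws Exception {
		URI backupUri = new URI("hdfs:///flink/checkpoints/chk-42"); // illustrative

		// Before the patch: a bare Configuration only sees the *-site.xml
		// resources that happen to be on the classpath (shown for contrast).
		Configuration bare = new Configuration();

		// After the patch: Flink's helper additionally resolves the Hadoop
		// configuration the cluster was set up with (summary, see the helper).
		Configuration resolved = HadoopFileSystem.getHadoopConfiguration();

		// What discardState() does with the checkpoint directory.
		FileSystem fs = FileSystem.get(backupUri, resolved);
		fs.delete(new Path(backupUri), true);
	}
}
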
http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyFromLocal.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyFromLocal.java
new file mode 100644
index 0000000..a6e3dd4
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyFromLocal.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.ExternalProcessRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.URI;
+
+/**
+ * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process.
+ * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted.
+ */
+public class HDFSCopyFromLocal {
+	public static void main(String[] args) throws Exception {
+		String hadoopConfPath = args[0];
+		String localBackupPath = args[1];
+		String backupUri = args[2];
+
+		Configuration hadoopConf = new Configuration();
+		try (DataInputStream in = new DataInputStream(new FileInputStream(hadoopConfPath))) {
+			hadoopConf.readFields(in);
+		}
+
+		FileSystem fs = FileSystem.get(new URI(backupUri), hadoopConf);
+
+		fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri));
+	}
+
+	public static void copyFromLocal(File hadoopConfPath, File localPath, URI remotePath) throws Exception {
+		ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyFromLocal.class.getName(),
+			new String[]{hadoopConfPath.getAbsolutePath(), localPath.getAbsolutePath(), remotePath.toString()});
+		if (processRunner.run() != 0) {
+			throw new  RuntimeException("Error while copying to remote FileSystem: " + processRunner.getErrorOutput());
+		}
+	}
+}

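For orientation, a usage sketch of the relocated utility; all three arguments
are illustrative, and in the backend they come from hadoopConfPath, the local
checkpoint backup directory, and the checkpoint URI. HDFSCopyToLocal.copyToLocal
mirrors this with the argument order (conf, remote, local):

import org.apache.flink.contrib.streaming.state.HDFSCopyFromLocal;

import java.io.File;
import java.net.URI;

public class CopyFromLocalUsageSketch {
	public static void main(String[] args) throws Exception {
		// The first file must contain a Configuration serialized via
		// Configuration#write, as done by writeHadoopConfig above.
		File hadoopConf = new File("/tmp/hadoop-conf.binary");
		File localBackup = new File("/tmp/chk-42");
		URI backupUri = new URI("hdfs:///flink/checkpoints/chk-42");

		// Runs HDFSCopyFromLocal.main in a separate JVM via ExternalProcessRunner,
		// so a thread interrupt in the caller cannot break the HDFS copy midway.
		HDFSCopyFromLocal.copyFromLocal(hadoopConf, localBackup, backupUri);
	}
}
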
http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyToLocal.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyToLocal.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyToLocal.java
new file mode 100644
index 0000000..003f65f
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/HDFSCopyToLocal.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.util.ExternalProcessRunner;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.net.URI;
+
+/**
+ * Utility for copying from a HDFS {@link FileSystem} to the local file system in an external
+ * process. This is required since {@code FileSystem.copyToLocalFile} does not like being
+ * interrupted.
+ */
+public class HDFSCopyToLocal {
+	public static void main(String[] args) throws Exception {
+		String hadoopConfPath = args[0];
+		String backupUri = args[1];
+		String dbPath = args[2];
+
+		Configuration hadoopConf = new Configuration();
+		try (DataInputStream in = new DataInputStream(new FileInputStream(hadoopConfPath))) {
+			hadoopConf.readFields(in);
+		}
+
+		FileSystem fs = FileSystem.get(new URI(backupUri), hadoopConf);
+
+		fs.copyToLocalFile(new Path(backupUri), new Path(dbPath));
+	}
+
+	public static void copyToLocal(File hadoopConfPath, URI remotePath, File localPath) throws Exception {
+		ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyToLocal.class.getName(),
+			new String[]{hadoopConfPath.getAbsolutePath(), remotePath.toString(), localPath.getAbsolutePath()});
+		if (processRunner.run() != 0) {
+			throw new  RuntimeException("Error while copying from remote FileSystem: " + processRunner.getErrorOutput());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index aa029ac..53cde46 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -183,12 +183,12 @@ public class RocksDBListState<K, N, V>
 				TypeSerializer<K> keySerializer,
 				TypeSerializer<N> namespaceSerializer,
 				ListStateDescriptor<V> stateDesc,
-				File dbPath,
+				File basePath,
 				String backupPath,
 				String restorePath,
 				Options options) throws Exception {
 			
-			return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
+			return new RocksDBListState<>(keySerializer, namespaceSerializer, stateDesc, basePath,
 					checkpointPath, restorePath, options);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
index 3fdfafe..d1444eb 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java
@@ -181,13 +181,13 @@ public class RocksDBReducingState<K, N, V>
 				TypeSerializer<K> keySerializer,
 				TypeSerializer<N> namespaceSerializer,
 				ReducingStateDescriptor<V> stateDesc,
-				File dbPath,
+				File basePath,
 				String backupPath,
 				String restorePath,
 				Options options) throws Exception {
 			
-			return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,
-					dbPath, checkpointPath, restorePath, options);
+			return new RocksDBReducingState<>(keySerializer, namespaceSerializer, stateDesc,
+					basePath, checkpointPath, restorePath, options);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
index 5f6eccd..39a0e83 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java
@@ -169,12 +169,12 @@ public class RocksDBValueState<K, N, V>
 				TypeSerializer<K> keySerializer,
 				TypeSerializer<N> namespaceSerializer,
 				ValueStateDescriptor<V> stateDesc,
-				File dbPath,
+				File basePath,
 				String backupPath,
 				String restorePath,
 				Options options) throws Exception {
 			
-			return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, dbPath,
+			return new RocksDBValueState<>(keySerializer, namespaceSerializer, stateDesc, basePath,
 					checkpointPath, restorePath, options);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HDFSCopyUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HDFSCopyUtilitiesTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HDFSCopyUtilitiesTest.java
new file mode 100644
index 0000000..ad2da04
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/HDFSCopyUtilitiesTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.net.URI;
+
+import static org.junit.Assert.assertTrue;
+
+public class HDFSCopyUtilitiesTest {
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+
+	/**
+	 * This test verifies that a hadoop configuration is correctly read in the external
+	 * process copying tools.
+	 */
+	@Test
+	public void testCopyFromLocal() throws Exception {
+		Configuration config = new Configuration();
+		config.set("fs.default.name", "magic-1337:///");
+
+		File testFolder = tempFolder.newFolder();
+		File hadoopConfPath = new File(testFolder, "hadoop-conf.binary");
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(hadoopConfPath))) {
+			config.write(out);
+		}
+
+		File originalFile = new File(testFolder, "original");
+		File copyFile = new File(testFolder, "copy");
+
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(originalFile))) {
+			out.writeUTF("Hello there, 42!");
+		}
+
+		try {
+			HDFSCopyFromLocal.copyFromLocal(hadoopConfPath,
+					originalFile,
+					new URI(copyFile.getAbsolutePath()));
+		} catch (Exception e) {
+			// The copying will try to write to filesystem "magic-1337" for which there is no
+			// implementation, the error message will contain the name of the file system and
+			// we check for that.
+			assertTrue(e.getMessage().contains("magic-1337"));
+		}
+
+		config.set("fs.default.name", "file:///");
+
+		hadoopConfPath.delete();
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(hadoopConfPath))) {
+			config.write(out);
+		}
+
+		HDFSCopyFromLocal.copyFromLocal(hadoopConfPath,
+				originalFile,
+				new URI(copyFile.getAbsolutePath()));
+
+		try (DataInputStream in = new DataInputStream(new FileInputStream(copyFile))) {
+			assertTrue(in.readUTF().equals("Hello there, 42!"));
+
+		}
+	}
+
+	/**
+	 * This test verifies that a hadoop configuration is correctly read in the external
+	 * process copying tools.
+	 */
+	@Test
+	public void testCopyToLocal() throws Exception {
+		Configuration config = new Configuration();
+		config.set("fs.default.name", "magic-1337:///");
+
+		File testFolder = tempFolder.newFolder();
+		File hadoopConfPath = new File(testFolder, "hadoop-conf.binary");
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(hadoopConfPath))) {
+			config.write(out);
+		}
+
+		File originalFile = new File(testFolder, "original");
+		File copyFile = new File(testFolder, "copy");
+
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(originalFile))) {
+			out.writeUTF("Hello there, 42!");
+		}
+
+		try {
+			HDFSCopyToLocal.copyToLocal(hadoopConfPath,
+					new URI(originalFile.getAbsolutePath()),
+					copyFile);
+		} catch (Exception e) {
+			// The copying will try to write to filesystem "magic-1337" for which there is no
+			// implementation, the error message will contain the name of the file system and
+			// we check for that.
+			assertTrue(e.getMessage().contains("magic-1337"));
+		}
+
+		config.set("fs.default.name", "file:///");
+
+		hadoopConfPath.delete();
+		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(hadoopConfPath))) {
+			config.write(out);
+		}
+
+		HDFSCopyToLocal.copyToLocal(hadoopConfPath,
+				new URI(originalFile.getAbsolutePath()),
+				copyFile);
+
+		try (DataInputStream in = new DataInputStream(new FileInputStream(copyFile))) {
+			assertTrue(in.readUTF().equals("Hello there, 42!"));
+
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
deleted file mode 100644
index cf6780b..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyFromLocal.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.File;
-import java.net.URI;
-
-/**
- * Utility for copying from local file system to a HDFS {@link FileSystem} in an external process.
- * This is required since {@code FileSystem.copyFromLocalFile} does not like being interrupted.
- */
-public class HDFSCopyFromLocal {
-	public static void main(String[] args) throws Exception {
-		String localBackupPath = args[0];
-		String backupUri = args[1];
-
-		FileSystem fs = FileSystem.get(new URI(backupUri), new Configuration());
-
-		fs.copyFromLocalFile(new Path(localBackupPath), new Path(backupUri));
-	}
-
-	public static void copyFromLocal(File localPath, URI remotePath) throws Exception {
-		ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyFromLocal.class.getName(),
-			new String[]{localPath.getAbsolutePath(), remotePath.toString()});
-		if (processRunner.run() != 0) {
-			throw new  RuntimeException("Error while copying to remote FileSystem: " + processRunner.getErrorOutput());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f5d71909/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java b/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
deleted file mode 100644
index 813f768..0000000
--- a/flink-core/src/main/java/org/apache/flink/util/HDFSCopyToLocal.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.util;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.File;
-import java.net.URI;
-
-/**
- * Utility for copying from a HDFS {@link FileSystem} to the local file system in an external
- * process. This is required since {@code FileSystem.copyToLocalFile} does not like being
- * interrupted.
- */
-public class HDFSCopyToLocal {
-	public static void main(String[] args) throws Exception {
-		String backupUri = args[0];
-		String dbPath = args[1];
-
-		FileSystem fs = FileSystem.get(new URI(backupUri), new Configuration());
-
-		fs.copyToLocalFile(new Path(backupUri), new Path(dbPath));
-	}
-
-	public static void copyToLocal(URI remotePath, File localPath) throws Exception {
-		ExternalProcessRunner processRunner = new ExternalProcessRunner(HDFSCopyToLocal.class.getName(),
-			new String[]{remotePath.toString(), localPath.getAbsolutePath()});
-		if (processRunner.run() != 0) {
-			throw new  RuntimeException("Error while copying from remote FileSystem: " + processRunner.getErrorOutput());
-		}
-	}
-}

