flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [09/20] flink git commit: [FLINK-2268] Remove unused HDFS copy-utils from flink-streaming-java
Date Wed, 27 Sep 2017 11:09:14 GMT
[FLINK-2268] Remove unused HDFS copy-utils from flink-streaming-java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/492dd8eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/492dd8eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/492dd8eb

Branch: refs/heads/master
Commit: 492dd8eb41907011a30334bf85c62d74325e8fc6
Parents: 2807932
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Aug 22 16:42:22 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Sep 27 10:05:11 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/util/HDFSCopyFromLocal.java |  67 ------------
 .../flink/streaming/util/HDFSCopyToLocal.java   |  65 ------------
 .../streaming/util/HDFSCopyUtilitiesTest.java   | 104 -------------------
 3 files changed, 236 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/492dd8eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
deleted file mode 100644
index 833c13b..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyFromLocal.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.File;
-import java.net.URI;
-
-/**
- * Utility for copying from local file system to a HDFS {@link FileSystem}.
- */
-public class HDFSCopyFromLocal {
-
-	public static void copyFromLocal(final File localPath,
-			final URI remotePath) throws Exception {
-		// Do it in another Thread because HDFS can deadlock if being interrupted while copying
-		String threadName = "HDFS Copy from " + localPath + " to " + remotePath;
-
-		final Tuple1<Exception> asyncException = Tuple1.of(null);
-
-		Thread copyThread = new Thread(threadName) {
-			@Override
-			public void run() {
-				try {
-					Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-
-					FileSystem fs = FileSystem.get(remotePath, hadoopConf);
-
-					fs.copyFromLocalFile(new Path(localPath.getAbsolutePath()),
-							new Path(remotePath));
-				} catch (Exception t) {
-					asyncException.f0 = t;
-				}
-			}
-		};
-
-		copyThread.setDaemon(true);
-		copyThread.start();
-		copyThread.join();
-
-		if (asyncException.f0 != null) {
-			throw asyncException.f0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/492dd8eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java
deleted file mode 100644
index e3b45ed..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/HDFSCopyToLocal.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.File;
-import java.net.URI;
-
-/**
- * Utility for copying from a HDFS {@link FileSystem} to the local file system.
- */
-public class HDFSCopyToLocal {
-
-	public static void copyToLocal(final URI remotePath,
-			final File localPath) throws Exception {
-		// Do it in another Thread because HDFS can deadlock if being interrupted while copying
-		String threadName = "HDFS Copy from " + remotePath + " to " + localPath;
-
-		final Tuple1<Exception> asyncException = Tuple1.of(null);
-
-		Thread copyThread = new Thread(threadName) {
-			@Override
-			public void run() {
-				try {
-					Configuration hadoopConf = HadoopFileSystem.getHadoopConfiguration();
-
-					FileSystem fs = FileSystem.get(remotePath, hadoopConf);
-					fs.copyToLocalFile(new Path(remotePath), new Path(localPath.getAbsolutePath()));
-				} catch (Exception t) {
-					asyncException.f0 = t;
-				}
-			}
-		};
-
-		copyThread.setDaemon(true);
-		copyThread.start();
-		copyThread.join();
-
-		if (asyncException.f0 != null) {
-			throw asyncException.f0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/492dd8eb/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
deleted file mode 100644
index ca21c0c..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/HDFSCopyUtilitiesTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.OperatingSystem;
-
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link HDFSCopyFromLocal} and {@link HDFSCopyToLocal}.
- */
-public class HDFSCopyUtilitiesTest {
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void checkOperatingSystem() {
-		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-	}
-
-
-	/**
-	 * This test verifies that a hadoop configuration is correctly read in the external
-	 * process copying tools.
-	 */
-	@Test
-	public void testCopyFromLocal() throws Exception {
-
-		File testFolder = tempFolder.newFolder();
-
-		File originalFile = new File(testFolder, "original");
-		File copyFile = new File(testFolder, "copy");
-
-		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(originalFile))) {
-			out.writeUTF("Hello there, 42!");
-		}
-
-		HDFSCopyFromLocal.copyFromLocal(
-				originalFile,
-				new Path("file://" + copyFile.getAbsolutePath()).toUri());
-
-		try (DataInputStream in = new DataInputStream(new FileInputStream(copyFile))) {
-			assertTrue(in.readUTF().equals("Hello there, 42!"));
-
-		}
-	}
-
-	/**
-	 * This test verifies that a hadoop configuration is correctly read in the external
-	 * process copying tools.
-	 */
-	@Test
-	public void testCopyToLocal() throws Exception {
-
-		File testFolder = tempFolder.newFolder();
-
-		File originalFile = new File(testFolder, "original");
-		File copyFile = new File(testFolder, "copy");
-
-		try (DataOutputStream out = new DataOutputStream(new FileOutputStream(originalFile))) {
-			out.writeUTF("Hello there, 42!");
-		}
-
-		HDFSCopyToLocal.copyToLocal(
-				new Path("file://" + originalFile.getAbsolutePath()).toUri(),
-				copyFile);
-
-		try (DataInputStream in = new DataInputStream(new FileInputStream(copyFile))) {
-			assertTrue(in.readUTF().equals("Hello there, 42!"));
-
-		}
-	}
-
-}


Mime
View raw message