flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [3/4] flink git commit: [FLINK-1680] Remove Tachyon test and rename Maven module to "flink-fs-tests"
Date Wed, 05 Aug 2015 14:24:37 GMT
[FLINK-1680] Remove Tachyon test and rename Maven module to "flink-fs-tests"

This closes #987


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fb7e6342
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fb7e6342
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fb7e6342

Branch: refs/heads/master
Commit: fb7e6342211d116a2db13933241d3546bbf8d4e8
Parents: 0aa6f0c
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Aug 4 13:35:12 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Aug 5 14:46:35 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-fs-tests/pom.xml            |  78 +++++++++
 .../flink/tachyon/FileStateHandleTest.java      | 126 ++++++++++++++
 .../java/org/apache/flink/tachyon/HDFSTest.java | 174 +++++++++++++++++++
 .../src/test/resources/log4j.properties         |  31 ++++
 flink-staging/flink-tachyon/pom.xml             | 113 ------------
 .../flink/tachyon/FileStateHandleTest.java      | 126 --------------
 .../java/org/apache/flink/tachyon/HDFSTest.java | 157 -----------------
 .../tachyon/TachyonFileSystemWrapperTest.java   | 167 ------------------
 .../src/test/resources/log4j.properties         |  31 ----
 .../src/test/resources/tachyonHadoopConf.xml    |  28 ---
 flink-staging/pom.xml                           |   4 +-
 11 files changed, 411 insertions(+), 624 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/pom.xml b/flink-staging/flink-fs-tests/pom.xml
new file mode 100644
index 0000000..fe1abb3
--- /dev/null
+++ b/flink-staging/flink-fs-tests/pom.xml
@@ -0,0 +1,78 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-staging</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-fs-tests</artifactId>
+	<name>flink-fs-tests</name>
+
+	<packaging>jar</packaging>
+
+	<!--
+		This is a Hadoop2 only flink module.
+	-->
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>${shading-artifact.name}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-java-examples</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-hdfs</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<scope>test</scope>
+			<type>test-jar</type>
+			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
new file mode 100644
index 0000000..2873c78
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.runtime.util.SerializedValue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class FileStateHandleTest {
+
+	private String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+	private org.apache.hadoop.fs.Path hdPath;
+	private org.apache.hadoop.fs.FileSystem hdfs;
+
+	@Before
+	public void createHDFS() {
+		try {
+			Configuration hdConf = new Configuration();
+
+			File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
+					+ hdfsCluster.getNameNodePort() + "/";
+
+			hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
+			hdfs = hdPath.getFileSystem(hdConf);
+			hdfs.mkdirs(hdPath);
+
+		} catch (Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			hdfs.delete(hdPath, true);
+			hdfsCluster.shutdown();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	@Test
+	public void testFileStateHandle() throws Exception {
+
+		Serializable state = "state";
+
+		// Create a state handle provider for the hdfs directory
+		StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
+				+ hdPath);
+
+		FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
+		
+		try {
+			handleProvider.createStateHandle(null);
+			fail();
+		} catch (RuntimeException e) {
+			// good
+		}
+
+		assertTrue(handle.stateFetched());
+		assertFalse(handle.isWritten());
+
+		// Serialize the handle so it writes the value to hdfs
+		SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
+				handle);
+		
+		assertTrue(handle.isWritten());
+		
+		// Deserialize the handle and verify that the state is not fetched yet
+		FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
+				.deserializeValue(Thread.currentThread().getContextClassLoader());
+		assertFalse(deserializedHandle.stateFetched());
+
+		// Fetch the and compare with original
+		assertEquals(state, deserializedHandle.getState());
+
+		// Test whether discard removes the checkpoint file properly
+		assertTrue(hdfs.listFiles(hdPath, true).hasNext());
+		deserializedHandle.discardState();
+		assertFalse(hdfs.listFiles(hdPath, true).hasNext());
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
new file mode 100644
index 0000000..633d022
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/HDFSTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tachyon;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.io.FileOutputFormat;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.api.java.io.AvroOutputFormat;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.examples.java.wordcount.WordCount;
+import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringWriter;
+
+/**
+ * This test should logically be located in the 'flink-runtime' tests. However, this project
+ * has already all dependencies required (flink-java-examples). Also, the ParallelismOneExecEnv is here.
+ */
+public class HDFSTest {
+
+	protected String hdfsURI;
+	private MiniDFSCluster hdfsCluster;
+	private org.apache.hadoop.fs.Path hdPath;
+	protected org.apache.hadoop.fs.FileSystem hdfs;
+
+	@Before
+	public void createHDFS() {
+		try {
+			Configuration hdConf = new Configuration();
+
+			File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
+			FileUtil.fullyDelete(baseDir);
+			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+			hdfsCluster = builder.build();
+
+			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
+
+			hdPath = new org.apache.hadoop.fs.Path("/test");
+			hdfs = hdPath.getFileSystem(hdConf);
+			FSDataOutputStream stream = hdfs.create(hdPath);
+			for(int i = 0; i < 10; i++) {
+				stream.write("Hello HDFS\n".getBytes());
+			}
+			stream.close();
+
+		} catch(Throwable e) {
+			e.printStackTrace();
+			Assert.fail("Test failed " + e.getMessage());
+		}
+	}
+
+	@After
+	public void destroyHDFS() {
+		try {
+			hdfs.delete(hdPath, false);
+			hdfsCluster.shutdown();
+		} catch (IOException e) {
+			throw new RuntimeException(e);
+		}
+
+	}
+
+	@Test
+	public void testHDFS() {
+
+		Path file = new Path(hdfsURI + hdPath);
+		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
+		try {
+			FileSystem fs = file.getFileSystem();
+			Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
+			new DopOneTestEnvironment();
+			try {
+				WordCount.main(new String[]{file.toString(), result.toString()});
+			} catch(Throwable t) {
+				t.printStackTrace();
+				Assert.fail("Test failed with " + t.getMessage());
+			}
+			Assert.assertTrue("No result file present", hdfs.exists(result));
+			// validate output:
+			org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
+			StringWriter writer = new StringWriter();
+			IOUtils.copy(inStream, writer);
+			String resultString = writer.toString();
+
+			Assert.assertEquals("hdfs 10\n" +
+					"hello 10\n", resultString);
+			inStream.close();
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail("Error in test: " + e.getMessage() );
+		}
+	}
+
+	@Test
+	public void testAvroOut() {
+		String type = "one";
+		AvroOutputFormat<String> avroOut =
+				new AvroOutputFormat<String>( String.class );
+
+		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
+
+		avroOut.setOutputFilePath(new Path(result.toString()));
+		avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
+		avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
+
+		try {
+			avroOut.open(0, 2);
+			avroOut.writeRecord(type);
+			avroOut.close();
+
+			avroOut.open(1, 2);
+			avroOut.writeRecord(type);
+			avroOut.close();
+
+
+			Assert.assertTrue("No result file present", hdfs.exists(result));
+			FileStatus[] files = hdfs.listStatus(result);
+			Assert.assertEquals(2, files.length);
+			for(FileStatus file : files) {
+				Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
+			}
+
+		} catch (IOException e) {
+			e.printStackTrace();
+			Assert.fail(e.getMessage());
+		}
+	}
+
+	// package visible
+	static final class DopOneTestEnvironment extends LocalEnvironment {
+		static {
+			initializeContextEnvironment(new ExecutionEnvironmentFactory() {
+				@Override
+				public ExecutionEnvironment createExecutionEnvironment() {
+					LocalEnvironment le = new LocalEnvironment();
+					le.setParallelism(1);
+					return le;
+				}
+			});
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-fs-tests/src/test/resources/log4j.properties b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
new file mode 100644
index 0000000..f533ba2
--- /dev/null
+++ b/flink-staging/flink-fs-tests/src/test/resources/log4j.properties
@@ -0,0 +1,31 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
+# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
+# we provide a log4j.properties file ourselves.
+
+log4j.rootLogger=OFF, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target = System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/pom.xml b/flink-staging/flink-tachyon/pom.xml
deleted file mode 100644
index 7ad9139..0000000
--- a/flink-staging/flink-tachyon/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-staging</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-tachyon</artifactId>
-	<name>flink-tachyon</name>
-
-	<packaging>jar</packaging>
-
-	<!--
-		This is a Hadoop2 only flink module.
-	-->
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>${shading-artifact.name}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java-examples</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-avro</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-hdfs</artifactId>
-			<scope>test</scope>
-			<type>test-jar</type>
-			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-		</dependency>
-		<dependency>
-			<groupId>org.apache.hadoop</groupId>
-			<artifactId>hadoop-common</artifactId>
-			<scope>test</scope>
-			<type>test-jar</type>
-			<version>${hadoop.version}</version><!--$NO-MVN-MAN-VER$-->
-		</dependency>
-		<dependency>
-			<groupId>org.tachyonproject</groupId>
-			<artifactId>tachyon</artifactId>
-			<version>0.5.0</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.tachyonproject</groupId>
-			<artifactId>tachyon</artifactId>
-			<version>0.5.0</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.eclipse.jetty</groupId>
-			<artifactId>jetty-util</artifactId>
-			<version>7.6.8.v20121106</version><!--$NO-MVN-MAN-VER$-->
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-	<dependencyManagement>
-		<dependencies>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-server</artifactId>
-				<version>7.6.8.v20121106</version>
-				<scope>test</scope>
-			</dependency>
-			<dependency>
-				<groupId>org.eclipse.jetty</groupId>
-				<artifactId>jetty-servlet</artifactId>
-				<version>7.6.8.v20121106</version>
-				<scope>test</scope>
-			</dependency>
-		</dependencies>
-	</dependencyManagement>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
deleted file mode 100644
index 2873c78..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tachyon;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-
-import org.apache.flink.runtime.state.FileStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateHandleProvider;
-import org.apache.flink.runtime.util.SerializedValue;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class FileStateHandleTest {
-
-	private String hdfsURI;
-	private MiniDFSCluster hdfsCluster;
-	private org.apache.hadoop.fs.Path hdPath;
-	private org.apache.hadoop.fs.FileSystem hdfs;
-
-	@Before
-	public void createHDFS() {
-		try {
-			Configuration hdConf = new Configuration();
-
-			File baseDir = new File("./target/hdfs/filestatehandletest").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":"
-					+ hdfsCluster.getNameNodePort() + "/";
-
-			hdPath = new org.apache.hadoop.fs.Path("/StateHandleTest");
-			hdfs = hdPath.getFileSystem(hdConf);
-			hdfs.mkdirs(hdPath);
-
-		} catch (Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@After
-	public void destroyHDFS() {
-		try {
-			hdfs.delete(hdPath, true);
-			hdfsCluster.shutdown();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	@Test
-	public void testFileStateHandle() throws Exception {
-
-		Serializable state = "state";
-
-		// Create a state handle provider for the hdfs directory
-		StateHandleProvider<Serializable> handleProvider = FileStateHandle.createProvider(hdfsURI
-				+ hdPath);
-
-		FileStateHandle handle = (FileStateHandle) handleProvider.createStateHandle(state);
-		
-		try {
-			handleProvider.createStateHandle(null);
-			fail();
-		} catch (RuntimeException e) {
-			// good
-		}
-
-		assertTrue(handle.stateFetched());
-		assertFalse(handle.isWritten());
-
-		// Serialize the handle so it writes the value to hdfs
-		SerializedValue<StateHandle<Serializable>> serializedHandle = new SerializedValue<StateHandle<Serializable>>(
-				handle);
-		
-		assertTrue(handle.isWritten());
-		
-		// Deserialize the handle and verify that the state is not fetched yet
-		FileStateHandle deserializedHandle = (FileStateHandle) serializedHandle
-				.deserializeValue(Thread.currentThread().getContextClassLoader());
-		assertFalse(deserializedHandle.stateFetched());
-
-		// Fetch the and compare with original
-		assertEquals(state, deserializedHandle.getState());
-
-		// Test whether discard removes the checkpoint file properly
-		assertTrue(hdfs.listFiles(hdPath, true).hasNext());
-		deserializedHandle.discardState();
-		assertFalse(hdfs.listFiles(hdPath, true).hasNext());
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
deleted file mode 100644
index a761712..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/HDFSTest.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.tachyon;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.common.io.FileOutputFormat;
-import org.apache.flink.api.java.io.AvroOutputFormat;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.StringWriter;
-
-/**
- * This test should logically be located in the 'flink-runtime' tests. However, this project
- * has already all dependencies required (flink-java-examples). Also, the ParallelismOneExecEnv is here.
- */
-public class HDFSTest {
-
-	protected String hdfsURI;
-	private MiniDFSCluster hdfsCluster;
-	private org.apache.hadoop.fs.Path hdPath;
-	protected org.apache.hadoop.fs.FileSystem hdfs;
-
-	@Before
-	public void createHDFS() {
-		try {
-			Configuration hdConf = new Configuration();
-
-			File baseDir = new File("./target/hdfs/hdfsTest").getAbsoluteFile();
-			FileUtil.fullyDelete(baseDir);
-			hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
-			MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
-			hdfsCluster = builder.build();
-
-			hdfsURI = "hdfs://" + hdfsCluster.getURI().getHost() + ":" + hdfsCluster.getNameNodePort() +"/";
-
-			hdPath = new org.apache.hadoop.fs.Path("/test");
-			hdfs = hdPath.getFileSystem(hdConf);
-			FSDataOutputStream stream = hdfs.create(hdPath);
-			for(int i = 0; i < 10; i++) {
-				stream.write("Hello HDFS\n".getBytes());
-			}
-			stream.close();
-
-		} catch(Throwable e) {
-			e.printStackTrace();
-			Assert.fail("Test failed " + e.getMessage());
-		}
-	}
-
-	@After
-	public void destroyHDFS() {
-		try {
-			hdfs.delete(hdPath, false);
-			hdfsCluster.shutdown();
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-	}
-
-	@Test
-	public void testHDFS() {
-
-		Path file = new Path(hdfsURI + hdPath);
-		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/result");
-		try {
-			FileSystem fs = file.getFileSystem();
-			Assert.assertTrue("Must be HadoopFileSystem", fs instanceof HadoopFileSystem);
-			new TachyonFileSystemWrapperTest.DopOneTestEnvironment();
-			try {
-				WordCount.main(new String[]{file.toString(), result.toString()});
-			} catch(Throwable t) {
-				t.printStackTrace();
-				Assert.fail("Test failed with " + t.getMessage());
-			}
-			Assert.assertTrue("No result file present", hdfs.exists(result));
-			// validate output:
-			org.apache.hadoop.fs.FSDataInputStream inStream = hdfs.open(result);
-			StringWriter writer = new StringWriter();
-			IOUtils.copy(inStream, writer);
-			String resultString = writer.toString();
-
-			Assert.assertEquals("hdfs 10\n" +
-					"hello 10\n", resultString);
-			inStream.close();
-
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail("Error in test: " + e.getMessage() );
-		}
-	}
-
-	@Test
-	public void testAvroOut() {
-		String type = "one";
-		AvroOutputFormat<String> avroOut =
-				new AvroOutputFormat<String>( String.class );
-
-		org.apache.hadoop.fs.Path result = new org.apache.hadoop.fs.Path(hdfsURI + "/avroTest");
-
-		avroOut.setOutputFilePath(new Path(result.toString()));
-		avroOut.setWriteMode(FileSystem.WriteMode.NO_OVERWRITE);
-		avroOut.setOutputDirectoryMode(FileOutputFormat.OutputDirectoryMode.ALWAYS);
-
-		try {
-			avroOut.open(0, 2);
-			avroOut.writeRecord(type);
-			avroOut.close();
-
-			avroOut.open(1, 2);
-			avroOut.writeRecord(type);
-			avroOut.close();
-
-
-			Assert.assertTrue("No result file present", hdfs.exists(result));
-			FileStatus[] files = hdfs.listStatus(result);
-			Assert.assertEquals(2, files.length);
-			for(FileStatus file : files) {
-				Assert.assertTrue("1.avro".equals(file.getPath().getName()) || "2.avro".equals(file.getPath().getName()));
-			}
-
-		} catch (IOException e) {
-			e.printStackTrace();
-			Assert.fail(e.getMessage());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java b/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
deleted file mode 100644
index 3b2fb7f..0000000
--- a/flink-staging/flink-tachyon/src/test/java/org/apache/flink/tachyon/TachyonFileSystemWrapperTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.tachyon;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.api.java.LocalEnvironment;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.GlobalConfiguration;
-import org.apache.flink.examples.java.wordcount.WordCount;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import tachyon.client.InStream;
-import tachyon.client.OutStream;
-import tachyon.client.ReadType;
-import tachyon.client.TachyonFS;
-import tachyon.client.TachyonFile;
-import tachyon.client.WriteType;
-import tachyon.master.LocalTachyonCluster;
-
-import java.io.File;
-import java.io.StringWriter;
-import java.net.URISyntaxException;
-import java.net.URL;
-
-public class TachyonFileSystemWrapperTest {
-	private static final long TACHYON_WORKER_CAPACITY = 1024 * 1024 * 32;
-	private static final String TACHYON_TEST_IN_FILE_NAME = "tachyontest";
-	private static final String TACHYON_TEST_OUT_FILE_NAME = "result";
-	private static final Path HADOOP_CONFIG_PATH;
-
-	static {
-		URL resource = TachyonFileSystemWrapperTest.class.getResource("/tachyonHadoopConf.xml");
-		File file = null;
-		try {
-			file = new File(resource.toURI());
-		} catch (URISyntaxException e) {
-			throw new RuntimeException("Unable to load req. res", e);
-		}
-		if(!file.exists()) {
-			throw new RuntimeException("Unable to load required resource");
-		}
-		HADOOP_CONFIG_PATH = new Path(file.getAbsolutePath());
-	}
-
-	private LocalTachyonCluster cluster;
-	private TachyonFS client;
-	private String input;
-	private String output;
-
-	@Before
-	public void startTachyon() {
-		try {
-			cluster = new LocalTachyonCluster(TACHYON_WORKER_CAPACITY);
-			cluster.start();
-			client = cluster.getClient();
-			int id = client.createFile("/" + TACHYON_TEST_IN_FILE_NAME, 1024 * 32);
-			Assert.assertNotEquals("Unable to create file", -1, id);
-
-			TachyonFile testFile = client.getFile(id);
-			Assert.assertNotNull(testFile);
-
-
-			OutStream outStream = testFile.getOutStream(WriteType.MUST_CACHE);
-			for(int i = 0; i < 10; i++) {
-				outStream.write("Hello Tachyon\n".getBytes());
-			}
-			outStream.close();
-			final String tachyonBase = "tachyon://" + cluster.getMasterHostname() + ":" + cluster.getMasterPort();
-			input = tachyonBase + "/" + TACHYON_TEST_IN_FILE_NAME;
-			output = tachyonBase + "/" + TACHYON_TEST_OUT_FILE_NAME;
-
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test preparation failed with exception: "+e.getMessage());
-		}
-	}
-
-	@After
-	public void stopTachyon() {
-		try {
-			cluster.stop();
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test teardown failed with exception: "+e.getMessage());
-		}
-	}
-	// Verify that Hadoop's FileSystem can load the TFS (Tachyon File System)
-	@Test
-	public void testHadoopLoadability() {
-		try {
-			Path tPath = new Path(input);
-			Configuration conf = new Configuration();
-			conf.addResource(HADOOP_CONFIG_PATH);
-			Assert.assertEquals("tachyon.hadoop.TFS", conf.get("fs.tachyon.impl", null));
-			tPath.getFileSystem(conf);
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test failed with exception: "+e.getMessage());
-		}
-	}
-
-
-	@Test
-	public void testTachyon() {
-		try {
-			org.apache.flink.configuration.Configuration addHDConfToFlinkConf = new org.apache.flink.configuration.Configuration();
-			addHDConfToFlinkConf.setString(ConfigConstants.HDFS_DEFAULT_CONFIG, HADOOP_CONFIG_PATH.toString());
-			GlobalConfiguration.includeConfiguration(addHDConfToFlinkConf);
-
-			new DopOneTestEnvironment(); // initialize parallelism one
-
-			WordCount.main(new String[]{input, output});
-
-			// verify result
-			TachyonFile resultFile = client.getFile("/" + TACHYON_TEST_OUT_FILE_NAME);
-			Assert.assertNotNull("Result file has not been created", resultFile);
-			InStream inStream = resultFile.getInStream(ReadType.CACHE);
-			Assert.assertNotNull("Result file has not been created", inStream);
-			StringWriter writer = new StringWriter();
-			IOUtils.copy(inStream, writer);
-			String resultString = writer.toString();
-
-			Assert.assertEquals("hello 10\n" +
-					"tachyon 10\n", resultString);
-
-		} catch(Exception e) {
-			e.printStackTrace();
-			Assert.fail("Test failed with exception: "+e.getMessage());
-		}
-	}
-
-	// package visible
-	static final class DopOneTestEnvironment extends LocalEnvironment {
-	 	static {
-    		initializeContextEnvironment(new ExecutionEnvironmentFactory() {
-				@Override
-				public ExecutionEnvironment createExecutionEnvironment() {
-					LocalEnvironment le = new LocalEnvironment();
-					le.setParallelism(1);
-					return le;
-				}
-			});
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/resources/log4j.properties b/flink-staging/flink-tachyon/src/test/resources/log4j.properties
deleted file mode 100644
index f533ba2..0000000
--- a/flink-staging/flink-tachyon/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Tachyon's test-jar dependency adds a log4j.properties file to classpath.
-# Until the issue is resolved (see https://github.com/amplab/tachyon/pull/571)
-# we provide a log4j.properties file ourselves.
-
-log4j.rootLogger=OFF, testlogger
-
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml b/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
deleted file mode 100644
index 0af8190..0000000
--- a/flink-staging/flink-tachyon/src/test/resources/tachyonHadoopConf.xml
+++ /dev/null
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
-
-
-<configuration>
-    <property>
-        <name>fs.tachyon.impl</name>
-        <value>tachyon.hadoop.TFS</value>
-    </property>
-</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/fb7e6342/flink-staging/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/pom.xml b/flink-staging/pom.xml
index a05c8b1..b3aec14 100644
--- a/flink-staging/pom.xml
+++ b/flink-staging/pom.xml
@@ -59,10 +59,10 @@ under the License.
 				</property>
 			</activation>
 			<modules>
-				<!-- Include the Flink-tachyon project only for HD2.
+				<!-- Include the flink-fs-tests project only for HD2.
 				 	The HDFS minicluster interfaces changed between the two versions.
 				 -->
-				<module>flink-tachyon</module>
+				<module>flink-fs-tests</module>
 			</modules>
 		</profile>
 		<profile>


Mime
View raw message