flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [63/73] [abbrv] prefix all projects in addons and quickstarts with flink-
Date Sat, 12 Jul 2014 12:48:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
new file mode 100644
index 0000000..b72b9bc
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.yarn;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Enumeration;
+import java.util.Map;
+import java.util.jar.JarEntry;
+import java.util.jar.JarFile;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+public class Utils {
+
+	private static final Log LOG = LogFactory.getLog(Utils.class);
+
+	/** Maximum amount of memory (in MB) that {@link #calculateHeapSize(int)} keeps outside the heap. */
+	private static final int HEAP_LIMIT_CAP = 500;
+
+	/**
+	 * Extracts every entry of the given jar whose name starts with {@code prefix} into the
+	 * current working directory, using the entry name as the relative target path.
+	 *
+	 * NOTE(review): entry names are used as file paths without further validation, so the jar
+	 * is assumed to be trusted (presumably the Flink jar itself) - confirm with the callers.
+	 *
+	 * @param prefix only entries whose name starts with this prefix are extracted
+	 * @param pathToJar file system path of the jar to read
+	 * @throws IOException if the jar cannot be read or a target file cannot be written
+	 * @throws RuntimeException if a target file already exists
+	 */
+	public static void copyJarContents(String prefix, String pathToJar) throws IOException {
+		LOG.info("Copying jar (location: "+pathToJar+") to prefix "+prefix);
+
+		JarFile jar = new JarFile(pathToJar);
+		try {
+			Enumeration<JarEntry> entries = jar.entries();
+			byte[] buffer = new byte[1024];
+			while (entries.hasMoreElements()) {
+				JarEntry entry = entries.nextElement();
+				if (!entry.getName().startsWith(prefix)) {
+					continue;
+				}
+				File target = new File(entry.getName());
+				if (entry.isDirectory()) {
+					target.mkdirs();
+					continue;
+				}
+				if (target.exists()) {
+					throw new RuntimeException("File unexpectedly exists");
+				}
+				// a jar is not required to list a directory entry before the files inside it
+				File parent = target.getParentFile();
+				if (parent != null) {
+					parent.mkdirs();
+				}
+				copyEntry(jar, entry, target, buffer);
+			}
+		} finally {
+			// release the jar even if extracting one of the entries fails
+			jar.close();
+		}
+	}
+
+	/**
+	 * Writes the content of a single jar entry to {@code target}, closing both streams
+	 * even if reading or writing fails.
+	 */
+	private static void copyEntry(JarFile jar, JarEntry entry, File target, byte[] buffer) throws IOException {
+		InputStream inStream = jar.getInputStream(entry);
+		try {
+			FileOutputStream outputStream = new FileOutputStream(target);
+			try {
+				int read;
+				while ((read = inStream.read(buffer)) != -1) {
+					outputStream.write(buffer, 0, read);
+				}
+			} finally {
+				outputStream.close();
+			}
+		} finally {
+			inStream.close();
+		}
+	}
+
+	/**
+	 * Calculate the heap size for the JVMs to start in the containers.
+	 * Since JVMs allocate more than just the heap space, and YARN is very
+	 * fast at killing processes that use memory beyond their limit, we have to come
+	 * up with a good heap size.
+	 * This code takes 85% of the given amount of memory (in MB). If the remaining 15% would be
+	 * more than 500 MB (the current HEAP_LIMIT_CAP), only 500 MB are subtracted instead.
+	 *
+	 * @param memory the container memory in MB
+	 * @return the heap size in MB
+	 */
+	public static int calculateHeapSize(int memory) {
+		int heapLimit = (int)((float)memory*0.85);
+		if( (memory - heapLimit) > HEAP_LIMIT_CAP) {
+			heapLimit = memory-HEAP_LIMIT_CAP;
+		}
+		return heapLimit;
+	}
+
+	/**
+	 * Loads the Flink configuration from {@code confDir} into {@link GlobalConfiguration}.
+	 * Despite its name, this method does not return anything.
+	 *
+	 * @param confDir directory containing the Flink configuration
+	 */
+	public static void getFlinkConfiguration(String confDir) {
+		GlobalConfiguration.loadConfiguration(confDir);
+	}
+
+	/**
+	 * Makes the given directory visible to the Hadoop configuration. It installs a new
+	 * {@link URLClassLoader} over the directory whose parent is the configuration's current
+	 * classloader.
+	 *
+	 * @throws RuntimeException if the path cannot be converted to a URL
+	 */
+	private static void addPathToConfig(Configuration conf, File path) {
+		URL fileUrl;
+		try {
+			// File.toURL() is deprecated because it does not escape characters that are illegal
+			// in URLs (e.g. spaces); going through toURI() yields a correctly encoded URL.
+			fileUrl = path.toURI().toURL();
+		} catch (MalformedURLException e) {
+			throw new RuntimeException("Erroneous config file path", e);
+		}
+		ClassLoader cl = new URLClassLoader(new URL[] {fileUrl}, conf.getClassLoader());
+		conf.setClassLoader(cl);
+	}
+
+	/**
+	 * Sets the HDFS and local file system implementations if the configuration does not
+	 * define them already.
+	 */
+	private static void setDefaultConfValues(Configuration conf) {
+		if(conf.get("fs.hdfs.impl",null) == null) {
+			conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+		}
+		if(conf.get("fs.file.impl",null) == null) {
+			conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
+		}
+	}
+
+	/**
+	 * Creates a {@link YarnConfiguration} and adds the Hadoop configuration directory to it.
+	 * The directory is taken from the first available source, in this order:
+	 * <ol>
+	 *   <li>the Flink setting {@link ConfigConstants#PATH_HADOOP_CONFIG},</li>
+	 *   <li>the environment variables YARN_CONF_DIR, HADOOP_CONF_DIR and HADOOP_CONF_PATH,</li>
+	 *   <li>HADOOP_HOME/etc/hadoop, or else HADOOP_HOME/conf, if that directory exists.</li>
+	 * </ol>
+	 * Terminates the JVM with exit code 1 if the Hadoop home cannot be determined.
+	 *
+	 * @return the initialized configuration
+	 */
+	public static Configuration initializeYarnConfiguration() {
+		Configuration conf = new YarnConfiguration();
+		String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);
+		if(configuredHadoopConfig != null) {
+			LOG.info("Using hadoop configuration path from " + ConfigConstants.PATH_HADOOP_CONFIG + " setting.");
+			addPathToConfig(conf, new File(configuredHadoopConfig));
+			setDefaultConfValues(conf);
+			return conf;
+		}
+		String[] envs = { "YARN_CONF_DIR", "HADOOP_CONF_DIR", "HADOOP_CONF_PATH" };
+		for(int i = 0; i < envs.length; ++i) {
+			String confPath = System.getenv(envs[i]);
+			if (confPath != null) {
+				LOG.info("Found "+envs[i]+", adding it to configuration");
+				addPathToConfig(conf, new File(confPath));
+				setDefaultConfValues(conf);
+				return conf;
+			}
+		}
+		LOG.info("Could not find HADOOP_CONF_PATH, using HADOOP_HOME.");
+		String hadoopHome = null;
+		try {
+			hadoopHome = Shell.getHadoopHome();
+		} catch (IOException e) {
+			LOG.fatal("Unable to get hadoop home. Please set HADOOP_HOME variable!", e);
+			System.exit(1);
+		}
+		File tryConf = new File(hadoopHome+"/etc/hadoop");
+		if(tryConf.exists()) {
+			LOG.info("Found configuration using hadoop home.");
+			addPathToConfig(conf, tryConf);
+		} else {
+			tryConf = new File(hadoopHome+"/conf");
+			if(tryConf.exists()) {
+				addPathToConfig(conf, tryConf);
+			}
+		}
+		setDefaultConfValues(conf);
+		return conf;
+	}
+
+	/**
+	 * Appends every entry of the YARN application classpath, followed by the container's
+	 * working directory ({@code $PWD/*}), to the CLASSPATH variable of {@code appMasterEnv}.
+	 */
+	public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv) {
+		for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+			addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
+		}
+		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator + "*");
+	}
+
+	/**
+	 * Copies a local file to {@code <homedir>/.flink/<appId>/<file name>} on {@code fs}.
+	 * It then registers the copy in {@code appMasterJar} (see {@link #registerLocalResource}).
+	 * Note: {@code conf} is currently not used.
+	 *
+	 * @return Path to remote file (usually hdfs)
+	 * @throws IOException if copying or reading the remote file status fails
+	 */
+	public static Path setupLocalResource(Configuration conf, FileSystem fs, String appId, Path localRsrcPath, LocalResource appMasterJar, Path homedir)
+			throws IOException {
+		// copy to HDFS
+		String suffix = ".flink/" + appId + "/" + localRsrcPath.getName();
+
+		Path dst = new Path(homedir, suffix);
+
+		LOG.info("Copying from "+localRsrcPath+" to "+dst );
+		fs.copyFromLocalFile(localRsrcPath, dst);
+		registerLocalResource(fs, dst, appMasterJar);
+		return dst;
+	}
+
+	/**
+	 * Fills {@code localResource} with the URL, size and modification time of the remote file.
+	 * It also marks the resource as a PUBLIC file.
+	 *
+	 * @throws IOException if the status of the remote file cannot be read
+	 */
+	public static void registerLocalResource(FileSystem fs, Path remoteRsrcPath, LocalResource localResource) throws IOException {
+		FileStatus jarStat = fs.getFileStatus(remoteRsrcPath);
+		localResource.setResource(ConverterUtils.getYarnUrlFromURI(remoteRsrcPath.toUri()));
+		localResource.setSize(jarStat.getLen());
+		localResource.setTimestamp(jarStat.getModificationTime());
+		localResource.setType(LocalResourceType.FILE);
+		localResource.setVisibility(LocalResourceVisibility.PUBLIC);
+	}
+
+	/**
+	 * Obtains delegation tokens for the name nodes of {@code paths} and adds all tokens of the
+	 * current user. It serializes the resulting credentials into the container launch context.
+	 *
+	 * @throws IOException if obtaining or serializing the tokens fails
+	 */
+	public static void setTokensFor(ContainerLaunchContext amContainer, Path[] paths, Configuration conf) throws IOException {
+		Credentials credentials = new Credentials();
+		// for HDFS
+		TokenCache.obtainTokensForNamenodes(credentials, paths, conf);
+		// for user
+		UserGroupInformation currUsr = UserGroupInformation.getCurrentUser();
+
+		Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
+		for(Token<? extends TokenIdentifier> token : usrTok) {
+			final Text id = new Text(token.getIdentifier());
+			LOG.info("Adding user token "+id+" with "+token);
+			credentials.addToken(id, token);
+		}
+		DataOutputBuffer dob = new DataOutputBuffer();
+		credentials.writeTokenStorageToStream(dob);
+		LOG.debug("Wrote tokens. Credentials buffer length: "+dob.getLength());
+
+		ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+		amContainer.setTokens(securityTokens);
+	}
+
+	/**
+	 * Logs (at info level) the absolute path of every entry in the current working directory.
+	 * The filter is used only for its side effect of visiting each name.
+	 */
+	public static void logFilesInCurrentDirectory(final Log logger) {
+		new File(".").list(new FilenameFilter() {
+
+			@Override
+			public boolean accept(File dir, String name) {
+				logger.info(dir.getAbsolutePath()+"/"+name);
+				return true;
+			}
+		});
+	}
+
+	/**
+	 * Copied method from org.apache.hadoop.yarn.util.Apps
+	 * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
+	 * by https://issues.apache.org/jira/browse/YARN-1931
+	 *
+	 * Appends {@code value} to the variable in {@code environment}, separated by
+	 * {@link File#pathSeparator}, or sets it if the variable is not present yet.
+	 */
+	public static void addToEnvironment(Map<String, String> environment,
+			String variable, String value) {
+		String val = environment.get(variable);
+		if (val == null) {
+			val = value;
+		} else {
+			val = val + File.pathSeparator + value;
+		}
+		environment.put(StringInterner.weakIntern(variable),
+				StringInterner.weakIntern(val));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
new file mode 100644
index 0000000..b541317
--- /dev/null
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.flink.yarn;
+
+import java.io.IOException;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+
+public class YarnTaskManagerRunner {
+
+	private static final Log LOG = LogFactory.getLog(YarnTaskManagerRunner.class);
+
+	/**
+	 * Entry point for a Flink TaskManager running inside a YARN container.
+	 *
+	 * It appends {@code -<ARG_CONF_DIR> <LOCAL_DIRS>} to the given arguments. It then runs
+	 * {@link TaskManager#main(String[])} as the user named in the environment variable
+	 * {@code Client.ENV_CLIENT_USERNAME}, carrying over all tokens of the current (YARN daemon)
+	 * user. If the TaskManager fails with an exception, the error is logged and the JVM exits
+	 * with code 1, so the container is reported as failed.
+	 *
+	 * NOTE(review): YARN's LOCAL_DIRS may hold several comma-separated directories - confirm
+	 * that passing it unchanged as the configuration directory is intended.
+	 *
+	 * @param args arguments forwarded to the TaskManager
+	 * @throws IOException if the current user cannot be determined
+	 */
+	public static void main(final String[] args) throws IOException {
+		Map<String, String> envs = System.getenv();
+		final String yarnClientUsername = envs.get(Client.ENV_CLIENT_USERNAME);
+		final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
+
+		// configure local directory
+		final String[] newArgs = Arrays.copyOf(args, args.length + 2);
+		newArgs[newArgs.length-2] = "-"+TaskManager.ARG_CONF_DIR;
+		newArgs[newArgs.length-1] = localDirs;
+		LOG.info("Setting log path "+localDirs);
+		LOG.info("YARN daemon runs as '"+UserGroupInformation.getCurrentUser().getShortUserName()+"' setting"
+				+ " user to execute Flink TaskManager to '"+yarnClientUsername+"'");
+		UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
+		for(Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
+			ugi.addToken(toks);
+		}
+		ugi.doAs(new PrivilegedAction<Object>() {
+			@Override
+			public Object run() {
+				try {
+					TaskManager.main(newArgs);
+				} catch (Exception e) {
+					// Do not just print the stack trace: log it and terminate with a non-zero
+					// exit code so YARN sees the failed container instead of a silent one.
+					LOG.fatal("Error while running the TaskManager", e);
+					System.exit(1);
+				}
+				return null;
+			}
+		});
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/pom.xml
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/pom.xml b/flink-addons/hadoop-compatibility/pom.xml
deleted file mode 100644
index 399ef88..0000000
--- a/flink-addons/hadoop-compatibility/pom.xml
+++ /dev/null
@@ -1,77 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License. See accompanying LICENSE file.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
-	
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-	
-	<modelVersion>4.0.0</modelVersion>
-	
-	<parent>
-		<artifactId>flink-addons</artifactId>
-		<groupId>org.apache.flink</groupId>
-		<version>0.6-incubating-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>hadoop-compatibility</artifactId>
-	<name>hadoop-compatibility</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-	</dependencies>
-
-	<profiles>
-		<profile>
-			<id>hadoop-2</id>
-			<activation>
-				<property>
-					<!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-					<!--hadoop2--><name>hadoop.profile</name><value>2</value>
-				</property>
-			</activation>
-			<dependencies>
-				<dependency>
-					<groupId>org.apache.hadoop</groupId>
-					<artifactId>hadoop-mapreduce-client-core</artifactId>
-					<version>${hadoop.version}</version>
-				</dependency>
-			</dependencies>
-		</profile>
-	</profiles>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
deleted file mode 100644
index 030d7f2..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopInputFormat.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.WritableTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
-import org.apache.flink.types.TypeInformation;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class HadoopInputFormat<K extends Writable, V extends Writable> implements InputFormat<Tuple2<K,V>, HadoopInputSplit>, ResultTypeQueryable<Tuple2<K,V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class);
-
-	private org.apache.hadoop.mapred.InputFormat<K, V> mapredInputFormat;
-	private Class<K> keyClass;
-	private Class<V> valueClass;
-	private JobConf jobConf;
-
-	// key/value holders created once per split and refilled by the record reader;
-	// every emitted Tuple2 references these same objects
-	private transient K key;
-	private transient V value;
-
-	private transient RecordReader<K, V> recordReader;
-	// true if recordReader.next() was called for a record that has not been emitted yet
-	private transient boolean fetched = false;
-	// result of the last recordReader.next() call
-	private transient boolean hasNext;
-
-	/**
-	 * Creates an input format without a wrapped Hadoop input format, key/value classes or JobConf.
-	 */
-	public HadoopInputFormat() {
-		super();
-	}
-
-	/**
-	 * Wraps a Hadoop mapred {@link org.apache.hadoop.mapred.InputFormat}.
-	 * The Hadoop configuration is merged into {@code job} (via HadoopUtils.mergeHadoopConf)
-	 * before it is stored.
-	 *
-	 * @param mapredInputFormat the Hadoop input format to read with
-	 * @param key class of the produced keys
-	 * @param value class of the produced values
-	 * @param job the job configuration passed to the Hadoop input format
-	 */
-	public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat, Class<K> key, Class<V> value, JobConf job) {
-		super();
-		this.mapredInputFormat = mapredInputFormat;
-		this.keyClass = key;
-		this.valueClass = value;
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-
-	public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
-		return mapredInputFormat;
-	}
-
-	public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> mapredInputFormat) {
-		this.mapredInputFormat = mapredInputFormat;
-	}
-
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  InputFormat
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-
-	/**
-	 * Computes file statistics (latest modification time and total size) for the input paths.
-	 * This is done only when the wrapped format is a Hadoop {@link FileInputFormat}.
-	 * Cached statistics are returned if no input file has changed since they were computed.
-	 *
-	 * @return the statistics, or {@code null} if none can be determined
-	 */
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		// only gather base statistics for FileInputFormats
-		if(!(mapredInputFormat instanceof FileInputFormat)) {
-			return null;
-		}
-
-		// instanceof is false for null, so no separate null check is needed
-		final FileBaseStatistics cachedFileStats = (cachedStats instanceof FileBaseStatistics) ?
-				(FileBaseStatistics) cachedStats : null;
-
-		try {
-			final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf);
-
-			return getFileStats(cachedFileStats, paths, new ArrayList<FileStatus>(1));
-		} catch (IOException ioex) {
-			if (LOG.isWarnEnabled()) {
-				LOG.warn("Could not determine statistics due to an io error: "
-						+ ioex.getMessage(), ioex);
-			}
-		} catch (Throwable t) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Unexpected problem while getting the file statistics: "
-						+ t.getMessage(), t);
-			}
-		}
-
-		// no statistics available
-		return null;
-	}
-
-	/**
-	 * Asks the Hadoop input format for splits and wraps each one, together with the JobConf,
-	 * in a {@link HadoopInputSplit}.
-	 */
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits);
-		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
-		for(int i=0;i<splitArray.length;i++){
-			hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
-		}
-		return hiSplit;
-	}
-
-	@Override
-	public Class<? extends HadoopInputSplit> getInputSplitType() {
-		return HadoopInputSplit.class;
-	}
-
-	/**
-	 * Creates the Hadoop record reader for the split and the key/value objects it fills.
-	 */
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-		this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-		key = this.recordReader.createKey();
-		value = this.recordReader.createValue();
-		this.fetched = false;
-	}
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		return !hasNext;
-	}
-
-	// reads the next record into key/value and remembers whether there was one
-	private void fetchNext() throws IOException {
-		hasNext = this.recordReader.next(key, value);
-		fetched = true;
-	}
-
-	/**
-	 * Emits the next record. The returned tuple references the shared key/value objects,
-	 * which are overwritten by the following read.
-	 *
-	 * @return {@code record} filled with key and value, or {@code null} at the end of the split
-	 */
-	@Override
-	public Tuple2<K, V> nextRecord(Tuple2<K, V> record) throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		if(!hasNext) {
-			return null;
-		}
-		record.f0 = key;
-		record.f1 = value;
-		fetched = false;
-		return record;
-	}
-
-	@Override
-	public void close() throws IOException {
-		// open() may have failed before the reader was created; an NPE here would hide that error
-		if (this.recordReader != null) {
-			this.recordReader.close();
-			this.recordReader = null;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Helper methods
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Collects the files of the given paths (a directory contributes its direct, non-directory
-	 * children) into {@code files}. It returns {@code cachedStats} if none of them is newer
-	 * than the cache, and otherwise returns fresh statistics with the summed file length.
-	 */
-	private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths,
-			ArrayList<FileStatus> files) throws IOException {
-
-		long latestModTime = 0L;
-
-		// get the file info and check whether the cached statistics are still valid.
-		for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) {
-
-			final Path filePath = new Path(hadoopPath.toUri());
-			final FileSystem fs = FileSystem.get(filePath.toUri());
-
-			final FileStatus file = fs.getFileStatus(filePath);
-			latestModTime = Math.max(latestModTime, file.getModificationTime());
-
-			// enumerate all files and check their modification time stamp.
-			if (file.isDir()) {
-				FileStatus[] fss = fs.listStatus(filePath);
-				files.ensureCapacity(files.size() + fss.length);
-
-				for (FileStatus s : fss) {
-					if (!s.isDir()) {
-						files.add(s);
-						latestModTime = Math.max(s.getModificationTime(), latestModTime);
-					}
-				}
-			} else {
-				files.add(file);
-			}
-		}
-
-		// check whether the cached statistics are still valid, if we have any
-		if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) {
-			return cachedStats;
-		}
-
-		// calculate the whole length
-		long len = 0;
-		for (FileStatus s : files) {
-			len += s.getLen();
-		}
-
-		// sanity check
-		if (len <= 0) {
-			len = BaseStatistics.SIZE_UNKNOWN;
-		}
-
-		return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	// writes the class names of the input format, key and value, followed by the JobConf
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(mapredInputFormat.getClass().getName());
-		out.writeUTF(keyClass.getName());
-		out.writeUTF(valueClass.getName());
-		jobConf.write(out);
-	}
-
-	// counterpart of writeObject: classes are resolved with the thread context classloader and
-	// the input format is re-instantiated and configured with the restored JobConf
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopInputFormatClassName = in.readUTF();
-		String keyClassName = in.readUTF();
-		String valueClassName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		try {
-			this.keyClass = (Class<K>) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find key class.", e);
-		}
-		try {
-			this.valueClass = (Class<V>) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader());
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to find value class.", e);
-		}
-		ReflectionUtils.setConf(mapredInputFormat, jobConf);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  ResultTypeQueryable
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public TypeInformation<Tuple2<K,V>> getProducedType() {
-		// keyClass/valueClass are already typed as Class<K>/Class<V>; no casts needed
-		return new TupleTypeInfo<Tuple2<K,V>>(new WritableTypeInfo<K>(keyClass), new WritableTypeInfo<V>(valueClass));
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
deleted file mode 100644
index deae026..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/HadoopOutputFormat.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-public class HadoopOutputFormat<K extends Writable,V extends Writable> implements OutputFormat<Tuple2<K, V>> {
-
-	private static final long serialVersionUID = 1L;
-
-	private JobConf jobConf;
-	private org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat;
-	private transient RecordWriter<K,V> recordWriter;
-	private transient FileOutputCommitter fileOutputCommitter;
-	private transient TaskAttemptContext context;
-	private transient JobContext jobContext;
-
-	/**
-	 * Wraps a Hadoop mapred {@link org.apache.hadoop.mapred.OutputFormat}.
-	 * The Hadoop configuration is merged into {@code job} (via HadoopUtils.mergeHadoopConf)
-	 * before it is stored.
-	 *
-	 * @param mapredOutputFormat the Hadoop output format to write with
-	 * @param job the job configuration passed to the Hadoop output format
-	 */
-	public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat, JobConf job) {
-		super();
-		this.mapredOutputFormat = mapredOutputFormat;
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-
-	public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
-		return mapredOutputFormat;
-	}
-
-	public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> mapredOutputFormat) {
-		this.mapredOutputFormat = mapredOutputFormat;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  OutputFormat
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(Configuration parameters) {
-		// nothing to do
-	}
-
-	/**
-	 * create the temporary output file for hadoop RecordWriter.
-	 *
-	 * Builds a task attempt ID of the form {@code attempt__0000_r_NNNNNN_0}, where NNNNNN is
-	 * {@code taskNumber + 1} zero-padded to six digits. It sets up the task and job contexts
-	 * and a {@link FileOutputCommitter}, then obtains the Hadoop record writer.
-	 *
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws IOException if {@code taskNumber + 1} has more than six digits, or if setting up
-	 *         the job or the record writer fails
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		final String taskId = Integer.toString(taskNumber + 1);
-		if (taskId.length() > 6) {
-			throw new IOException("Task id too large.");
-		}
-
-		// Left-pad the task id with zeros to six digits (e.g. 5 -> "000005"). Padding by hand
-		// also works for ids with exactly six digits (a "%0s" format pattern would be rejected)
-		// and does not depend on the digits of the default locale.
-		StringBuilder attemptId = new StringBuilder("attempt__0000_r_");
-		for (int i = taskId.length(); i < 6; i++) {
-			attemptId.append('0');
-		}
-		attemptId.append(taskId).append("_0");
-
-		TaskAttemptID taskAttemptID = TaskAttemptID.forName(attemptId.toString());
-
-		try {
-			this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.jobConf.set("mapred.task.id", taskAttemptID.toString());
-		// for hadoop 2.2
-		this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString());
-
-		this.fileOutputCommitter = new FileOutputCommitter();
-
-		try {
-			this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID());
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-
-		this.fileOutputCommitter.setupJob(jobContext);
-
-		this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, taskId, new HadoopDummyProgressable());
-	}
-
-	@Override
-	public void writeRecord(Tuple2<K, V> record) throws IOException {
-		this.recordWriter.write(record.f0, record.f1);
-	}
-
-	/**
-	 * commit the task by moving the output file out from the temporary directory.
-	 *
-	 * NOTE(review): every parallel instance calls commitJob() here; presumably the job commit
-	 * should run only once, after all tasks have finished - confirm.
-	 *
-	 * @throws IOException
-	 */
-	@Override
-	public void close() throws IOException {
-		this.recordWriter.close(new HadoopDummyReporter());
-
-		if (this.fileOutputCommitter.needsTaskCommit(this.context)) {
-			this.fileOutputCommitter.commitTask(this.context);
-		}
-		this.fileOutputCommitter.commitJob(this.jobContext);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom serialization methods
-	// --------------------------------------------------------------------------------------------
-
-	// writes the class name of the Hadoop output format, followed by the JobConf
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(mapredOutputFormat.getClass().getName());
-		jobConf.write(out);
-	}
-
-	// counterpart of writeObject: re-instantiates the output format with the thread context
-	// classloader and configures it with the restored JobConf
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		String hadoopOutputFormatName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-		ReflectionUtils.setConf(mapredOutputFormat, jobConf);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
deleted file mode 100644
index 4e8ffa9..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/example/WordCount.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.hadoopcompatibility.mapred.example;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TextInputFormat;
-import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.functions.FlatMapFunction;
-import org.apache.flink.api.java.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopInputFormat;
-import org.apache.flink.hadoopcompatibility.mapred.HadoopOutputFormat;
-import org.apache.flink.util.Collector;
-
-
-
-/**
- * Implements a word count which takes the input file and counts the number of
- * occurrences of each word in the file and writes the result back to disk.
- * 
- * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to 
- * common Java types for better usage in a Flink job and how to use Hadoop Output Formats.
- *
- * Usage: {@code WordCount <input path> <result path>}
- */
-@SuppressWarnings("serial")
-public class WordCount {
-	
-	public static void main(String[] args) throws Exception {
-		if (args.length < 2) {
-			System.err.println("Usage: WordCount <input path> <result path>");
-			return;
-		}
-		
-		final String inputPath = args[0];
-		final String outputPath = args[1];
-		
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		// presumably set to 1 so the example writes a single result file -- TODO confirm
-		env.setDegreeOfParallelism(1);
-		
-		// Set up the Hadoop Input Format
-		HadoopInputFormat<LongWritable, Text> hadoopInputFormat = new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, new JobConf());
-		TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath));
-		
-		// Create a Flink job with it
-		DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopInputFormat);
-		
-		// Tokenize the line and convert from Writable "Text" to String for better handling
-		DataSet<Tuple2<String, Integer>> words = text.flatMap(new Tokenizer());
-		
-		// Sum up the words
-		DataSet<Tuple2<String, Integer>> result = words.groupBy(0).aggregate(Aggregations.SUM, 1);
-		
-		// Convert String back to Writable "Text" for use with Hadoop Output Format
-		DataSet<Tuple2<Text, IntWritable>> hadoopResult = result.map(new HadoopDatatypeMapper());
-		
-		// Set up Hadoop Output Format; key and value are separated by a single space
-		HadoopOutputFormat<Text, IntWritable> hadoopOutputFormat = new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), new JobConf());
-		hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " ");
-		TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath));
-		
-		// Output & Execute
-		hadoopResult.output(hadoopOutputFormat);
-		env.execute("Word Count");
-	}
-	
-	/**
-	 * Splits a line into lower-case words and converts Hadoop Writables into normal Java data types.
-	 */
-	public static final class Tokenizer extends FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>> {
-		
-		/** Word separator: any run of non-word characters. Compiled once instead of per record. */
-		private static final java.util.regex.Pattern NON_WORD_CHARS = java.util.regex.Pattern.compile("\\W+");
-		
-		@Override
-		public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
-			// normalize with a fixed locale: the no-argument toLowerCase() depends on the JVM default
-			// locale (e.g. Turkish maps 'I' to a dotless i, U+0131, which the split below treats as a separator)
-			String line = value.f1.toString().toLowerCase(java.util.Locale.ROOT);
-			String[] tokens = NON_WORD_CHARS.split(line);
-			
-			// emit the pairs
-			for (String token : tokens) {
-				if (token.length() > 0) {
-					out.collect(new Tuple2<String, Integer>(token, 1));
-				}
-			}
-		}
-	}
-	
-	/**
-	 * Converts Java data types to Hadoop Writables: the String word becomes a {@link Text},
-	 * the Integer count an {@link IntWritable}.
-	 */
-	public static final class HadoopDatatypeMapper extends MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>> {
-		
-		@Override
-		public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> value) throws Exception {
-			return new Tuple2<Text, IntWritable>(new Text(value.f0), new IntWritable(value.f1));
-		}
-		
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
deleted file mode 100644
index 415f897..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-import java.util.List;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.GenericDataSink;
-import org.apache.flink.compiler.contextcheck.Validatable;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-/**
- * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats.
- *
- * Example usage:
- * <pre>
- * 		HadoopDataSink out = new HadoopDataSink(new org.apache.hadoop.mapred.TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat",reducer, Text.class,IntWritable.class);
- *		org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output));
- * </pre>
- *
- * Note that it is possible to provide a custom data type converter.
- *
- * The HadoopDataSink provides a default converter: {@link org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter}
- **/
-public class HadoopDataSink<K,V> extends GenericDataSink implements Validatable {
-
-	private static final String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>";
-
-	/** The JobConf also handed to the wrapped HadoopRecordOutputFormat; holds output key/value classes and path. */
-	private final JobConf jobConf;
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, name, ImmutableList.<Operator<Record>>of(input), conv, keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	/**
-	 * Primary constructor; all other constructors delegate here.
-	 *
-	 * @param hadoopFormat the Hadoop output format to wrap
-	 * @param jobConf Hadoop configuration; its output key/value classes are set from keyClass/valueClass
-	 * @param name name of the sink
-	 * @param input the operators feeding this sink
-	 * @param conv converter from Flink records to Hadoop key/value pairs
-	 * @param keyClass Hadoop output key class
-	 * @param valueClass Hadoop output value class
-	 * @throws NullPointerException if hadoopFormat, jobConf or conv is null
-	 */
-	@SuppressWarnings("deprecation")
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) {
-		// validate inside the super(...) arguments: HadoopRecordOutputFormat already calls
-		// hadoopFormat.getClass() in its constructor, so checks placed after super(...) never fire
-		super(new HadoopRecordOutputFormat<K,V>(
-				Preconditions.checkNotNull(hadoopFormat, "The Hadoop OutputFormat must not be null."),
-				Preconditions.checkNotNull(jobConf, "The JobConf must not be null."),
-				Preconditions.checkNotNull(conv, "The type converter must not be null.")),
-			input, name);
-		this.name = name;
-		this.jobConf = jobConf;
-		jobConf.setOutputKeyClass(keyClass);
-		jobConf.setOutputValueClass(valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	public HadoopDataSink(OutputFormat<K,V> hadoopFormat, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) {
-		this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass);
-	}
-
-	/** @return the JobConf used by this sink, e.g. to set the output path */
-	public JobConf getJobConf() {
-		return this.jobConf;
-	}
-
-	/**
-	 * Fails when no output path has been configured on the JobConf.
-	 */
-	@Override
-	public void check() {
-		// for more details see https://github.com/stratosphere/stratosphere/pull/531
-		Preconditions.checkNotNull(FileOutputFormat.getOutputPath(jobConf), "The HadoopDataSink currently expects a correct outputPath.");
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
deleted file mode 100644
index d55fe87..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-
-import org.apache.flink.api.java.record.operators.GenericDataSource;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultHadoopTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
-
-import com.google.common.base.Preconditions;
-
-
-
-/**
- * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats.
- * 
- * Example usage:
- * <pre>
- * 		HadoopDataSource source = new HadoopDataSource(new org.apache.hadoop.mapred.TextInputFormat(), new JobConf(), "Input Lines");
- *		org.apache.hadoop.mapred.TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput));
- * </pre>
- * 
- * Note that it is possible to provide a custom data type converter.
- * 
- * The HadoopDataSource provides two different standard converters:
- * <ul>
- * <li>WritableWrapperConverter: Converts Hadoop Types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper</li>
- * <li>DefaultHadoopTypeConverter: Converts the standard hadoop types (longWritable, Text) to Flink's {@link org.apache.flink.types.Value} types.</li>
- * </ul>
- */
-public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFormat<K,V>> {
-
-	private static final String DEFAULT_NAME = "<Unnamed Hadoop Data Source>";
-	
-	/** The JobConf also handed to the wrapped HadoopRecordInputFormat; holds e.g. the input paths. */
-	private final JobConf jobConf;
-	
-	/**
-	 * Primary constructor; all other constructors delegate here.
-	 *
-	 * @param hadoopFormat Implementation of a Hadoop input format
-	 * @param jobConf JobConf object (Hadoop)
-	 * @param name Name of the DataSource
-	 * @param conv Definition of a custom type converter {@link DefaultHadoopTypeConverter}.
-	 * @throws NullPointerException if hadoopFormat, jobConf or conv is null
-	 */
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) {
-		// validate inside the super(...) arguments: HadoopRecordInputFormat already calls
-		// hadoopFormat.getClass() in its constructor, so checks placed after super(...) never fire
-		super(new HadoopRecordInputFormat<K,V>(
-				Preconditions.checkNotNull(hadoopFormat, "The Hadoop InputFormat must not be null."),
-				Preconditions.checkNotNull(jobConf, "The JobConf must not be null."),
-				Preconditions.checkNotNull(conv, "The type converter must not be null.")),
-			name);
-		this.name = name;
-		this.jobConf = jobConf;
-	}
-	
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name) {
-		this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter<K,V>() );
-	}
-
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf) {
-		this(hadoopFormat, jobConf, DEFAULT_NAME);
-	}
-	
-	public HadoopDataSource(InputFormat<K,V> hadoopFormat) {
-		this(hadoopFormat, new JobConf(), DEFAULT_NAME);
-	}
-
-	/** @return the JobConf used by this source, e.g. to add input paths */
-	public JobConf getJobConf() {
-		return this.jobConf;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
deleted file mode 100644
index dcf1952..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.util.ReflectionUtils;
-
-public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, HadoopInputSplit> {
-
-	private static final long serialVersionUID = 1L;
-
-	public org.apache.hadoop.mapred.InputFormat<K, V> hadoopInputFormat;
-	public HadoopTypeConverter<K,V> converter;
-	private String hadoopInputFormatName;
-	public JobConf jobConf;
-	public transient K key;
-	public transient V value;
-	public RecordReader<K, V> recordReader;
-	private boolean fetched = false;
-	private boolean hasNext;
-		
-	public HadoopRecordInputFormat() {
-		super();
-	}
-	
-	public HadoopRecordInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat, JobConf job, HadoopTypeConverter<K,V> conv) {
-		super();
-		this.hadoopInputFormat = hadoopInputFormat;
-		this.hadoopInputFormatName = hadoopInputFormat.getClass().getName();
-		this.converter = conv;
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		
-	}
-
-	@Override
-	public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
-		return null;
-	}
-
-	@Override
-	public HadoopInputSplit[] createInputSplits(int minNumSplits)
-			throws IOException {
-		org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits);
-		HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length];
-		for(int i=0;i<splitArray.length;i++){
-			hiSplit[i] = new HadoopInputSplit(splitArray[i], jobConf);
-		}
-		return hiSplit;
-	}
-
-	@Override
-	public Class<? extends HadoopInputSplit> getInputSplitType() {
-		return HadoopInputSplit.class;
-	}
-
-	@Override
-	public void open(HadoopInputSplit split) throws IOException {
-		this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter());
-		key = this.recordReader.createKey();
-		value = this.recordReader.createValue();
-		this.fetched = false;
-	}
-
-	private void fetchNext() throws IOException {
-		hasNext = this.recordReader.next(key, value);
-		fetched = true;
-	}
-	
-	@Override
-	public boolean reachedEnd() throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		return !hasNext;
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if(!fetched) {
-			fetchNext();
-		}
-		if(!hasNext) {
-			return null;
-		}
-		converter.convert(record, key, value);
-		fetched = false;
-		return record;
-	}
-
-	@Override
-	public void close() throws IOException {
-		this.recordReader.close();
-	}
-	
-	/**
-	 * Custom serialization methods.
-	 *  @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
-	 */
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(hadoopInputFormatName);
-		jobConf.write(out);
-		out.writeObject(converter);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		hadoopInputFormatName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(this.hadoopInputFormatName).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop input format", e);
-		}
-		ReflectionUtils.setConf(hadoopInputFormat, jobConf);
-		converter = (HadoopTypeConverter<K,V>) in.readObject();
-	}
-	
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-		
-
-	public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() {
-		return hadoopInputFormat;
-	}
-	
-	public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat) {
-		this.hadoopInputFormat = hadoopInputFormat;
-	}
-	
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
deleted file mode 100644
index 337b543..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java
+++ /dev/null
@@ -1,156 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter;
-import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter;
-import org.apache.flink.hadoopcompatibility.mapred.utils.HadoopUtils;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable;
-import org.apache.flink.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter;
-import org.apache.flink.types.Record;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RecordWriter;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.ReflectionUtils;
-
-
-public class HadoopRecordOutputFormat<K,V> implements OutputFormat<Record> {
-
-	private static final long serialVersionUID = 1L;
-
-	public JobConf jobConf;
-
-	public org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat;
-
-	private String hadoopOutputFormatName;
-
-	public RecordWriter<K,V> recordWriter;
-
-	public FlinkTypeConverter<K,V> converter;
-
-	public HadoopFileOutputCommitter fileOutputCommitterWrapper;
-
-	public HadoopRecordOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopFormat, JobConf job, FlinkTypeConverter<K,V> conv) {
-		super();
-		this.hadoopOutputFormat = hadoopFormat;
-		this.hadoopOutputFormatName = hadoopFormat.getClass().getName();
-		this.converter = conv;
-		this.fileOutputCommitterWrapper = new HadoopFileOutputCommitter();
-		HadoopUtils.mergeHadoopConf(job);
-		this.jobConf = job;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-	}
-
-	/**
-	 * create the temporary output file for hadoop RecordWriter.
-	 * @param taskNumber The number of the parallel instance.
-	 * @param numTasks The number of parallel tasks.
-	 * @throws IOException
-	 */
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		this.fileOutputCommitterWrapper.setupJob(this.jobConf);
-		if (Integer.toString(taskNumber + 1).length() <= 6) {
-			this.jobConf.set("mapred.task.id", "attempt__0000_r_" + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + Integer.toString(taskNumber + 1) + "_0");
-			//compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1
-			this.jobConf.set("mapreduce.task.output.dir", this.fileOutputCommitterWrapper.getTempTaskOutputPath(this.jobConf,TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))).toString());
-		} else {
-			throw new IOException("task id too large");
-		}
-		this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable());
-	}
-
-
-	@Override
-	public void writeRecord(Record record) throws IOException {
-		K key = this.converter.convertKey(record);
-		V value = this.converter.convertValue(record);
-		this.recordWriter.write(key, value);
-	}
-
-	/**
-	 * commit the task by moving the output file out from the temporary directory.
-	 * @throws IOException
-	 */
-	@Override
-	public void close() throws IOException {
-		this.recordWriter.close(new HadoopDummyReporter());
-		if (this.fileOutputCommitterWrapper.needsTaskCommit(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")))) {
-			this.fileOutputCommitterWrapper.commitTask(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")));
-		}
-	//TODO: commitjob when all the tasks are finished
-	}
-
-
-	/**
-	 * Custom serialization methods.
-	 *  @see http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html
-	 */
-	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeUTF(hadoopOutputFormatName);
-		jobConf.write(out);
-		out.writeObject(converter);
-		out.writeObject(fileOutputCommitterWrapper);
-	}
-
-	@SuppressWarnings("unchecked")
-	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-		hadoopOutputFormatName = in.readUTF();
-		if(jobConf == null) {
-			jobConf = new JobConf();
-		}
-		jobConf.readFields(in);
-		try {
-			this.hadoopOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(this.hadoopOutputFormatName).newInstance();
-		} catch (Exception e) {
-			throw new RuntimeException("Unable to instantiate the hadoop output format", e);
-		}
-		ReflectionUtils.setConf(hadoopOutputFormat, jobConf);
-		converter = (FlinkTypeConverter<K,V>) in.readObject();
-		fileOutputCommitterWrapper = (HadoopFileOutputCommitter) in.readObject();
-	}
-
-
-	public void setJobConf(JobConf job) {
-		this.jobConf = job;
-	}
-
-	public JobConf getJobConf() {
-		return jobConf;
-	}
-
-	public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() {
-		return hadoopOutputFormat;
-	}
-
-	public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat) {
-		this.hadoopOutputFormat = hadoopOutputFormat;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
deleted file mode 100644
index 4e63717..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-
-/**
- * Convert Flink Record into the default hadoop writables.
- * The key is taken from field 0 and the value from field 1 of the record.
- */
-public class DefaultFlinkTypeConverter<K,V> implements FlinkTypeConverter<K,V> {
-	private static final long serialVersionUID = 1L;
-
-	private final Class<K> keyClass;
-	private final Class<V> valueClass;
-
-	/**
-	 * @param keyClass Hadoop Writable class produced for the key (record field 0)
-	 * @param valueClass Hadoop Writable class produced for the value (record field 1)
-	 */
-	public DefaultFlinkTypeConverter(Class<K> keyClass, Class<V> valueClass) {
-		this.keyClass = keyClass;
-		this.valueClass = valueClass;
-	}
-
-	/** @return field 0 converted to the key class, or {@code null} if the record has no fields */
-	@Override
-	public K convertKey(Record flinkRecord) {
-		if (flinkRecord.getNumFields() > 0) {
-			return convert(flinkRecord, 0, this.keyClass);
-		} else {
-			return null;
-		}
-	}
-
-	/** @return field 1 converted to the value class, or {@code null} if the record has fewer than two fields */
-	@Override
-	public V convertValue(Record flinkRecord) {
-		if (flinkRecord.getNumFields() > 1) {
-			return convert(flinkRecord, 1, this.valueClass);
-		} else {
-			return null;
-		}
-	}
-
-	/**
-	 * Reads field {@code pos} as the Flink Value type matching {@code hadoopType} and wraps it
-	 * in a new Writable of that type.
-	 *
-	 * @throws RuntimeException if {@code hadoopType} is not one of the supported Writables
-	 */
-	@SuppressWarnings("unchecked")
-	private <T> T convert(Record flinkType, int pos, Class<T> hadoopType) {
-		if (hadoopType == org.apache.hadoop.io.NullWritable.class) {
-			// NullWritable carries no data, so the content of the record field is irrelevant
-			return (T) org.apache.hadoop.io.NullWritable.get();
-		}
-		if (hadoopType == LongWritable.class) {
-			return (T) new LongWritable((flinkType.getField(pos, LongValue.class)).getValue());
-		}
-		if (hadoopType == org.apache.hadoop.io.Text.class) {
-			return (T) new Text((flinkType.getField(pos, StringValue.class)).getValue());
-		}
-		if (hadoopType == org.apache.hadoop.io.IntWritable.class) {
-			return (T) new IntWritable((flinkType.getField(pos, IntValue.class)).getValue());
-		}
-		if (hadoopType == org.apache.hadoop.io.FloatWritable.class) {
-			return (T) new FloatWritable((flinkType.getField(pos, FloatValue.class)).getValue());
-		}
-		if (hadoopType == org.apache.hadoop.io.DoubleWritable.class) {
-			return (T) new DoubleWritable((flinkType.getField(pos, DoubleValue.class)).getValue());
-		}
-		if (hadoopType == org.apache.hadoop.io.BooleanWritable.class) {
-			return (T) new BooleanWritable((flinkType.getField(pos, BooleanValue.class)).getValue());
-		}
-		if (hadoopType == org.apache.hadoop.io.ByteWritable.class) {
-			return (T) new ByteWritable((flinkType.getField(pos, ByteValue.class)).getValue());
-		}
-
-		// report the unsupported target type and field; the record's own class is always Record
-		throw new RuntimeException("Unable to convert field " + pos + " of the Flink record to Hadoop type ("
-				+ (hadoopType == null ? "null" : hadoopType.getCanonicalName()) + ").");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
deleted file mode 100644
index c053e36..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.BooleanValue;
-import org.apache.flink.types.ByteValue;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.FloatValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-import org.apache.flink.types.Value;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.Text;
-
-
-/**
- * {@link HadoopTypeConverter} for Hadoop's built-in writable types.
- *
- * The converted key is stored in field 0 and the converted value in field 1 of the
- * target {@link Record}.
- */
-public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) {
-		// The key is converted and stored before the value is touched.
-		final Value flinkKey = convert(hadoopKey);
-		flinkRecord.setField(0, flinkKey);
-		final Value flinkValue = convert(hadoopValue);
-		flinkRecord.setField(1, flinkValue);
-	}
-
-	/**
-	 * Maps one Hadoop writable onto the corresponding Flink {@link Value}.
-	 *
-	 * @param hadoopType the Hadoop object to convert
-	 * @return a new Flink value, or the shared {@link NullValue} instance for a {@link NullWritable}
-	 * @throws RuntimeException if the object's type has no mapping here
-	 */
-	protected Value convert(Object hadoopType) {
-		final Value converted;
-		if (hadoopType instanceof LongWritable) {
-			converted = new LongValue(((LongWritable) hadoopType).get());
-		} else if (hadoopType instanceof Text) {
-			converted = new StringValue(((Text) hadoopType).toString());
-		} else if (hadoopType instanceof IntWritable) {
-			converted = new IntValue(((IntWritable) hadoopType).get());
-		} else if (hadoopType instanceof FloatWritable) {
-			converted = new FloatValue(((FloatWritable) hadoopType).get());
-		} else if (hadoopType instanceof DoubleWritable) {
-			converted = new DoubleValue(((DoubleWritable) hadoopType).get());
-		} else if (hadoopType instanceof BooleanWritable) {
-			converted = new BooleanValue(((BooleanWritable) hadoopType).get());
-		} else if (hadoopType instanceof ByteWritable) {
-			converted = new ByteValue(((ByteWritable) hadoopType).get());
-		} else if (hadoopType instanceof NullWritable) {
-			converted = NullValue.getInstance();
-		} else {
-			final String typeName = hadoopType.getClass().getCanonicalName();
-			throw new RuntimeException("Unable to convert Hadoop type (" + typeName + ") to a Flink data type.");
-		}
-		return converted;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
deleted file mode 100644
index 9e33606..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.Serializable;
-
-import org.apache.flink.types.Record;
-
-/**
- * Converts Flink's {@link Record} model into Hadoop key and value types.
- *
- * Implementations must be serializable, which is why this interface extends
- * {@link Serializable}. Flink provides a DefaultFlinkTypeConverter; custom implementations
- * should chain the type converters.
- */
-public interface FlinkTypeConverter<K,V> extends Serializable {
-
-	/**
-	 * Produces the Hadoop key for a Flink record.
-	 *
-	 * @param record the Flink record to convert
-	 * @return the Hadoop key derived from {@code record}
-	 */
-	K convertKey(Record record);
-
-	/**
-	 * Produces the Hadoop value for a Flink record.
-	 *
-	 * @param record the Flink record to convert
-	 * @return the Hadoop value derived from {@code record}
-	 */
-	V convertValue(Record record);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
deleted file mode 100644
index 1a35dc0..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URI;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.FileOutputCommitter;
-import org.apache.hadoop.mapred.FileOutputFormat;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * A {@link FileOutputCommitter} whose job and task lifecycle methods take a {@link JobConf}.
- *
- * Hadoop 1.2.1's {@link org.apache.hadoop.mapred.FileOutputCommitter} takes a
- * {@link org.apache.hadoop.mapred.JobContext} as input parameter. However, the JobContext class
- * is package private there (in Hadoop 2.2.0 it is public). This class therefore takes a
- * {@link org.apache.hadoop.mapred.JobConf} instead of a JobContext in order to set up and
- * commit jobs and tasks.
- */
-public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	/** Configuration key that controls whether a success marker file is written on job commit. */
-	static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
-		"mapreduce.fileoutputcommitter.marksuccessfuljobs";
-
-	/**
-	 * Creates the job-wide temporary directory ({@code TEMP_DIR_NAME}) below the configured
-	 * output path. Does nothing if no output path is set. A failed mkdirs is logged, not thrown.
-	 *
-	 * @param conf job configuration holding the output path
-	 * @throws IOException if the file system cannot be obtained or accessed
-	 */
-	public void setupJob(JobConf conf) throws IOException {
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-			FileSystem fileSys = tmpDir.getFileSystem(conf);
-			if (!fileSys.mkdirs(tmpDir)) {
-				LOG.error("Mkdirs failed to create " + tmpDir.toString());
-			}
-		}
-	}
-
-	/** Returns whether a success marker should be written on commit; defaults to {@code true}. */
-	private static boolean getOutputDirMarking(JobConf conf) {
-		return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
-	}
-
-	/** Creates an empty {@code SUCCEEDED_FILE_NAME} file in the output directory, if that directory exists. */
-	private void markSuccessfulOutputDir(JobConf conf) throws IOException {
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			FileSystem fileSys = outputPath.getFileSystem(conf);
-			// create a file in the folder to mark it
-			if (fileSys.exists(outputPath)) {
-				Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
-				fileSys.create(filePath).close();
-			}
-		}
-	}
-
-	/**
-	 * Maps a path located inside a task's temporary output directory to the corresponding path
-	 * inside the job output directory.
-	 *
-	 * @param jobOutputDir the final job output directory
-	 * @param taskOutput a file or directory below {@code taskOutputPath}
-	 * @param taskOutputPath the task's temporary output directory
-	 * @return the destination of {@code taskOutput} inside {@code jobOutputDir}
-	 * @throws IOException if {@code taskOutput} is not located below {@code taskOutputPath}
-	 */
-	private Path getFinalPath(Path jobOutputDir, Path taskOutput,
-							Path taskOutputPath) throws IOException {
-		URI taskOutputUri = taskOutput.toUri();
-		URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
-		// URI.relativize() returns its argument itself when it cannot relativize it, so this
-		// identity comparison (not equals) detects that taskOutputPath is not a parent of taskOutput.
-		if (taskOutputUri == relativePath) {
-			throw new IOException("Can not get the relative path: base = " +
-				taskOutputPath + " child = " + taskOutput);
-		}
-		if (relativePath.getPath().length() > 0) {
-			return new Path(jobOutputDir, relativePath.getPath());
-		} else {
-			return jobOutputDir;
-		}
-	}
-
-	/**
-	 * Recursively moves the content at {@code taskOutput} into {@code jobOutputDir}. An existing
-	 * destination file is deleted and the rename retried once.
-	 *
-	 * @throws IOException if a previous output cannot be deleted or the move fails
-	 */
-	private void moveTaskOutputs(JobConf conf, TaskAttemptID taskAttemptID,
-								FileSystem fs,
-								Path jobOutputDir,
-								Path taskOutput)
-		throws IOException {
-		if (fs.isFile(taskOutput)) {
-			Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
-				getTempTaskOutputPath(conf, taskAttemptID));
-			if (!fs.rename(taskOutput, finalOutputPath)) {
-				// The rename may fail because output from an earlier attempt is in the way.
-				if (!fs.delete(finalOutputPath, true)) {
-					throw new IOException("Failed to delete earlier output of task: " +
-						taskAttemptID);
-				}
-				if (!fs.rename(taskOutput, finalOutputPath)) {
-					throw new IOException("Failed to save output of task: " +
-						taskAttemptID);
-				}
-			}
-			LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
-		} else if(fs.getFileStatus(taskOutput).isDir()) {
-			FileStatus[] paths = fs.listStatus(taskOutput);
-			Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput,
-				getTempTaskOutputPath(conf, taskAttemptID));
-			fs.mkdirs(finalOutputPath);
-			if (paths != null) {
-				for (FileStatus path : paths) {
-					moveTaskOutputs(conf, taskAttemptID, fs, jobOutputDir, path.getPath());
-				}
-			}
-		}
-	}
-
-	/**
-	 * Moves the output of the given task attempt from its temporary directory into the job
-	 * output directory, then deletes the temporary directory. A failed delete is only logged.
-	 *
-	 * @throws IOException if the file system cannot be accessed or the output cannot be moved
-	 */
-	public void commitTask(JobConf conf, TaskAttemptID taskAttemptID)
-		throws IOException {
-		Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
-		if (taskOutputPath != null) {
-			FileSystem fs = taskOutputPath.getFileSystem(conf);
-			if (fs.exists(taskOutputPath)) {
-				// The task path is <output>/TEMP_DIR_NAME/_<attempt>, so two levels up is the job output.
-				Path jobOutputPath = taskOutputPath.getParent().getParent();
-				// Move the task outputs to their final place
-				moveTaskOutputs(conf, taskAttemptID, fs, jobOutputPath, taskOutputPath);
-				// Delete the temporary task-specific output directory
-				if (!fs.delete(taskOutputPath, true)) {
-					LOG.info("Failed to delete the temporary output" +
-						" directory of task: " + taskAttemptID + " - " + taskOutputPath);
-				}
-				LOG.info("Saved output of task '" + taskAttemptID + "' to " +
-					jobOutputPath);
-			}
-		}
-	}
-
-	/**
-	 * Returns whether the given task attempt has output to commit, i.e. whether its temporary
-	 * output directory exists.
-	 *
-	 * @throws IOException if the file system cannot be accessed
-	 */
-	public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID)
-		throws IOException {
-		Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID);
-		if (taskOutputPath == null) {
-			return false;
-		}
-		// Get the file-system for the task output directory. Since the task output path is
-		// created on demand, its existence means the task needs a commit.
-		FileSystem fs = taskOutputPath.getFileSystem(conf);
-		return fs.exists(taskOutputPath);
-	}
-
-	/**
-	 * Returns the temporary output directory of a task attempt,
-	 * {@code <output>/TEMP_DIR_NAME/_<attemptId>}, qualified against its file system when possible.
-	 *
-	 * @return the temporary path, or {@code null} if no output path is configured
-	 */
-	public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) {
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			Path p = new Path(outputPath,
-				(FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
-					"_" + taskAttemptID.toString()));
-			try {
-				FileSystem fs = p.getFileSystem(conf);
-				return p.makeQualified(fs);
-			} catch (IOException ie) {
-				// Fall back to the unqualified path rather than failing.
-				LOG.warn(StringUtils.stringifyException(ie));
-				return p;
-			}
-		}
-		return null;
-	}
-
-	/**
-	 * Recursively deletes the job-wide temporary directory below the output path. Failures to
-	 * delete are logged, not thrown.
-	 *
-	 * @throws IOException if the file system cannot be accessed
-	 */
-	public void cleanupJob(JobConf conf) throws IOException {
-		// do the clean up of temporary directory
-		Path outputPath = FileOutputFormat.getOutputPath(conf);
-		if (outputPath != null) {
-			Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
-			FileSystem fileSys = tmpDir.getFileSystem(conf);
-			// Previously the delete result was silently ignored; report leftovers instead.
-			if (fileSys.exists(tmpDir) && !fileSys.delete(tmpDir, true)) {
-				LOG.warn("Failed to delete temporary directory " + tmpDir);
-			}
-		} else {
-			LOG.warn("Output path is null in cleanup");
-		}
-	}
-
-	/**
-	 * Commits the job: cleans up the temporary directory and, unless disabled via
-	 * {@link #SUCCESSFUL_JOB_OUTPUT_DIR_MARKER}, writes the success marker file.
-	 *
-	 * @throws IOException if the file system cannot be accessed
-	 */
-	public void commitJob(JobConf conf) throws IOException {
-		cleanupJob(conf);
-		if (getOutputDirMarking(conf)) {
-			markSuccessfulOutputDir(conf);
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
deleted file mode 100644
index 5860d26..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.Serializable;
-
-import org.apache.flink.types.Record;
-
-
-/**
- * Converts Hadoop key/value types into Flink's {@link Record} model.
- *
- * Implementations must be serializable, which is why this interface extends
- * {@link Serializable}. Flink provides a DefaultHadoopTypeConverter; custom implementations
- * should chain the type converters.
- */
-public interface HadoopTypeConverter<K, V> extends Serializable {
-
-	/**
-	 * Writes the Flink representation of a Hadoop key/value pair into a record.
-	 *
-	 * @param record the Flink record to populate
-	 * @param hadoopKey the Hadoop key to convert
-	 * @param hadoopValue the Hadoop value to convert
-	 */
-	void convert(Record record, K hadoopKey, V hadoopValue);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
deleted file mode 100644
index 0a459b8..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import org.apache.flink.types.Key;
-import org.apache.hadoop.io.WritableComparable;
-
-public class WritableComparableWrapper<T extends WritableComparable<T>> extends WritableWrapper<T> implements Key<WritableComparableWrapper<T>> {
-	private static final long serialVersionUID = 1L;
-	
-	public WritableComparableWrapper() {
-		super();
-	}
-	
-	public WritableComparableWrapper(T toWrap) {
-		super(toWrap);
-	}
-
-	@Override
-	public int compareTo(WritableComparableWrapper<T> o) {
-		return super.value().compareTo(o.value());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4771efc2/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java b/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
deleted file mode 100644
index 629b91e..0000000
--- a/flink-addons/hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.hadoopcompatibility.mapred.record.datatypes;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.hadoop.io.Writable;
-
-/**
- * A Flink {@link Value} that wraps an arbitrary Hadoop {@link Writable}.
- *
- * The serialized form is the wrapped object's class name (written with {@code writeUTF}),
- * followed by the writable's own serialized fields. On read, the class is loaded by that name,
- * instantiated, and asked to read its fields.
- */
-public class WritableWrapper<T extends Writable> implements Value {
-	private static final long serialVersionUID = 2L;
-
-	/** The wrapped Hadoop object; {@code null} until set by the constructor or by {@link #read}. */
-	private T wrapped;
-
-	/** Binary class name of {@link #wrapped}, written ahead of its payload. */
-	private String wrappedType;
-
-	/**
-	 * Class loader used to resolve {@link #wrappedType} on read. It is transient because
-	 * {@link ClassLoader} is not serializable; it is looked up lazily in {@link #read} instead.
-	 */
-	private transient ClassLoader cl;
-
-	/** Creates an empty wrapper, to be populated via {@link #read(DataInputView)}. */
-	public WritableWrapper() {
-	}
-
-	/** Wraps the given Hadoop object. */
-	public WritableWrapper(T toWrap) {
-		wrapped = toWrap;
-		// getName() returns the binary name that Class.forName() in read() can resolve.
-		// getCanonicalName() yields "Outer.Inner" for nested classes (not loadable by name)
-		// and null for anonymous/local classes, which made such writables impossible to read back.
-		wrappedType = toWrap.getClass().getName();
-	}
-
-	/** Returns the wrapped Hadoop object (may be {@code null} for an unread empty wrapper). */
-	public T value() {
-		return wrapped;
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeUTF(wrappedType);
-		wrapped.write(out);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		// The context class loader is used so that user-code writables can be resolved.
-		if(cl == null) {
-			cl = Thread.currentThread().getContextClassLoader();
-		}
-		wrappedType = in.readUTF();
-		try {
-			@SuppressWarnings("unchecked")
-			Class<T> wrClass = (Class<T>) Class.forName(wrappedType, true, cl).asSubclass(Writable.class);
-			wrapped = InstantiationUtil.instantiate(wrClass, Writable.class);
-		} catch (ClassNotFoundException e) {
-			throw new RuntimeException("Error creating the WritableWrapper", e);
-		}
-		wrapped.readFields(in);
-	}
-
-}


Mime
View raw message