flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-2057] [FLINK-2058] [core] Fix hadoop input split class loading and remove IOReadableWritable from InputSplits
Date Wed, 20 May 2015 16:08:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7bd80689b -> f27025baf


[FLINK-2057] [FLINK-2058] [core] Fix hadoop input split class loading and remove IOReadableWritable
from InputSplits


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f27025ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f27025ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f27025ba

Branch: refs/heads/master
Commit: f27025bafbe45fbfe8bfd27d06f01258a4ff9f44
Parents: 7bd8068
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed May 20 15:04:51 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed May 20 15:37:16 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/core/fs/FileInputSplit.java    |  57 +-------
 .../apache/flink/core/io/GenericInputSplit.java |  38 +-----
 .../org/apache/flink/core/io/InputSplit.java    |   5 +-
 .../flink/core/io/LocatableInputSplit.java      |  64 ++-------
 .../hadoop/mapred/wrapper/HadoopInputSplit.java | 129 ++++++++-----------
 .../mapreduce/wrapper/HadoopInputSplit.java     | 113 +++++++---------
 .../record/io/ExternalProcessInputSplit.java    |  26 +---
 .../api/java/io/CollectionInputFormatTest.java  |   4 +-
 .../runtime/jobgraph/JobTaskVertexTest.java     |   9 +-
 .../flink/addons/hbase/TableInputSplit.java     |  93 +------------
 .../jar/CustomInputSplitProgram.java            |  19 +--
 .../scala/io/CollectionInputFormatTest.scala    |   4 +-
 12 files changed, 147 insertions(+), 414 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
index bd93e28..31ca3c2 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileInputSplit.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.core.fs;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * A file input split provides information on a particular part of a file, possibly
@@ -32,19 +28,13 @@ public class FileInputSplit extends LocatableInputSplit {
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The path of the file this file split refers to.
-	 */
-	private Path file;
+	/** The path of the file this file split refers to. */
+	private final Path file;
 
-	/**
-	 * The position of the first byte in the file to process.
-	 */
+	/** The position of the first byte in the file to process. */
 	private long start;
 
-	/**
-	 * The number of bytes in the file to process.
-	 */
+	/** The number of bytes in the file to process. */
 	private long length;
 
 	// --------------------------------------------------------------------------------------------
@@ -71,11 +61,6 @@ public class FileInputSplit extends LocatableInputSplit {
 		this.length = length;
 	}
 
-	/**
-	 * Default constructor for deserialization.
-	 */
-	public FileInputSplit() {}
-
 	// --------------------------------------------------------------------------------------------
 	
 	/**
@@ -106,40 +91,6 @@ public class FileInputSplit extends LocatableInputSplit {
 	}
 	
 	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		super.write(out);
-
-		// write start and length
-		out.writeLong(this.start);
-		out.writeLong(this.length);
-		
-		// write file
-		if (this.file != null) {
-			out.writeBoolean(true);
-			this.file.write(out);
-		} else {
-			out.writeBoolean(false);
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		super.read(in);
-		
-		this.start = in.readLong();
-		this.length = in.readLong();
-		
-		// read file path
-		boolean isNotNull = in.readBoolean();
-		if (isNotNull) {
-			this.file = new Path();
-			this.file.read(in);
-		}
-	}
-	
-	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public int hashCode() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
index 93782dd..d96d264 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/GenericInputSplit.java
@@ -18,11 +18,6 @@
 
 package org.apache.flink.core.io;
 
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
 /**
  * A generic input split that has only a partition number.
  */
@@ -30,24 +25,15 @@ public class GenericInputSplit implements InputSplit, java.io.Serializable
{
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The number of this split.
-	 */
-	private int partitionNumber;
+	/** The number of this split. */
+	private final int partitionNumber;
 
-	/**
-	 * The total number of partitions
-	 */
-	private int totalNumberOfPartitions;
+	/** The total number of partitions */
+	private final int totalNumberOfPartitions;
 	
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Default constructor for instantiation during de-serialization.
-	 */
-	public GenericInputSplit() {}
-
-	/**
 	 * Creates a generic input split with the given split number.
 	 * 
 	 * @param partitionNumber The number of the split's partition.
@@ -70,20 +56,6 @@ public class GenericInputSplit implements InputSplit, java.io.Serializable
{
 	}
 	
 	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.partitionNumber);
-		out.writeInt(this.totalNumberOfPartitions);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.partitionNumber = in.readInt();
-		this.totalNumberOfPartitions = in.readInt();
-	}
-	
-	// --------------------------------------------------------------------------------------------
 
 	@Override
 	public int hashCode() {
@@ -102,6 +74,6 @@ public class GenericInputSplit implements InputSplit, java.io.Serializable
{
 	}
 	
 	public String toString() {
-		return "GenericSplit (" + this.partitionNumber + "/" + this.totalNumberOfPartitions + ")";
+		return "GenericSplit (" + this.partitionNumber + '/' + this.totalNumberOfPartitions + ')';
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
index 02d1744..87dd073 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/InputSplit.java
@@ -22,8 +22,11 @@ import java.io.Serializable;
 
 /**
  * This interface must be implemented by all kind of input splits that can be assigned to
input formats.
+ * 
+ * <p>Input splits are transferred in serialized form via the messages, so they need
to be serializable
+ * as defined by {@link java.io.Serializable}.</p>
  */
-public interface InputSplit extends IOReadableWritable, Serializable {
+public interface InputSplit extends Serializable {
 	
 	/**
 	 * Returns the number of this input split.

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
index a373639..6544f1c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
+++ b/flink-core/src/main/java/org/apache/flink/core/io/LocatableInputSplit.java
@@ -18,13 +18,8 @@
 
 package org.apache.flink.core.io;
 
-import java.io.IOException;
 import java.util.Arrays;
 
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.StringUtils;
-
 /**
  * A locatable input split is an input split referring to input data which is located on
one or more hosts.
  */
@@ -34,41 +29,36 @@ public class LocatableInputSplit implements InputSplit, java.io.Serializable
{
 
 	private static final String[] EMPTY_ARR = new String[0];
 	
-	/**
-	 * The number of the split.
-	 */
-	private int splitNumber;
+	/** The number of the split. */
+	private final int splitNumber;
 
-	/**
-	 * The names of the hosts storing the data this input split refers to.
-	 */
-	private String[] hostnames;
+	/** The names of the hosts storing the data this input split refers to. */
+	private final String[] hostnames;
 
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Creates a new locatable input split.
+	 * Creates a new locatable input split that refers to multiple hosts as its data location.
 	 * 
-	 * @param splitNumber
-	 *        the number of the split
-	 * @param hostnames
-	 *        the names of the hosts storing the data this input split refers to
+	 * @param splitNumber The number of the split
+	 * @param hostnames The names of the hosts storing the data this input split refers to.
 	 */
 	public LocatableInputSplit(int splitNumber, String[] hostnames) {
 		this.splitNumber = splitNumber;
 		this.hostnames = hostnames == null ? EMPTY_ARR : hostnames;
 	}
-	
+
+	/**
+	 * Creates a new locatable input split that refers to a single host as its data location.
+	 *
+	 * @param splitNumber The number of the split.
+	 * @param hostname The name of the host storing the data this input split refers to.
+	 */
 	public LocatableInputSplit(int splitNumber, String hostname) {
 		this.splitNumber = splitNumber;
 		this.hostnames = hostname == null ? EMPTY_ARR : new String[] { hostname };
 	}
 
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public LocatableInputSplit() {}
-
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
@@ -84,32 +74,6 @@ public class LocatableInputSplit implements InputSplit, java.io.Serializable
{
 	public String[] getHostnames() {
 		return this.hostnames;
 	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.splitNumber);
-		out.writeInt(this.hostnames.length);
-		for (String hostname : this.hostnames) {
-			StringUtils.writeNullableString(hostname, out);
-		}
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		this.splitNumber = in.readInt();
-
-		final int numHosts = in.readInt();
-		if (numHosts == 0) {
-			this.hostnames = EMPTY_ARR;
-		} else {
-			this.hostnames = new String[numHosts];
-			for (int i = 0; i < numHosts; i++) {
-				this.hostnames[i] = StringUtils.readNullableString(in);
-			}
-		}
-	}
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
index da99404..dee5452 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapred/wrapper/HadoopInputSplit.java
@@ -24,8 +24,7 @@ import java.io.ObjectOutputStream;
 
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapred.JobConf;
@@ -36,104 +35,84 @@ import org.apache.hadoop.mapred.JobConf;
  */
 public class HadoopInputSplit extends LocatableInputSplit {
 
-	private static final long serialVersionUID = 1L;
+	private static final long serialVersionUID = -6990336376163226160L;
 	
-	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
 	
-	private JobConf jobConf;
+	private final Class<? extends org.apache.hadoop.mapred.InputSplit> splitType;
 	
-	private int splitNumber;
-	private String hadoopInputSplitTypeName;
-
-
-	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
-		return hadoopInputSplit;
-	}
-
-	public HadoopInputSplit() {
-		super();
-	}
+	private transient JobConf jobConf;
 
+	private transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit;
+	
+	
 	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapred.InputSplit hInputSplit,
JobConf jobconf) {
+		super(splitNumber, (String) null);
 
-		this.splitNumber = splitNumber;
-		this.hadoopInputSplit = hInputSplit;
-		this.hadoopInputSplitTypeName = hInputSplit.getClass().getName();
-		this.jobConf = jobconf;
+		if (hInputSplit == null) {
+			throw new NullPointerException("Hadoop input split must not be null");
+		}
+		if (jobconf == null) {
+			throw new NullPointerException("Hadoop JobConf must not be null");
+		}
+		
+		this.splitType = hInputSplit.getClass();
 
+		this.jobConf = jobconf;
+		this.hadoopInputSplit = hInputSplit;
 	}
 
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(splitNumber);
-		out.writeUTF(hadoopInputSplitTypeName);
-		jobConf.write(out);
-		hadoopInputSplit.write(out);
-	}
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
 
 	@Override
-	public void read(DataInputView in) throws IOException {
-		this.splitNumber = in.readInt();
-		this.hadoopInputSplitTypeName = in.readUTF();
-		if(hadoopInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
-						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(
inputSplit );
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
+	public String[] getHostnames() {
+		try {
+			return this.hadoopInputSplit.getLocations();
 		}
-		jobConf = new JobConf();
-		jobConf.readFields(in);
-		if (this.hadoopInputSplit instanceof Configurable) {
-			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		catch(IOException e) {
+			return new String[0];
 		}
-		this.hadoopInputSplit.readFields(in);
-
 	}
+	
+	public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() {
+		return hadoopInputSplit;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Serialization
+	// ------------------------------------------------------------------------
 
 	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeInt(splitNumber);
-		out.writeUTF(hadoopInputSplitTypeName);
+		// serialize the parent fields and the final fields
+		out.defaultWriteObject();
+
+		// the job conf knows how to serialize itself
 		jobConf.write(out);
+		
+		// write the input split
 		hadoopInputSplit.write(out);
-
 	}
 
 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
{
-		this.splitNumber=in.readInt();
-		this.hadoopInputSplitTypeName = in.readUTF();
-		if(hadoopInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
-						Class.forName(hadoopInputSplitTypeName).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(
inputSplit );
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
-		}
+		// read the parent fields and the final fields
+		in.defaultReadObject();
+
+		// the job conf knows how to deserialize itself
 		jobConf = new JobConf();
 		jobConf.readFields(in);
-		if (this.hadoopInputSplit instanceof Configurable) {
-			((Configurable) this.hadoopInputSplit).setConf(this.jobConf);
+		
+		
+		try {
+			hadoopInputSplit = (org.apache.hadoop.mapred.InputSplit) WritableFactories.newInstance(splitType);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate Hadoop InputSplit", e);
 		}
-		this.hadoopInputSplit.readFields(in);
-	}
-
-	@Override
-	public int getSplitNumber() {
-		return this.splitNumber;
-	}
 
-	@Override
-	public String[] getHostnames() {
-		try {
-			return this.hadoopInputSplit.getLocations();
-		} catch(IOException ioe) {
-			return new String[0];
+		if (hadoopInputSplit instanceof Configurable) {
+			((Configurable) hadoopInputSplit).setConf(this.jobConf);
 		}
+		hadoopInputSplit.readFields(in);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
index 16d7f3a..15f7b9e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/hadoop/mapreduce/wrapper/HadoopInputSplit.java
@@ -24,8 +24,7 @@ import java.io.ObjectOutputStream;
 
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -35,95 +34,71 @@ import org.apache.hadoop.mapreduce.JobContext;
  * a Flink {@link InputSplit}.
  */
 public class HadoopInputSplit extends LocatableInputSplit {
+
+	private static final long serialVersionUID = 6119153593707857235L;
 	
-	private static final long serialVersionUID = 1L;
-	
-	public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
-	public transient JobContext jobContext;
-	
-	private int splitNumber;
-	
-	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
-		return mapreduceInputSplit;
-	}
 	
+	private final Class<? extends org.apache.hadoop.mapreduce.InputSplit> splitType;
 	
-	public HadoopInputSplit() {
-		super();
-	}
+	private transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit;
 	
 	
 	public HadoopInputSplit(int splitNumber, org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit,
JobContext jobContext) {
-		this.splitNumber = splitNumber;
-		if(!(mapreduceInputSplit instanceof Writable)) {
+		super(splitNumber, (String) null);
+
+		if (mapreduceInputSplit == null) {
+			throw new NullPointerException("Hadoop input split must not be null");
+		}
+		if (!(mapreduceInputSplit instanceof Writable)) {
 			throw new IllegalArgumentException("InputSplit must implement Writable interface.");
 		}
+		this.splitType = mapreduceInputSplit.getClass();
 		this.mapreduceInputSplit = mapreduceInputSplit;
-		this.jobContext = jobContext;
 	}
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeInt(this.splitNumber);
-		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
-		Writable w = (Writable) this.mapreduceInputSplit;
-		w.write(out);
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() {
+		return mapreduceInputSplit;
 	}
-	
+
 	@Override
-	public void read(DataInputView in) throws IOException {
-		this.splitNumber = in.readInt();
-		String className = in.readUTF();
-		
-		if(this.mapreduceInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit = 
-						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
-			} catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
+	public String[] getHostnames() {
+		try {
+			return mapreduceInputSplit.getLocations();
+		}
+		catch (Exception e) {
+			return new String[0];
 		}
-		((Writable)this.mapreduceInputSplit).readFields(in);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Serialization
+	// ------------------------------------------------------------------------
+
 	private void writeObject(ObjectOutputStream out) throws IOException {
-		out.writeInt(this.splitNumber);
-		out.writeUTF(this.mapreduceInputSplit.getClass().getName());
-		Writable w = (Writable) this.mapreduceInputSplit;
-		w.write(out);
+		// serialize the parent fields and the final fields
+		out.defaultWriteObject();
 
+		// write the input split
+		((Writable) mapreduceInputSplit).write(out);
 	}
 
 	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException
{
-		this.splitNumber=in.readInt();
-		String className = in.readUTF();
+		// read the parent fields and the final fields
+		in.defaultReadObject();
 
-		if(this.mapreduceInputSplit == null) {
-			try {
-				Class<? extends org.apache.hadoop.io.Writable> inputSplit =
-						Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class);
-				this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit);
-			} catch (Exception e) {
-				throw new RuntimeException("Unable to create InputSplit", e);
-			}
-		}
-		((Writable)this.mapreduceInputSplit).readFields(in);
-	}
-	
-	@Override
-	public int getSplitNumber() {
-		return this.splitNumber;
-	}
-	
-	@Override
-	public String[] getHostnames() {
 		try {
-			return this.mapreduceInputSplit.getLocations();
-		} catch (IOException e) {
-			return new String[0];
-		} catch (InterruptedException e) {
-			return new String[0];
+			Class<? extends Writable> writableSplit = splitType.asSubclass(Writable.class);
+			mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(writableSplit);
+		} 
+		
+		catch (Exception e) {
+			throw new RuntimeException("Unable to instantiate the Hadoop InputSplit", e);
 		}
+		
+		((Writable) mapreduceInputSplit).readFields(in);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
index 9f0b345..e087cb1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
@@ -18,15 +18,11 @@
 
 package org.apache.flink.api.java.record.io;
 
-import java.io.IOException;
-
 import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.util.StringUtils;
 
 /**
- * The ExternalProcessInputSplit contains all informations for {@link org.apache.flink.api.common.io.InputFormat}
that read their data from external processes.
+ * The ExternalProcessInputSplit contains all information for {@link org.apache.flink.api.common.io.InputFormat}
+ * that reads its data from external processes.
  * Each parallel instance of an InputFormat starts an external process and reads its output.
  * The command to start the external process must be executable on all nodes.
  * 
@@ -38,10 +34,7 @@ public class ExternalProcessInputSplit extends GenericInputSplit {
 	private static final long serialVersionUID = 1L;
 	
 	// command to be executed for this input split
-	private String extProcessCommand;
-	
-	// default constructor for deserialization
-	public ExternalProcessInputSplit() { }
+	private final String extProcessCommand;
 	
 	/**
 	 * Instantiates an ExternalProcessInputSplit
@@ -62,17 +55,4 @@ public class ExternalProcessInputSplit extends GenericInputSplit {
 	public String getExternalProcessCommand() {
 		return this.extProcessCommand;
 	}
-	
-	
-	@Override
-	public void read(DataInputView in) throws IOException {
-		super.read(in);
-		this.extProcessCommand = StringUtils.readNullableString(in);
-	}
-
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		super.write(out);
-		StringUtils.writeNullableString(this.extProcessCommand, out);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
index cd495b5..ad491d0 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java
@@ -109,7 +109,7 @@ public class CollectionInputFormatTest {
 			@SuppressWarnings("unchecked")
 			CollectionInputFormat<ElementType> result = (CollectionInputFormat<ElementType>)
serializationResult;
 
-			GenericInputSplit inputSplit = new GenericInputSplit();
+			GenericInputSplit inputSplit = new GenericInputSplit(0, 1);
 			inputFormat.open(inputSplit);
 			result.open(inputSplit);
 
@@ -187,7 +187,7 @@ public class CollectionInputFormatTest {
 			int i = 0;
 			@SuppressWarnings("unchecked")
 			CollectionInputFormat<String> in = (CollectionInputFormat<String>) result;
-			in.open(new GenericInputSplit());
+			in.open(new GenericInputSplit(0, 1));
 			
 			while (!in.reachedEnd()) {
 				assertEquals(data[i++], in.nextRecord(""));

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
index 1de1e84..b1d05e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobTaskVertexTest.java
@@ -144,7 +144,12 @@ public class JobTaskVertexTest {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	private static final class TestSplit extends GenericInputSplit {}
+	private static final class TestSplit extends GenericInputSplit {
+		
+		public TestSplit(int partitionNumber, int totalNumberOfPartitions) {
+			super(partitionNumber, totalNumberOfPartitions);
+		}
+	}
 	
 	private static final class TestInputFormat extends GenericInputFormat<Object> {
 
@@ -160,7 +165,7 @@ public class JobTaskVertexTest {
 		
 		@Override
 		public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
-			return new GenericInputSplit[] { new TestSplit() };
+			return new GenericInputSplit[] { new TestSplit(0, 1) };
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
index 6d8bf42..75f0b9b 100644
--- a/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
+++ b/flink-staging/flink-hbase/src/main/java/org/apache/flink/addons/hbase/TableInputSplit.java
@@ -15,13 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.addons.hbase;
 
-import java.io.IOException;
+package org.apache.flink.addons.hbase;
 
 import org.apache.flink.core.io.LocatableInputSplit;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 
 /**
  * This class implements a input splits for HBase. Each table input split corresponds to
a key range (low, high). All
@@ -31,20 +28,14 @@ public class TableInputSplit extends LocatableInputSplit {
 
 	private static final long serialVersionUID = 1L;
 
-	/**
-	 * The name of the table to retrieve data from
-	 */
-	private byte[] tableName;
+	/** The name of the table to retrieve data from */
+	private final byte[] tableName;
 
-	/**
-	 * The start row of the split.
-	 */
-	private byte[] startRow;
+	/** The start row of the split. */
+	private final byte[] startRow;
 
-	/**
-	 * The end row of the split.
-	 */
-	private byte[] endRow;
+	/** The end row of the split. */
+	private final byte[] endRow;
 
 	/**
 	 * Creates a new table input split
@@ -70,17 +61,6 @@ public class TableInputSplit extends LocatableInputSplit {
 	}
 
 	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public TableInputSplit() {
-		super();
-
-		this.tableName = null;
-		this.startRow = null;
-		this.endRow = null;
-	}
-
-	/**
 	 * Returns the table name.
 	 * 
 	 * @return The table name.
@@ -106,63 +86,4 @@ public class TableInputSplit extends LocatableInputSplit {
 	public byte[] getEndRow() {
 		return this.endRow;
 	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		super.write(out);
-
-		// Write the table name
-		if (this.tableName == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.tableName.length);
-			out.write(this.tableName);
-		}
-
-		// Write the start row
-		if (this.startRow == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.startRow.length);
-			out.write(this.startRow);
-		}
-
-		// Write the end row
-		if (this.endRow == null) {
-			out.writeInt(-1);
-		} else {
-			out.writeInt(this.endRow.length);
-			out.write(this.endRow);
-		}
-	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		super.read(in);
-
-		// Read the table name
-		int len = in.readInt();
-		if (len >= 0) {
-			this.tableName = new byte[len];
-			in.readFully(this.tableName);
-		}
-
-		// Read the start row
-		len = in.readInt();
-		if (len >= 0) {
-			this.startRow = new byte[len];
-			in.readFully(this.startRow);
-		}
-
-		// Read the end row
-		len = in.readInt();
-		if (len >= 0) {
-			this.endRow = new byte[len];
-			in.readFully(this.endRow);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
index e251f8d..36b56e0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/jar/CustomInputSplitProgram.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.test.classloading.jar;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -36,8 +35,6 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
 
 @SuppressWarnings("serial")
 public class CustomInputSplitProgram {
@@ -126,11 +123,7 @@ public class CustomInputSplitProgram {
 
 		private static final long serialVersionUID = 1L;
 
-		private int splitNumber;
-
-		public CustomInputSplit() {
-			this(-1);
-		}
+		private final int splitNumber;
 
 		public CustomInputSplit(int splitNumber) {
 			this.splitNumber = splitNumber;
@@ -140,16 +133,6 @@ public class CustomInputSplitProgram {
 		public int getSplitNumber() {
 			return this.splitNumber;
 		}
-
-		@Override
-		public void write(DataOutputView out) throws IOException {
-			out.writeInt(splitNumber);
-		}
-
-		@Override
-		public void read(DataInputView in) throws IOException {
-			splitNumber = in.readInt();
-		}
 	}
 
 	public static final class CustomSplitAssigner implements InputSplitAssigner {

http://git-wip-us.apache.org/repos/asf/flink/blob/f27025ba/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
index 84a0032..92575f5 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/io/CollectionInputFormatTest.scala
@@ -74,7 +74,7 @@ class CollectionInputFormatTest {
     assertTrue(serializationResult.isInstanceOf[CollectionInputFormat[_]])
 
     val result = serializationResult.asInstanceOf[CollectionInputFormat[ElementType]]
-    val inputSplit = new GenericInputSplit
+    val inputSplit = new GenericInputSplit(0, 1)
     inputFormat.open(inputSplit)
     result.open(inputSplit)
 
@@ -125,7 +125,7 @@ class CollectionInputFormatTest {
     assertTrue(result.isInstanceOf[CollectionInputFormat[_]])
     var i: Int = 0
     val in = result.asInstanceOf[CollectionInputFormat[String]]
-    in.open(new GenericInputSplit)
+    in.open(new GenericInputSplit(0, 1))
 
     while (!in.reachedEnd) {
       assertEquals(data(i), in.nextRecord(""))


Mime
View raw message