flink-commits mailing list archives

From fhue...@apache.org
Subject [4/8] flink git commit: [FLINK-2906] Remove Record API
Date Thu, 26 Nov 2015 00:20:42 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java
deleted file mode 100644
index 49d9a2a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-
-/**
- * The base class for output formats that serialize their records into a delimited sequence.
- */
-@SuppressWarnings("deprecation")
-public abstract class DelimitedOutputFormat extends FileOutputFormat {
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * The configuration key for the entry that defines the record delimiter.
-	 */
-	public static final String RECORD_DELIMITER = "pact.output.delimited.delimiter";
-
-	/**
-	 * The configuration key to set the record delimiter encoding.
-	 */
-	private static final String RECORD_DELIMITER_ENCODING = "pact.output.delimited.delimiter-encoding";
-	
-	/**
-	 * The configuration key for the entry that defines the write-buffer size.
-	 */
-	public static final String WRITE_BUFFER_SIZE = "pact.output.delimited.buffersize";
-	
-	/**
-	 * The default write-buffer size: 64 KiB.
-	 */
-	private static final int DEFAULT_WRITE_BUFFER_SIZE = 64 * 1024;
-	
-	/**
-	 * The minimal write-buffer size: 1 KiB.
-	 */
-	private static final int MIN_WRITE_BUFFER_SIZE = 1024;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	private byte[] delimiter;
-	
-	private byte[] buffer;
-	
-	private byte[] targetArray = new byte[64];
-	
-	private int pos;
-	
-	private int bufferSize;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	
-	/**
-	 * Calls the superclass to configure itself and reads the config parameters for the delimiter and
-	 * the write buffer size.
-	 * 
-	 * @param config The configuration to read the parameters from.
-	 * 
-	 * @see org.apache.flink.api.java.record.io.FileOutputFormat#configure(org.apache.flink.configuration.Configuration)
-	 */
-	public void configure(Configuration config)
-	{
-		super.configure(config);
-		
-		final String delim = config.getString(RECORD_DELIMITER, "\n");
-		final String charsetName = config.getString(RECORD_DELIMITER_ENCODING, null);		
-		if (delim == null) {
-			throw new IllegalArgumentException("The delimiter in the DelimitedOutputFormat must not be null.");
-		}
-		try {
-			this.delimiter = charsetName == null ? delim.getBytes() : delim.getBytes(charsetName);
-		} catch (UnsupportedEncodingException useex) {
-			throw new IllegalArgumentException("The charset with the name '" + charsetName + 
-				"' is not supported on this TaskManager instance.", useex);
-		}
-		
-		this.bufferSize = config.getInteger(WRITE_BUFFER_SIZE, DEFAULT_WRITE_BUFFER_SIZE);
-		if (this.bufferSize < MIN_WRITE_BUFFER_SIZE) {
-			throw new IllegalArgumentException("The write buffer size must not be less than " + MIN_WRITE_BUFFER_SIZE
-				+ " bytes.");
-		}
-	}
-	
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException
-	{
-		super.open(taskNumber, numTasks);
-		
-		if (this.buffer == null) {
-			this.buffer = new byte[this.bufferSize];
-		}
-		if (this.targetArray == null) {
-			this.targetArray = new byte[64];
-		}
-		this.pos = 0;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void close() throws IOException {
-		if (this.stream != null) {
-			this.stream.write(this.buffer, 0, this.pos);
-		}
-		
-		// close file stream
-		super.close();
-	}
-
-	/**
-	 * This method is called for every record to serialize itself into the given target array. The method should
-	 * return the number of bytes occupied in the target array. If the target array is not large enough, a negative 
-	 * value should be returned.
-	 * <p>
-	 * The absolute value of the returned integer serves as a hint for how large an array is required. The array is
-	 * resized to the return value's absolute value, if that is larger than the current array size. Otherwise, the
-	 * array size is simply doubled.
-	 * 
-	 * @param rec The record to be serialized.
-	 * @param target The array to serialize the record into.
-	 * @return The length of the serialized contents, or a negative value, indicating that the array is too small.
-	 * 
-	 * @throws Exception If the user code produces an exception that prevents processing the record, it should
-	 *                   throw it such that the engine recognizes the situation as a fault.
-	 */
-	public abstract int serializeRecord(Record rec, byte[] target) throws Exception;
-	
-	
-
-	@Override
-	public void writeRecord(Record record) throws IOException
-	{
-		int size;
-		try {
-			while ((size = serializeRecord(record, this.targetArray)) < 0) {
-				if (-size > this.targetArray.length) {
-					this.targetArray = new byte[-size];
-				}
-				else {
-					this.targetArray = new byte[this.targetArray.length * 2];
-				}
-			}
-		}
-		catch (Exception ex) {
-			throw new IOException("Error while serializing the record to bytes: " + ex.getMessage(), ex);
-		}
-		
-		if (this.bufferSize - this.pos > size + this.delimiter.length) {
-			System.arraycopy(this.targetArray, 0, this.buffer, this.pos, size);
-			System.arraycopy(this.delimiter, 0, this.buffer, pos + size, this.delimiter.length);
-			pos += size + this.delimiter.length;
-		}
-		else {
-			// copy the target array (piecewise)
-			int off = 0;
-			while (off < size) {
-				int toCopy = Math.min(size - off, this.bufferSize - this.pos);
-				System.arraycopy(this.targetArray, off, this.buffer, this.pos, toCopy);
-
-				off += toCopy;
-				this.pos += toCopy;
-				if (this.pos == this.bufferSize) {
-					this.pos = 0;
-					this.stream.write(this.buffer, 0, this.bufferSize);
-				}
-			}
-			
-			// copy the delimiter (piecewise)
-			off = 0;
-			while (off < this.delimiter.length) {
-				int toCopy = Math.min(this.delimiter.length - off, this.bufferSize - this.pos);
-				System.arraycopy(this.delimiter, off, this.buffer, this.pos, toCopy);
-
-				off += toCopy;
-				this.pos += toCopy;
-				if (this.pos == this.bufferSize) {
-					this.pos = 0;
-					this.stream.write(this.buffer, 0, this.bufferSize);
-				}
-			}
-		}
-	}
-
-	// ============================================================================================
-	
-	/**
-	 * Creates a configuration builder that can be used to set the output format's parameters to the config in a fluent
-	 * fashion.
-	 * 
-	 * @return A config builder for setting parameters.
-	 */
-	public static ConfigBuilder configureDelimitedFormat(FileDataSink target) {
-		return new ConfigBuilder(target.getParameters());
-	}
-	
-	/**
-	 * A builder used to set parameters to the output format's configuration in a fluent way.
-	 */
-	protected static abstract class AbstractConfigBuilder<T> extends FileOutputFormat.AbstractConfigBuilder<T>
-	{
-		private static final String NEWLINE_DELIMITER = "\n";
-		
-		// --------------------------------------------------------------------
-		
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param config The configuration into which the parameters will be written.
-		 */
-		protected AbstractConfigBuilder(Configuration config) {
-			super(config);
-		}
-		
-		// --------------------------------------------------------------------
-		
-		/**
-		 * Sets the delimiter to be a single character, namely the given one. The character must be within
-		 * the value range <code>0</code> to <code>127</code>.
-		 * 
-		 * @param delimiter The delimiter character.
-		 * @return The builder itself.
-		 */
-		public T recordDelimiter(char delimiter) {
-			if (delimiter == '\n') {
-				this.config.setString(RECORD_DELIMITER, NEWLINE_DELIMITER);
-			} else {
-				this.config.setString(RECORD_DELIMITER, String.valueOf(delimiter));
-			}
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-		
-		/**
-		 * Sets the delimiter to be the given string. The string is converted to bytes before it is
-		 * appended after each record. The conversion uses the platform's default charset.
-		 * 
-		 * @param delimiter The delimiter string.
-		 * @return The builder itself.
-		 */
-		public T recordDelimiter(String delimiter) {
-			this.config.setString(RECORD_DELIMITER, delimiter);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-		
-		/**
-		 * Sets the delimiter to be the given string. The string is converted to bytes before it is
-		 * appended after each record. The conversion uses the charset with the given name.
-		 * The charset must be available on the processing nodes, otherwise an exception will be raised at
-		 * runtime.
-		 * 
-		 * @param delimiter The delimiter string.
-		 * @param charsetName The name of the encoding character set.
-		 * @return The builder itself.
-		 */
-		public T recordDelimiter(String delimiter, String charsetName) {
-			this.config.setString(RECORD_DELIMITER, delimiter);
-			this.config.setString(RECORD_DELIMITER_ENCODING, charsetName);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-		
-		/**
-		 * Sets the size of the write buffer.
-		 * 
-		 * @param sizeInBytes The size of the write buffer in bytes.
-		 * @return The builder itself.
-		 */
-		public T writeBufferSize(int sizeInBytes) {
-			this.config.setInteger(WRITE_BUFFER_SIZE, sizeInBytes);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-	}
-	
-	/**
-	 * A builder used to set parameters to the output format's configuration in a fluent way.
-	 */
-	public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder>
-	{
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param targetConfig The configuration into which the parameters will be written.
-		 */
-		protected ConfigBuilder(Configuration targetConfig) {
-			super(targetConfig);
-		}
-		
-	}
-}
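
For context, here is a minimal standalone sketch (not part of this commit; all names are
illustrative) of the grow-on-negative-return contract that the removed
DelimitedOutputFormat.serializeRecord() and writeRecord() implemented: a negative return
value means "target too small", and its absolute value hints at the required size.

    import java.nio.charset.StandardCharsets;

    public class GrowOnDemandSerializer {

        // Hypothetical serializer following the removed contract: returns the number of
        // bytes written, or a negative value whose absolute value is the required size.
        static int serialize(String value, byte[] target) {
            byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
            if (bytes.length > target.length) {
                return -bytes.length; // too small; hint at the needed size
            }
            System.arraycopy(bytes, 0, target, 0, bytes.length);
            return bytes.length;
        }

        public static void main(String[] args) {
            byte[] buffer = new byte[4];
            String record = "a record longer than four bytes";

            int size;
            // the same retry loop as the removed writeRecord(): grow to the hinted size
            // if it exceeds the current length, otherwise double the buffer
            while ((size = serialize(record, buffer)) < 0) {
                buffer = new byte[Math.max(-size, buffer.length * 2)];
            }
            System.out.println("serialized " + size + " bytes into a buffer of " + buffer.length);
        }
    }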

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java
deleted file mode 100644
index d448c6a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.Record;
-
-/**
- * This input format starts an external process and reads its input from the standard out (stdout) of the started process.
- * The input is split into fixed-sized segments from which a {@link Record} is generated. 
- * The external process is started outside of the JVM via a provided start command and can be an arbitrary program, 
- * e.g., a data generator or a shell script. The input format checks the exit code of the process 
- * to validate whether the process terminated correctly. A list of allowed exit codes can be provided.
- * The input format requires {@link ExternalProcessInputSplit} objects that hold the command to execute.
- * 
- * <b>Warning:</b> This format does not consume the standard error stream (stderr) of the started process. This might cause deadlocks. 
- *
- * @param <T> The type of the input split (must extend ExternalProcessInputSplit)
- */
-public abstract class ExternalProcessFixedLengthInputFormat<T extends ExternalProcessInputSplit> extends ExternalProcessInputFormat<T> {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * The config parameter which defines the fixed length of a record.
-	 */
-	public static final String RECORDLENGTH_PARAMETER_KEY = "pact.input.recordLength";
-	
-	/**
-	 * The default read buffer size = 1MB.
-	 */
-	private static final int DEFAULT_TARGET_READ_BUFFER_SIZE = 1024 * 1024;
-	
-	/**
-	 * Buffer to read a batch of records from a file 
-	 */
-	private byte[] readBuffer;
-	
-	/**
-	 * read position within the read buffer
-	 */
-	private int readBufferReadPos;
-	
-	/**
-	 * fill marker within the read buffer 
-	 */
-	private int readBufferFillPos;
-	
-	/**
-	 * remaining space within the read buffer
-	 */
-	private int readBufferRemainSpace;
-	
-	/**
-	 * target size of the read buffer
-	 */
-	private int targetReadBufferSize = DEFAULT_TARGET_READ_BUFFER_SIZE;
-	
-	/**
-	 * fixed length of all records
-	 */
-	protected int recordLength;
-	
-	/**
-	 * Flags to indicate the end of the split
-	 */
-	private boolean noMoreStreamInput;
-	private boolean noMoreRecordBuffers;
-	
-	/**
-	 * Reads a record out of the given buffer. This operation always consumes the fixed record length
-	 * in bytes, regardless of whether the produced record was valid.
-	 * 
-	 * @param target The target Record
-	 * @param buffer The buffer containing the binary data.
-	 * @param startPos The start position in the byte array.
-	 * @return True, if the record is valid, false otherwise.
-	 */
-	public abstract boolean readBytes(Record target, byte[] buffer, int startPos);
-	
-
-	@Override
-	public void configure(Configuration parameters)
-	{
-		// configure parent
-		super.configure(parameters);
-		
-		// read own parameters
-		this.recordLength = parameters.getInteger(RECORDLENGTH_PARAMETER_KEY, 0);
-		if (recordLength < 1) {
-			throw new IllegalArgumentException("The record length parameter must be set and larger than 0.");
-		}
-		
-	}
-	
-	/**
-	 * Sets the target size of the buffer to be used to read from the stdout stream. 
-	 * The actual size depends on the record length since it is chosen such that records are not split.
-	 * This method has only an effect, if it is called before the input format is opened.
-	 * 
-	 * @param targetReadBufferSize The target size of the read buffer.
-	 */
-	public void setTargetReadBufferSize(int targetReadBufferSize)
-	{
-		this.targetReadBufferSize = targetReadBufferSize;
-	}
-	
-
-	@Override
-	public void open(GenericInputSplit split) throws IOException {
-		
-		super.open(split);
-		
-		// compute readBufferSize
-		if(recordLength > this.targetReadBufferSize) {
-			// read buffer is at least as big as record
-			this.readBuffer = new byte[recordLength];
-		} else if (this.targetReadBufferSize % recordLength == 0) {
-			// target read buffer size is a multiple of record length, so it's ok
-			this.readBuffer = new byte[this.targetReadBufferSize];
-		} else {
-			// extend the default read buffer size such that records are not split
-			this.readBuffer = new byte[(recordLength - (this.targetReadBufferSize % recordLength)) + this.targetReadBufferSize];
-		}
-		
-		// initialize read buffer positions
-		this.readBufferReadPos = 0;
-		this.readBufferFillPos = 0;
-		this.readBufferRemainSpace = readBuffer.length;
-		// initialize end flags
-		this.noMoreStreamInput = false;
-		this.noMoreRecordBuffers = false;
-		
-	}
-	
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return noMoreRecordBuffers;
-	}
-	
-
-	@Override
-	public Record nextRecord(Record reuse) throws IOException {
-		// check if read buffer must be filled (less than one record contained)
-		if(this.readBufferFillPos - this.readBufferReadPos < this.recordLength) {
-			// try to fill read buffer
-			if(!this.fillReadBuffer()) {
-				return null;
-			}
-		}
-
-		// update read buffer read marker
-		this.readBufferReadPos += this.recordLength;
-		
-		return this.readBytes(reuse, readBuffer, (this.readBufferReadPos-this.recordLength)) ? reuse : null;
-		
-	}
-
-	/**
-	 * Fills the read buffer by reading from the stdout stream of the external process.
-	 * WARNING: We do not read from the error stream. This might cause a deadlock.
-	 *  
-	 * @return true if new content was filled into the buffer, false otherwise.
-	 * @throws IOException
-	 */
-	private boolean fillReadBuffer() throws IOException {
-		// TODO: Add reading from error stream of external process. Otherwise the InputFormat might get deadlocked!
-		
-		// stream was completely processed
-		if(noMoreStreamInput) {
-			if(this.readBufferReadPos == this.readBufferFillPos) {
-				this.noMoreRecordBuffers = true;
-				return false;
-			} else {
-				throw new RuntimeException("External process produced incomplete record");
-			}
-		}
-		
-		// the buffer was completely filled and processed
-		if(this.readBufferReadPos == this.readBuffer.length && 
-				this.readBufferRemainSpace == 0) {
-			// reset counters and fill again
-			this.readBufferFillPos = 0;
-			this.readBufferRemainSpace = this.readBuffer.length;
-			this.readBufferReadPos = 0;
-		}
-		
-		// as long as not at least one record is complete
-		while(this.readBufferFillPos - this.readBufferReadPos < this.recordLength) {
-			// read from stdout
-			int readCnt = super.extProcOutStream.read(this.readBuffer, this.readBufferFillPos, this.readBufferRemainSpace);
-			
-			if(readCnt == -1) {
-				// there is nothing more to read
-				this.noMoreStreamInput = true;
-				return false;
-			} else {
-				// update fill position and remain cnt
-				this.readBufferFillPos += readCnt;
-				this.readBufferRemainSpace -= readCnt;
-			}
-		}
-		return true;
-	}
-
-}
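
The open() method above sizes its read buffer so that whole records always fit. A small
illustration of that arithmetic (my sketch, mirroring the removed logic; not Flink API):

    // choose a read-buffer size that is a multiple of the record length, so no
    // record is ever split across two buffer fills
    static int chooseReadBufferSize(int recordLength, int targetSize) {
        if (recordLength > targetSize) {
            return recordLength;               // buffer must hold at least one record
        } else if (targetSize % recordLength == 0) {
            return targetSize;                 // already a clean multiple
        } else {
            // round up to the next multiple of recordLength
            return targetSize + (recordLength - (targetSize % recordLength));
        }
    }

For example, chooseReadBufferSize(300, 1024) returns 1200, the smallest multiple of 300
that is at least 1024.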

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java
deleted file mode 100644
index cbf16b5..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.StringTokenizer;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.io.GenericInputSplit;
-
-/**
- * This input format starts an external process and reads its input from the standard out (stdout) of the started process.
- * The process is started outside of the JVM via a provided start command and can be an arbitrary program, e.g., a data generator or a shell script.
- * The input format checks the exit code of the process to validate whether the process terminated correctly. A list of allowed exit codes can be provided.
- * The input format requires {@link ExternalProcessInputSplit} objects that hold the command to execute.
- * 
- * <b>Attention! </b><br>
- * You must take care to read from (and process) both output streams of the process, standard out (stdout) and standard error (stderr). 
- * Otherwise, the input format might get deadlocked! 
- * 
- *
- * @param <T> The type of the input split (must extend ExternalProcessInputSplit)
- */
-public abstract class ExternalProcessInputFormat<T extends ExternalProcessInputSplit> extends GenericInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * The config parameter lists (comma separated) all allowed exit codes
-	 */
-	public static final String ALLOWEDEXITCODES_PARAMETER_KEY = "pact.input.externalProcess.allowedExitCodes";
-	
-	/**
-	 * The external process
-	 */
-	private Process extProc;
-	
-	/**
-	 * The stdout stream of the external process
-	 */
-	protected InputStream extProcOutStream;
-	
-	/**
-	 * The stderr stream of the external process
-	 */
-	protected InputStream extProcErrStream;
-	
-	/**
-	 * Array of allowed exit codes
-	 */
-	protected int[] allowedExitCodes;
-	
-
-	@Override
-	public void configure(Configuration parameters) {
-		// get allowed exit codes
-		String allowedExitCodesList = parameters.getString(ALLOWEDEXITCODES_PARAMETER_KEY, "0");
-		
-		// parse allowed exit codes
-		StringTokenizer st = new StringTokenizer(allowedExitCodesList, ",");
-		this.allowedExitCodes = new int[st.countTokens()];
-		
-		for(int i=0; i<this.allowedExitCodes.length; i++) {
-			this.allowedExitCodes[i] = Integer.parseInt(st.nextToken().trim());
-		}
-		
-	}
-	
-	@Override
-	public void close() throws IOException {
-		try {
-			// get exit code
-			int exitCode = this.extProc.exitValue();
-			// check whether exit code is allowed
-			boolean exitCodeOk = false;
-			for (int allowedExitCode : this.allowedExitCodes) {
-				if (allowedExitCode == exitCode) {
-					exitCodeOk = true;
-					break;
-				}
-			}
-			if(!exitCodeOk) {
-				// external process did not finish with an allowed exit code
-				throw new RuntimeException("External process did not finish with an allowed exit code: "+exitCode);
-			}
-		} catch(IllegalThreadStateException itse) {
-			// process did not terminate yet, shut it down!
-			this.extProc.destroy();
-			if(!this.reachedEnd()) {
-				throw new RuntimeException("External process was destroyed although stream was not fully read.");
-			}
-		} finally {
-			this.extProcErrStream.close();
-			this.extProcOutStream.close();
-		}
-	}
-
-	@Override
-	public void open(GenericInputSplit split) throws IOException {
-		
-		if(!(split instanceof ExternalProcessInputSplit)) {
-			throw new IOException("Invalid InputSplit type.");
-		}
-		
-		ExternalProcessInputSplit epSplit = (ExternalProcessInputSplit)split;		
-		
-		// check if process command is valid string
-		if(epSplit.getExternalProcessCommand() != null && !epSplit.getExternalProcessCommand().equals("")) {
-			try {
-				// run the external process
-				this.extProc = Runtime.getRuntime().exec(epSplit.getExternalProcessCommand());
-			} catch (IOException e) {
-				throw new IOException("IO Exception when starting external process: "+epSplit.getExternalProcessCommand());
-			}
-			// connect streams to stdout and stderr
-			this.extProcOutStream = this.extProc.getInputStream();
-			this.extProcErrStream = this.extProc.getErrorStream();
-		} else {
-			throw new IllegalArgumentException("External Process Command not set");
-		}
-	}
-	
-	public void waitForProcessToFinish() throws InterruptedException {
-		extProc.waitFor();
-	}
-}
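
The Javadoc above warns that an unconsumed stderr can deadlock the child process once the
pipe's buffer fills up. A common standalone remedy (my sketch, not Flink code) is to drain
stderr on a background thread while the main thread consumes stdout:

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    public class StderrDrainExample {
        public static void main(String[] args) throws IOException, InterruptedException {
            // hypothetical command; any program writing to both streams works
            Process proc = Runtime.getRuntime().exec(new String[] {"ls", "-l"});

            // drain stderr on a separate thread so the child never blocks on a full pipe
            Thread drainer = new Thread(() -> {
                try (BufferedReader err = new BufferedReader(
                        new InputStreamReader(proc.getErrorStream()))) {
                    String line;
                    while ((line = err.readLine()) != null) {
                        System.err.println("[child stderr] " + line);
                    }
                } catch (IOException ignored) {
                    // the stream is closed when the process exits
                }
            });
            drainer.start();

            // consume stdout on the main thread, as the input format did
            try (BufferedReader out = new BufferedReader(
                    new InputStreamReader(proc.getInputStream()))) {
                while (out.readLine() != null) {
                    // process records here
                }
            }
            drainer.join();
            System.out.println("exit code: " + proc.waitFor());
        }
    }

Alternatively, ProcessBuilder.redirectErrorStream(true) merges stderr into stdout so a
single reader suffices, at the cost of interleaving the two streams.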

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
deleted file mode 100644
index e087cb1..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.core.io.GenericInputSplit;
-
-/**
- * The ExternalProcessInputSplit contains all information for {@link org.apache.flink.api.common.io.InputFormat}s
- * that read their data from external processes.
- * Each parallel instance of an InputFormat starts an external process and reads its output.
- * The command to start the external process must be executable on all nodes.
- * 
- * @see ExternalProcessInputFormat
- * @see ExternalProcessFixedLengthInputFormat
- */
-public class ExternalProcessInputSplit extends GenericInputSplit {
-
-	private static final long serialVersionUID = 1L;
-	
-	// command to be executed for this input split
-	private final String extProcessCommand;
-	
-	/**
-	 * Instantiates an ExternalProcessInputSplit
-	 * 
-	 * @param splitNumber The number of the input split
-	 * @param numSplits The total number of input splits
-	 * @param extProcCommand The command to be executed for the input split
-	 */
-	public ExternalProcessInputSplit(int splitNumber, int numSplits, String extProcCommand) {
-		super(splitNumber, numSplits);
-		this.extProcessCommand = extProcCommand;
-	}
-	
-	/**
-	 * Returns the command to be executed to derive the input for this split
-	 * 
-	 * @return the command to be executed to derive the input for this split
-	 */
-	public String getExternalProcessCommand() {
-		return this.extProcessCommand;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java
deleted file mode 100644
index c73da71..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-/**
- * The base class for input formats that read {@link Record}s from a
- * file.
- */
-public abstract class FileInputFormat extends org.apache.flink.api.common.io.FileInputFormat<Record> {
-	
-	private static final long serialVersionUID = -8819984594406641418L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java
deleted file mode 100644
index c9591ee..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-
-/**
- * The abstract base class for all output formats that are file based. Contains the logic to open/close the target
- * file streams.
- */
-public abstract class FileOutputFormat extends org.apache.flink.api.common.io.FileOutputFormat<Record> {
-	
-	private static final long serialVersionUID = 3832934435044920834L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java
deleted file mode 100644
index e544eb9..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-
-import java.io.IOException;
-
-import org.apache.flink.api.common.io.statistics.BaseStatistics;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.Record;
-
-/**
- * Base class for input formats that read fixed-length {@link Record}s from a file.
- */
-public abstract class FixedLengthInputFormat extends FileInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * The config parameter which defines the fixed length of a record.
-	 */
-	public static final String RECORDLENGTH_PARAMETER_KEY = "pact.fix-input.record-length";
-	
-	/**
-	 * The default read buffer size = 1MB.
-	 */
-	private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024;
-	
-	/**
-	 * Buffer to read a batch of records from a file 
-	 */
-	private byte[] readBuffer;
-	
-	/**
-	 * The position in the stream
-	 */
-	private long streamPos;
-	
-	/**
-	 * The end position in the stream.
-	 */
-	private long streamEnd;
-	
-	/**
-	 * read position within the read buffer
-	 */
-	private int readBufferPos;
-	
-	/**
-	 * The limit of the data in the read buffer.
-	 */
-	private int readBufferLimit;
-	
-	/**
-	 * fixed length of all records
-	 */
-	private int recordLength;
-	
-	/**
-	 * size of the read buffer
-	 */
-	private int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
-	
-	/**
-	 * The flag whether the stream is exhausted.
-	 */
-	private boolean exhausted;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Constructor only sets the key and value classes
-	 */
-	protected FixedLengthInputFormat() {}
-	
-	/**
-	 * Reads a record out of the given buffer. This operation always consumes the fixed record length
-	 * in bytes, regardless of whether the produced record was valid.
-	 * 
-	 * @param target The target Record
-	 * @param buffer The buffer containing the binary data.
-	 * @param startPos The start position in the byte array.
-	 * @return True, if the record is valid, false otherwise.
-	 */
-	public abstract boolean readBytes(Record target, byte[] buffer, int startPos);
-	
-	/**
-	 * Returns the fixed length of a record.
-	 * 
-	 * @return the fixed length of a record.
-	 */
-	public int getRecordLength() {
-		return this.recordLength;
-	}
-	
-	/**
-	 * Gets the size of the buffer internally used to parse record boundaries.
-	 * 
-	 * @return The size of the parsing buffer.
-	 */
-	public int getReadBufferSize() {
-		return this.readBuffer.length;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public void configure(Configuration parameters) {
-		// pass parameters to FileInputFormat
-		super.configure(parameters);
-
-		// read own parameters
-		this.recordLength = parameters.getInteger(RECORDLENGTH_PARAMETER_KEY, 0);
-		if (recordLength < 1) {
-			throw new IllegalArgumentException("The record length parameter must be set and larger than 0.");
-		}
-	}
-	
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		// open input split using FileInputFormat
-		super.open(split);
-		
-		// adjust the stream positions for boundary splits
-		int recordOffset = (int) (this.splitStart % this.recordLength);
-		if(recordOffset != 0) {
-			// move start forward to the next record boundary
-			recordOffset = this.recordLength - recordOffset;
-			super.stream.seek(this.splitStart + recordOffset);
-		}
-		this.streamPos = this.splitStart + recordOffset;
-		this.streamEnd = this.splitStart + this.splitLength;
-		this.streamEnd += (this.recordLength - (this.streamEnd % this.recordLength)) % this.recordLength;
-		
-		// adjust readBufferSize
-		this.readBufferSize += this.recordLength - (this.readBufferSize % this.recordLength);
-		
-		if (this.readBuffer == null || this.readBuffer.length != this.readBufferSize) {
-			this.readBuffer = new byte[this.readBufferSize];
-		}
-		
-		this.readBufferLimit = 0;
-		this.readBufferPos = 0;
-		this.exhausted = false;
-		fillReadBuffer();
-	}
-
-	/**
-	 * {@inheritDoc}
-	 * @throws IOException 
-	 */
-	@Override
-	public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException {
-		final FileBaseStatistics stats = super.getStatistics(cachedStats);
-		return stats == null ? null : 
-			new FileBaseStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), this.recordLength);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-
-	@Override
-	public boolean reachedEnd() {
-		return this.exhausted;
-	}
-	
-
-	@Override
-	public Record nextRecord(Record reuse) throws IOException {
-		// check if read buffer contains another full record
-		if (this.readBufferLimit - this.readBufferPos <= 0) {
-			// get another buffer
-			fillReadBuffer();
-			// check if source is exhausted
-			if (this.exhausted) {
-				return null;
-			}
-		}
-		else if (this.readBufferLimit - this.readBufferPos < this.recordLength) {
-			throw new IOException("Unable to read full record");
-		}
-		
-		boolean val = readBytes(reuse, this.readBuffer, this.readBufferPos);
-		
-		this.readBufferPos += this.recordLength;
-		if (this.readBufferPos >= this.readBufferLimit) {
-			fillReadBuffer();
-		}
-		return val ? reuse : null;
-	}
-	
-	/**
-	 * Fills the next read buffer from the file stream.
-	 * 
-	 * @throws IOException
-	 */
-	private void fillReadBuffer() throws IOException {
-		// special case for compressed files.
-		if(splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) {
-			int read = this.stream.read(this.readBuffer, 0, this.readBufferSize);
-			if (read == -1) {
-				exhausted = true;
-			} else {
-				this.streamPos += read;
-				this.readBufferPos = 0;
-				this.readBufferLimit = read;
-			}
-			return;
-		}
-		
-		int toRead = (int) Math.min(this.streamEnd - this.streamPos, this.readBufferSize);
-		if (toRead <= 0) {
-			this.exhausted = true;
-			return;
-		}
-		
-		// fill read buffer
-		int read = this.stream.read(this.readBuffer, 0, toRead);
-		
-		if (read <= 0) {
-			this.exhausted = true;
-		} else {
-			this.streamPos += read;
-			this.readBufferPos = 0;
-			this.readBufferLimit = read;
-		}
-	}
-}
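
For context, the split-alignment idea used by the format above, as a tiny standalone sketch
(my illustration, not Flink API): each split skips forward to the next record boundary and
reads up to the boundary at or after its end, so neighboring splits neither overlap nor
drop records.

    // round an offset up to the next multiple of the fixed record length
    static long alignToNextBoundary(long offset, int recordLength) {
        long rem = offset % recordLength;
        return rem == 0 ? offset : offset + (recordLength - rem);
    }

    // alignToNextBoundary(13, 10) == 20; alignToNextBoundary(20, 10) == 20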

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java
deleted file mode 100644
index 46fa6e5..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-/**
- * Generic base class for all inputs that are not based on files, specific to Record.
- */
-public abstract class GenericInputFormat extends org.apache.flink.api.common.io.GenericInputFormat<Record> {
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
deleted file mode 100644
index c606458..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.CharacterCodingException;
-import java.nio.charset.Charset;
-import java.nio.charset.CharsetDecoder;
-import java.util.Arrays;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.StringValue;
-
-/**
- * Base implementation for an input format that returns each line as a separate record that contains
- * only a single string, namely the line.
- */
-public class TextInputFormat extends DelimitedInputFormat {
-	private static final long serialVersionUID = 1L;
-	
-	public static final String CHARSET_NAME = "textformat.charset";
-	
-	public static final String FIELD_POS = "textformat.pos";
-	
-	public static final String DEFAULT_CHARSET_NAME = "UTF-8";
-	
-	private static final Logger LOG = LoggerFactory.getLogger(TextInputFormat.class);
-	
-	
-	protected final StringValue theString = new StringValue();
-	
-	
-	// all fields below here are set in configure / open, so we do not serialize them
-	
-	protected transient CharsetDecoder decoder;
-	
-	protected transient ByteBuffer byteWrapper;
-	
-	protected transient int pos;
-	
-	protected transient boolean ascii;
-	
-	/**
-	 * Code of \r, used to remove \r from a line when the line ends with \r\n
-	 */
-	private static final byte CARRIAGE_RETURN = (byte) '\r';
-
-	/**
-	 * Code of \n, used to identify if \n is used as delimiter
-	 */
-	private static final byte NEW_LINE = (byte) '\n';
-
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public void configure(Configuration parameters) {
-		super.configure(parameters);
-		
-		// get the charset for the decoding
-		String charsetName = parameters.getString(CHARSET_NAME, DEFAULT_CHARSET_NAME);
-		if (charsetName == null || !Charset.isSupported(charsetName)) {
-			throw new RuntimeException("Unsupported charset: " + charsetName);
-		}
-		
-		if (charsetName.equals("ISO-8859-1") || charsetName.equalsIgnoreCase("ASCII")) {
-			this.ascii = true;
-		} else {
-			this.decoder = Charset.forName(charsetName).newDecoder();
-			this.byteWrapper = ByteBuffer.allocate(1);
-		}
-		
-		// get the field position to write in the record
-		this.pos = parameters.getInteger(FIELD_POS, 0);
-		if (this.pos < 0) {
-			throw new RuntimeException("Illegal configuration value for the target position: " + this.pos);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) {
-		StringValue str = this.theString;
-		
-		// if \n is used as the delimiter and the line ends with \r, remove the trailing \r
-		if (this.getDelimiter() != null && this.getDelimiter().length == 1 
-				&& this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1 
-				&& bytes[offset+numBytes-1] == CARRIAGE_RETURN){
-			numBytes -= 1;
-		}
-
-		
-		if (this.ascii) {
-			str.setValueAscii(bytes, offset, numBytes);
-		}
-		else {
-			ByteBuffer byteWrapper = this.byteWrapper;
-			if (bytes != byteWrapper.array()) {
-				byteWrapper = ByteBuffer.wrap(bytes, 0, bytes.length);
-				this.byteWrapper = byteWrapper;
-			}
-			byteWrapper.limit(offset + numBytes);
-			byteWrapper.position(offset);
-				
-			try {
-				CharBuffer result = this.decoder.decode(byteWrapper);
-				str.setValue(result);
-			}
-			catch (CharacterCodingException e) {
-				byte[] copy = new byte[numBytes];
-				System.arraycopy(bytes, offset, copy, 0, numBytes);
-				LOG.warn("Line could not be encoded: " + Arrays.toString(copy), e);
-				return null;
-			}
-		}
-		
-		reuse.clear();
-		reuse.setField(this.pos, str);
-		return reuse;
-	}
-}
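
The readRecord() above trims a trailing \r when \n is the delimiter and then decodes the
line with a CharsetDecoder. A standalone sketch of those two steps (my illustration, not
Flink code):

    import java.nio.ByteBuffer;
    import java.nio.charset.CharacterCodingException;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.StandardCharsets;

    public class LineDecodeExample {
        public static void main(String[] args) throws CharacterCodingException {
            byte[] bytes = "hello world\r".getBytes(StandardCharsets.UTF_8);
            int offset = 0;
            int numBytes = bytes.length;

            // if '\n' is the delimiter and the line ends with '\r', drop the trailing '\r'
            // (handles Windows-style "\r\n" line endings)
            if (numBytes >= 1 && bytes[offset + numBytes - 1] == (byte) '\r') {
                numBytes -= 1;
            }

            // decode with an explicit decoder, as the removed format did for non-ASCII charsets
            CharsetDecoder decoder = StandardCharsets.UTF_8.newDecoder();
            String line = decoder.decode(ByteBuffer.wrap(bytes, offset, numBytes)).toString();
            System.out.println("[" + line + "]"); // prints [hello world]
        }
    }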

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java
deleted file mode 100644
index 29dab6a..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.operators.base.BulkIterationBase;
-import org.apache.flink.types.Record;
-
-/**
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- */
-@Deprecated
-public class BulkIteration extends BulkIterationBase<Record> {
-	public BulkIteration() {
-		super(OperatorInfoHelper.unary());
-	}
-
-	public BulkIteration(String name) {
-		super(OperatorInfoHelper.unary(), name);
-	}
-
-	/**
-	 * Specialized operator to use as a recognizable place-holder for the input to the
-	 * step function when composing the nested data flow.
-	 */
-	public static class PartialSolutionPlaceHolder extends BulkIterationBase.PartialSolutionPlaceHolder<Record> {
-		public PartialSolutionPlaceHolder(BulkIterationBase<Record> container) {
-			super(container, OperatorInfoHelper.unary());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
deleted file mode 100644
index 09811d6..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.operators;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.Ordering;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.operators.translation.WrappingFunction;
-import org.apache.flink.api.java.record.functions.CoGroupFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-
-import com.google.common.base.Preconditions;
-
-/**
- * CoGroupOperator that applies a {@link CoGroupFunction} to groups of records sharing
- * the same key (one group per input).
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * @see CoGroupFunction
- */
-@Deprecated
-public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> implements RecordOperator {
-	
-	/**
-	 * The types of the keys that the operator groups on.
-	 */
-	private final Class<? extends Key<?>>[] keyTypes;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Creates a Builder with the provided {@link CoGroupFunction} implementation.
-	 * 
-	 * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator.
-	 * @param keyClass The class of the key data type.
-	 * @param keyColumn1 The position of the key in the first input's records.
-	 * @param keyColumn2 The position of the key in the second input's records.
-	 */
-	public static Builder builder(CoGroupFunction udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-		WrappingCoGroupFunction wrapper = new WrappingCoGroupFunction(udf);
-		return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link CoGroupFunction} implementation.
-	 * 
-	 * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator.
-	 * @param keyClass The class of the key data type.
-	 * @param keyColumn1 The position of the key in the first input's records.
-	 * @param keyColumn2 The position of the key in the second input's records.
-	 */
-	public static Builder builder(Class<? extends CoGroupFunction> udf, Class<? extends Key<?>> keyClass,
-			int keyColumn1, int keyColumn2)
-	{
-		WrappingCoGroupFunction wrapper = new WrappingClassCoGroupFunction(udf);
-		return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2);
-	}
-	
-	/**
-	 * The constructor that only gets invoked from the Builder.
-	 * @param builder The builder holding this operator's parameters.
-	 */
-	protected CoGroupOperator(Builder builder) {
-		super(builder.udf, OperatorInfoHelper.binary(), builder.getKeyColumnsArray1(), builder.getKeyColumnsArray2(), builder.name);
-		this.keyTypes = builder.getKeyClassesArray();
-		
-		if (builder.inputs1 != null && !builder.inputs1.isEmpty()) {
-			setFirstInput(Operator.createUnionCascade(builder.inputs1));
-		}
-		if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
-			setSecondInput(Operator.createUnionCascade(builder.inputs2));
-		}
-
-		// sanity check solution set key mismatches
-		if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) {
-			int[] positions = getKeyColumns(0);
-			((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions);
-		}
-		if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) {
-			int[] positions = getKeyColumns(1);
-			((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions);
-		}
-		
-		setBroadcastVariables(builder.broadcastInputs);
-		setGroupOrderForInputOne(builder.secondaryOrder1);
-		setGroupOrderForInputTwo(builder.secondaryOrder2);
-		
-		CoGroupFunction function = ((WrappingCoGroupFunction) builder.udf.getUserCodeObject()).getWrappedFunction();
-		setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(new UserCodeObjectWrapper<CoGroupFunction>(function)));
-	}
-
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return this.keyTypes;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder {
-		
-		/* The required parameters */
-		private final UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf;
-		private final List<Class<? extends Key<?>>> keyClasses;
-		private final List<Integer> keyColumns1;
-		private final List<Integer> keyColumns2;
-		
-		/* The optional parameters */
-		private List<Operator<Record>> inputs1;
-		private List<Operator<Record>> inputs2;
-		private Map<String, Operator<Record>> broadcastInputs;
-		private Ordering secondaryOrder1;
-		private Ordering secondaryOrder2;
-		private String name;
-		
-		/**
-		 * Creates a Builder with the provided {@link CoGroupFunction} implementation.
-		 * 
-		 * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator.
-		 * @param keyClass The class of the key data type.
-		 * @param keyColumn1 The position of the key in the first input's records.
-		 * @param keyColumn2 The position of the key in the second input's records.
-		 */
-		protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf, Class<? extends Key<?>> keyClass,
-				int keyColumn1, int keyColumn2)
-		{
-			this.udf = udf;
-			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
-			this.keyClasses.add(keyClass);
-			this.keyColumns1 = new ArrayList<Integer>();
-			this.keyColumns1.add(keyColumn1);
-			this.keyColumns2 = new ArrayList<Integer>();
-			this.keyColumns2.add(keyColumn2);
-			this.inputs1 = new ArrayList<Operator<Record>>();
-			this.inputs2 = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		/**
-		 * Creates a Builder with the provided {@link CoGroupFunction} implementation. This method is intended
-		 * for special case sub-types only.
-		 * 
-		 * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator.
-		 */
-		protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf) {
-			this.udf = udf;
-			this.keyClasses = new ArrayList<Class<? extends Key<?>>>();
-			this.keyColumns1 = new ArrayList<Integer>();
-			this.keyColumns2 = new ArrayList<Integer>();
-			this.inputs1 = new ArrayList<Operator<Record>>();
-			this.inputs2 = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		private int[] getKeyColumnsArray1() {
-			int[] result = new int[keyColumns1.size()];
-			for (int i = 0; i < keyColumns1.size(); ++i) {
-				result[i] = keyColumns1.get(i);
-			}
-			return result;
-		}
-		
-		private int[] getKeyColumnsArray2() {
-			int[] result = new int[keyColumns2.size()];
-			for (int i = 0; i < keyColumns2.size(); ++i) {
-				result[i] = keyColumns2.get(i);
-			}
-			return result;
-		}
-		
-		@SuppressWarnings("unchecked")
-		private Class<? extends Key<?>>[] getKeyClassesArray() {
-			return keyClasses.toArray(new Class[keyClasses.size()]);
-		}
-
-		/**
-		 * Adds an additional key field.
-		 * 
-		 * @param keyClass The class of the key data type.
-		 * @param keyColumn1 The position of the key in the first input's records.
-		 * @param keyColumn2 The position of the key in the second input's records.
-		 */
-		public Builder keyField(Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) {
-			keyClasses.add(keyClass);
-			keyColumns1.add(keyColumn1);
-			keyColumns2.add(keyColumn2);
-			return this;
-		}
-		
-		/**
-		 * Sets the order of the elements within a group for the first input.
-		 * 
-		 * @param order The order for the elements in a group.
-		 */
-		public Builder secondaryOrder1(Ordering order) {
-			this.secondaryOrder1 = order;
-			return this;
-		}
-		
-		/**
-		 * Sets the order of the elements within a group for the second input.
-		 * 
-		 * @param order The order for the elements in a group.
-		 */
-		public Builder secondaryOrder2(Ordering order) {
-			this.secondaryOrder2 = order;
-			return this;
-		}
-		
-		/**
-		 * Sets the input operator for input 1.
-		 * 
-		 * @param input The input operator for input 1. 
-		 */
-		public Builder input1(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs1.clear();
-			this.inputs1.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union) for input 1.
-		 * 
-		 * @param inputs The input operators for input 1; multiple operators are unioned.
-		 */
-		public Builder input1(Operator<Record>...inputs) {
-			this.inputs1.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs1.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets the input operator for input 2.
-		 * 
-		 * @param input The input operator for input 2. 
-		 */
-		public Builder input2(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs2.clear();
-			this.inputs2.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union) for input 2.
-		 * 
-		 * @param inputs The input operators for input 2; multiple operators are unioned.
-		 */
-		public Builder input2(Operator<Record>...inputs) {
-			this.inputs2.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs2.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets the first inputs.
-		 * 
-		 * @param inputs The list of input operators for input 1.
-		 */
-		public Builder inputs1(List<Operator<Record>> inputs) {
-			this.inputs1 = inputs;
-			return this;
-		}
-		
-		/**
-		 * Sets the second inputs.
-		 * 
-		 * @param inputs The list of input operators for input 2.
-		 */
-		public Builder inputs2(List<Operator<Record>> inputs) {
-			this.inputs2 = inputs;
-			return this;
-		}
-		
-		/**
-		 * Binds the result produced by the plan rooted at the given operator to a
-		 * broadcast variable used by the UDF wrapped in this operator.
-		 */
-		public Builder setBroadcastVariable(String name, Operator<Record> input) {
-			this.broadcastInputs.put(name, input);
-			return this;
-		}
-		
-		/**
-		 * Binds multiple broadcast variables.
-		 */
-		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
-			this.broadcastInputs.clear();
-			this.broadcastInputs.putAll(inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the name of this operator.
-		 * 
-		 * @param name The name for this operator.
-		 */
-		public Builder name(String name) {
-			this.name = name;
-			return this;
-		}
-		
-		/**
-		 * Creates and returns a CoGroupOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		public CoGroupOperator build() {
-			if (keyClasses.isEmpty()) {
-				throw new IllegalStateException("At least one key attribute has to be set.");
-			}
-			
-			if (name == null) {
-				name = udf.getUserCodeClass().getName();
-			}
-			return new CoGroupOperator(this);
-		}
-	}
-	
-	// ============================================================================================
-	
-	public static class WrappingCoGroupFunction extends WrappingFunction<CoGroupFunction> 
-			implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> {
-		
-		private static final long serialVersionUID = 1L;
-		
-		public WrappingCoGroupFunction(CoGroupFunction coGrouper) {
-			super(coGrouper);
-		}
-		
-		@Override
-		public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) throws Exception {
-			this.wrappedFunction.coGroup(records1.iterator(), records2.iterator(), out);
-		}
-	}
-	
-	public static final class WrappingClassCoGroupFunction extends WrappingCoGroupFunction {
-		
-		private static final long serialVersionUID = 1L;
-		
-		public WrappingClassCoGroupFunction(Class<? extends CoGroupFunction> reducer) {
-			super(InstantiationUtil.instantiate(reducer));
-		}
-		
-		private void writeObject(ObjectOutputStream out) throws IOException {
-			out.writeObject(wrappedFunction.getClass());
-		}
-
-		private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-			Class<?> clazz = (Class<?>) in.readObject();
-			this.wrappedFunction = (CoGroupFunction) InstantiationUtil.instantiate(clazz);
-		}
-	}
-}
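
For reference, a minimal sketch of how the CoGroupOperator removed above was typically
used. This is illustrative only: MyCoGrouper and the two source operators are hypothetical
names, while the builder(...) signature and the Iterator-based coGroup(...) signature are
taken from the deleted code itself (see the WrappingCoGroupFunction above).

    import java.util.Iterator;

    import org.apache.flink.api.java.record.functions.CoGroupFunction;
    import org.apache.flink.api.java.record.operators.CoGroupOperator;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Record;
    import org.apache.flink.util.Collector;

    public class MyCoGrouper extends CoGroupFunction {

        @Override
        public void coGroup(Iterator<Record> records1, Iterator<Record> records2,
                Collector<Record> out) throws Exception {
            // Forward all records of the first input's group; a real UDF would
            // combine both sides here.
            while (records1.hasNext()) {
                out.collect(records1.next());
            }
        }
    }

    // Usage, assuming source1 and source2 are Operator<Record> instances and the
    // key is an IntValue in field 0 of both inputs:
    // CoGroupOperator coGroup = CoGroupOperator.builder(MyCoGrouper.class, IntValue.class, 0, 0)
    //         .input1(source1)
    //         .input2(source2)
    //         .name("MyCoGroup")
    //         .build();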

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java
deleted file mode 100644
index 2b84b73..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.operators.GenericDataSourceBase;
-import org.apache.flink.api.java.record.io.CollectionInputFormat;
-import org.apache.flink.types.Record;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * Operator for input nodes that read data from a collection or an iterator.
- * Use this operator if you want to use a collection of data from the application
- * as an input for a job that runs on the cluster.
- * There are two main ways to use the CollectionDataSource:
- * 
- * Using an {@link java.util.Iterator} that is also {@link java.io.Serializable}.
- * <pre>
- * CollectionDataSource source = new CollectionDataSource(mySerializableIterator, &quot;IterSource&quot;);
- * </pre>
- * 
- * Using a Collection of Java Objects.
- * 
- * <pre>
- * CollectionDataSource source2 = new CollectionDataSource(new ArrayList&lt;String&gt;(), &quot;Collection source&quot;);
- * </pre>
- * 
- * Note that you can pass as many elements as you want to the constructor:
- * 
- * <pre>
- * CollectionDataSource(&quot;Varargs String source&quot;, &quot;some&quot;, &quot;strings&quot;, &quot;that&quot;, &quot;get&quot;, &quot;distributed&quot;);
- * </pre>
- * 
- * The only limitation is that the elements need to have the same type.
- */
-@Deprecated
-public class CollectionDataSource extends GenericDataSourceBase<Record, GenericInputFormat<Record>> {
-
-	private static final String DEFAULT_NAME = "<Unnamed Collection Data Source>";
-
-	/**
-	 * Creates a new instance for the given input using the given input format.
-	 * 
-	 * @param f
-	 *        The {@link CollectionInputFormat} implementation used to read the data.
-	 * @param data
-	 *        The input data. It should be a collection, an array or a serializable iterator.
-	 * @param name
-	 *        The given name for the Pact, used in plans, logs and progress messages.
-	 */
-	public CollectionDataSource(CollectionInputFormat f, String name, Object... data) {
-		super(f, OperatorInfoHelper.source(), name);
-		Collection<Object> tmp = new ArrayList<Object>();
-		for (Object o : data) {
-			tmp.add(o);
-		}
-		checkFormat(tmp);
-		f.setData(tmp);
-	}
-
-	public CollectionDataSource(CollectionInputFormat f, String name, Object[][] data) {
-		super(f, OperatorInfoHelper.source(), name);
-		Collection<Object> tmp = new ArrayList<Object>();
-		for (Object o : data) {
-			tmp.add(o);
-		}
-		checkFormat(tmp);
-		f.setData(tmp);
-	}
-
-	public CollectionDataSource(CollectionInputFormat f, Collection<?> data, String name) {
-		super(f, OperatorInfoHelper.source(), name);
-		checkFormat(data);
-		f.setData(data);
-	}
-
-	public <T extends Iterator<?> & java.io.Serializable> CollectionDataSource(CollectionInputFormat f, T data, String name) {
-		super(f, OperatorInfoHelper.source(), name);
-		f.setIter(data);
-	}
-
-	/**
-	 * Creates a new instance for the given input, using a default {@link CollectionInputFormat}.
-	 * The input types will be checked. If the input types don't agree, an exception will be thrown.
-	 * 
-	 * @param args
-	 *        The input data. It should be a collection, an array or a serializable iterator.
-	 * @param name
-	 *        The given name for the Pact, used in plans, logs and progress messages.
-	 */
-	public CollectionDataSource(String name, Object... args) {
-		this(new CollectionInputFormat(), name, args);
-	}
-
-	public CollectionDataSource(String name, Object[][] args) {
-		this(new CollectionInputFormat(), name, args);
-	}
-
-	public CollectionDataSource(Collection<?> args, String name) {
-		this(new CollectionInputFormat(), args, name);
-	}
-
-	public <T extends Iterator<?> & java.io.Serializable> CollectionDataSource(T args, String name) {
-		this(new CollectionInputFormat(), args, name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	/**
-	 * For Scala compatibility: Scala-to-Java type conversion always wraps the values in an object wrapper.
-	 */
-	public CollectionDataSource(Object... args) {
-		this(new CollectionInputFormat(), args);
-	}
-
-	@SuppressWarnings("unchecked")
-	public CollectionDataSource(CollectionInputFormat f, Object... data) {
-		super(f, OperatorInfoHelper.source(), DEFAULT_NAME);
-		if (data.length == 1 && data[0] instanceof Iterator) {
-			f.setIter((Iterator<Object>) data[0]);
-		}
-		else if (data.length == 1 && data[0] instanceof Collection) {
-			checkFormat((Collection<Object>) data[0]);
-			f.setData((Collection<Object>) data[0]);
-		}
-		else {
-			Collection<Object> tmp = new ArrayList<Object>();
-			for (Object o : data) {
-				tmp.add(o);
-			}
-			checkFormat(tmp);
-			f.setData(tmp);
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	/*
-	 * Checks whether all elements of the input collection have the same type
-	 * (and, for nested arrays or collections, the same element types and size).
-	 */
-	private <T> void checkFormat(Collection<T> c) {
-		Class<?> type = null;
-		List<Class<?>> typeList = new ArrayList<Class<?>>();
-		Iterator<T> it = c.iterator();
-		while (it.hasNext()) {
-			Object o = it.next();
-
-			// check the input types for 1-dimension
-			if (type != null && !type.equals(o.getClass())) {
-				throw new RuntimeException("elements of input list should have the same type");
-			}
-			else {
-				type = o.getClass();
-			}
-
-			// check the input types for 2-dimension array
-			if (typeList.size() == 0 && o.getClass().isArray()) {
-				for (Object s : (Object[]) o) {
-					typeList.add(s.getClass());
-				}
-			}
-			else if (o.getClass().isArray()) {
-				int index = 0;
-				if (((Object[]) o).length != typeList.size()) {
-					throw new RuntimeException("elements of input list should have the same size");
-				}
-				for (Object s : (Object[]) o) {
-					if (!s.getClass().equals(typeList.get(index++))) {
-						throw new RuntimeException("elements of input list should have the same type");
-					}
-				}
-			}
-
-			// check the input types for 2-dimension collection
-			if (typeList.size() == 0 && o instanceof Collection) {
-				@SuppressWarnings("unchecked")
-				Iterator<Object> tmpIt = ((Collection<Object>) o).iterator();
-				while (tmpIt.hasNext()) {
-					Object s = tmpIt.next();
-					typeList.add(s.getClass());
-				}
-			}
-			else if (o instanceof Collection) {
-				int index = 0;
-				@SuppressWarnings("unchecked")
-				Iterator<Object> tmpIt = ((Collection<Object>) o).iterator();
-				while (tmpIt.hasNext()) {
-					Object s = tmpIt.next();
-					if (!s.getClass().equals(typeList.get(index++))) {
-						throw new RuntimeException("elements of input list should have the same type");
-					}
-				}
-
-				if (index != typeList.size()) {
-					throw new RuntimeException("elements of input list should have the same size");
-				}
-			}
-		}
-	}
-
-}
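
A short sketch of how this CollectionDataSource was typically constructed, based on the
constructors and the Javadoc examples above; the variable names are hypothetical. Note
that for two-dimensional inputs, checkFormat(...) additionally requires every row to
agree in length and per-column element types.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.api.java.record.operators.CollectionDataSource;

    public class CollectionSourceExample {

        public static void main(String[] args) {
            // Varargs form: all elements must have the same type, otherwise
            // checkFormat(...) throws a RuntimeException.
            CollectionDataSource varargsSource =
                    new CollectionDataSource("Varargs String source", "some", "strings", "here");

            // Collection form: the same type check applies.
            List<String> data = new ArrayList<String>();
            data.add("a");
            data.add("b");
            CollectionDataSource collectionSource =
                    new CollectionDataSource(data, "Collection source");
        }
    }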

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java
deleted file mode 100644
index 80e89a5..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.common.operators.RecordOperator;
-import org.apache.flink.api.common.operators.base.CrossOperatorBase;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation;
-import org.apache.flink.types.Key;
-import org.apache.flink.types.Record;
-
-import com.google.common.base.Preconditions;
-
-
-/**
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * CrossOperator that applies a {@link CrossFunction} to each element of the Cartesian Product.
- * 
- * @see CrossFunction
- */
-@Deprecated
-public class CrossOperator extends CrossOperatorBase<Record, Record, Record, CrossFunction> implements RecordOperator {
-
-	/**
-	 * Creates a Builder with the provided {@link CrossFunction} implementation.
-	 * 
-	 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-	 */
-	public static Builder builder(CrossFunction udf) {
-		return new Builder(new UserCodeObjectWrapper<CrossFunction>(udf));
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link CrossFunction} implementation.
-	 * 
-	 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-	 */
-	public static Builder builder(Class<? extends CrossFunction> udf) {
-		return new Builder(new UserCodeClassWrapper<CrossFunction>(udf));
-	}
-	
-	/**
-	 * The protected constructor, only invoked from the Builder.
-	 * @param builder The builder holding the configured parameters.
-	 */
-	protected CrossOperator(Builder builder) {
-		super(builder.udf, OperatorInfoHelper.binary(), builder.name);
-		
-		if (builder.inputs1 != null && !builder.inputs1.isEmpty()) {
-			setFirstInput(Operator.createUnionCascade(builder.inputs1));
-		}
-		if (builder.inputs2 != null && !builder.inputs2.isEmpty()) {
-			setSecondInput(Operator.createUnionCascade(builder.inputs2));
-		}
-		
-		setBroadcastVariables(builder.broadcastInputs);
-		setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf));
-	}
-	
-
-	@Override
-	public Class<? extends Key<?>>[] getKeyClasses() {
-		return emptyClassArray();
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder {
-		
-		/* The required parameters */
-		private final UserCodeWrapper<CrossFunction> udf;
-		
-		/* The optional parameters */
-		private List<Operator<Record>> inputs1;
-		private List<Operator<Record>> inputs2;
-		private Map<String, Operator<Record>> broadcastInputs;
-		private String name;
-		
-		/**
-		 * Creates a Builder with the provided {@link CrossFunction} implementation.
-		 * 
-		 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-		 */
-		protected Builder(UserCodeWrapper<CrossFunction> udf) {
-			this.udf = udf;
-			this.inputs1 = new ArrayList<Operator<Record>>();
-			this.inputs2 = new ArrayList<Operator<Record>>();
-			this.broadcastInputs = new HashMap<String, Operator<Record>>();
-		}
-		
-		/**
-		 * Sets the first input.
-		 * 
-		 * @param input The first input.
-		 */
-		public Builder input1(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs1.clear();
-			this.inputs1.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets the second input.
-		 * 
-		 * @param input The second input.
-		 */
-		public Builder input2(Operator<Record> input) {
-			Preconditions.checkNotNull(input, "The input must not be null");
-			
-			this.inputs2.clear();
-			this.inputs2.add(input);
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union) for input 1.
-		 * 
-		 * @param inputs The input operators for input 1; multiple operators are unioned.
-		 */
-		public Builder input1(Operator<Record>...inputs) {
-			this.inputs1.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs1.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets one or several inputs (union) for input 2.
-		 * 
-		 * @param inputs The input operators for input 2; multiple operators are unioned.
-		 */
-		public Builder input2(Operator<Record>...inputs) {
-			this.inputs2.clear();
-			for (Operator<Record> c : inputs) {
-				this.inputs2.add(c);
-			}
-			return this;
-		}
-		
-		/**
-		 * Sets the first inputs.
-		 * 
-		 * @param inputs The list of input operators for input 1.
-		 */
-		public Builder inputs1(List<Operator<Record>> inputs) {
-			this.inputs1 = inputs;
-			return this;
-		}
-		
-		/**
-		 * Sets the second inputs.
-		 * 
-		 * @param inputs The list of input operators for input 2.
-		 */
-		public Builder inputs2(List<Operator<Record>> inputs) {
-			this.inputs2 = inputs;
-			return this;
-		}
-		
-		/**
-		 * Binds the result produced by the plan rooted at the given operator to a
-		 * broadcast variable used by the UDF wrapped in this operator.
-		 */
-		public Builder setBroadcastVariable(String name, Operator<Record> input) {
-			this.broadcastInputs.put(name, input);
-			return this;
-		}
-		
-		/**
-		 * Binds multiple broadcast variables.
-		 */
-		public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) {
-			this.broadcastInputs.clear();
-			this.broadcastInputs.putAll(inputs);
-			return this;
-		}
-		
-		/**
-		 * Sets the name of this operator.
-		 * 
-		 * @param name The name for this operator.
-		 */
-		public Builder name(String name) {
-			this.name = name;
-			return this;
-		}
-		
-		/**
-		 * Creates and returns a CrossOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		public CrossOperator build() {
-			setNameIfUnset();
-			return new CrossOperator(this);
-		}
-		
-		protected void setNameIfUnset() {
-			if (name == null) {
-				name = udf.getUserCodeClass().getName();
-			}
-		}
-	}
-}
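
For context, a minimal sketch of the CrossOperator builder removed above. The
cross(Record, Record) signature follows from the CrossOperatorBase<Record, Record,
Record, CrossFunction> declaration above; ConcatCross and the source operators are
hypothetical names.

    import org.apache.flink.api.java.record.functions.CrossFunction;
    import org.apache.flink.api.java.record.operators.CrossOperator;
    import org.apache.flink.types.Record;

    public class ConcatCross extends CrossFunction {

        @Override
        public Record cross(Record first, Record second) throws Exception {
            // Append all fields of the second record to the first one.
            first.concatenate(second);
            return first;
        }
    }

    // Usage, assuming source1 and source2 are Operator<Record> instances:
    // CrossOperator cross = CrossOperator.builder(ConcatCross.class)
    //         .input1(source1)
    //         .input2(source2)
    //         .name("ConcatCross")
    //         .build();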

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java
deleted file mode 100644
index f4a2ad1..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossWithLarge;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-
-
-/**
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * This operator represents a Cartesian-Product operation. Of the two inputs, the first is expected to be small
- * and the second is expected to be large.
- * 
- * @see CrossFunction
- */
-@Deprecated
-public class CrossWithLargeOperator extends CrossOperator implements CrossWithLarge {
-	
-	/**
-	 * Creates a Builder with the provided {@link CrossFunction} implementation.
-	 * 
-	 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-	 */
-	public static Builder builder(CrossFunction udf) {
-		return new Builder(new UserCodeObjectWrapper<CrossFunction>(udf));
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link CrossFunction} implementation.
-	 * 
-	 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-	 */
-	public static Builder builder(Class<? extends CrossFunction> udf) {
-		return new Builder(new UserCodeClassWrapper<CrossFunction>(udf));
-	}
-	
-	/**
-	 * The protected constructor, only invoked from the Builder.
-	 * @param builder The builder holding the configured parameters.
-	 */
-	protected CrossWithLargeOperator(Builder builder) {
-		super(builder);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder extends CrossOperator.Builder {
-		
-		/**
-		 * Creates a Builder with the provided {@link CrossFunction} implementation.
-		 * 
-		 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-		 */
-		private Builder(UserCodeWrapper<CrossFunction> udf) {
-			super(udf);
-		}
-		
-		/**
-		 * Creates and returns a CrossWithLargeOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		@Override
-		public CrossWithLargeOperator build() {
-			setNameIfUnset();
-			return new CrossWithLargeOperator(this);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java
deleted file mode 100644
index 9c9b758..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.operators;
-
-import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossWithSmall;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.api.java.record.functions.CrossFunction;
-
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * This operator represents a Cartesian-Product operation. Of the two inputs, the first is expected to be large
- * and the second is expected to be small. 
- * 
- * @see CrossFunction
- */
-@Deprecated
-public class CrossWithSmallOperator extends CrossOperator implements CrossWithSmall {
-	
-	/**
-	 * Creates a Builder with the provided {@link CrossFunction} implementation.
-	 * 
-	 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-	 */
-	public static Builder builder(CrossFunction udf) {
-		return new Builder(new UserCodeObjectWrapper<CrossFunction>(udf));
-	}
-	
-	/**
-	 * Creates a Builder with the provided {@link CrossFunction} implementation.
-	 * 
-	 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-	 */
-	public static Builder builder(Class<? extends CrossFunction> udf) {
-		return new Builder(new UserCodeClassWrapper<CrossFunction>(udf));
-	}
-	
-	/**
-	 * The protected constructor, only invoked from the Builder.
-	 * @param builder The builder holding the configured parameters.
-	 */
-	protected CrossWithSmallOperator(Builder builder) {
-		super(builder);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
-	 */
-	public static class Builder extends CrossOperator.Builder {
-		
-		/**
-		 * Creates a Builder with the provided {@link CrossFunction} implementation.
-		 * 
-		 * @param udf The {@link CrossFunction} implementation for this Cross operator.
-		 */
-		private Builder(UserCodeWrapper<CrossFunction> udf) {
-			super(udf);
-		}
-		
-		/**
-		 * Creates and returns a CrossWithSmallOperator using the values given
-		 * to the builder.
-		 * 
-		 * @return The created operator
-		 */
-		@Override
-		public CrossWithSmallOperator build() {
-			setNameIfUnset();
-			return new CrossWithSmallOperator(this);
-		}
-	}
-}
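
The two subclasses above differ from the plain CrossOperator only in the size hint
(CrossWithLarge or CrossWithSmall) they carry for the optimizer. A hedged usage sketch,
reusing the hypothetical ConcatCross function from the earlier example:

    import org.apache.flink.api.common.operators.Operator;
    import org.apache.flink.api.java.record.operators.CrossOperator;
    import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
    import org.apache.flink.types.Record;

    public class CrossHintExample {

        // Assuming bigInput and tinyInput are produced elsewhere in the plan.
        public static CrossOperator crossWithTinySecondInput(
                Operator<Record> bigInput, Operator<Record> tinyInput) {
            // The inherited input1(...)/input2(...) calls return the parent builder
            // type, so the result is typed as the CrossOperator base class.
            return CrossWithSmallOperator.builder(ConcatCross.class)
                    .input1(bigInput)   // expected to be the large side
                    .input2(tinyInput)  // expected to be the small side
                    .build();
        }
    }

For the opposite case (a large second input), CrossWithLargeOperator.builder(...) is
used the same way.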

