flink-commits mailing list archives

From fhue...@apache.org
Subject [5/8] flink git commit: [FLINK-2906] Remove Record API
Date Thu, 26 Nov 2015 00:20:43 GMT
[FLINK-2906] Remove Record API

This closes #1403


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c787a037
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c787a037
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c787a037

Branch: refs/heads/master
Commit: c787a037de5bb456844f9704389bff65b4972e18
Parents: 8fddbf0
Author: zentol <chesnay@apache.org>
Authored: Tue Nov 24 19:19:42 2015 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Nov 25 23:45:24 2015 +0100

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |   4 +-
 docs/internals/fig/projects_dependencies.svg    |   6 -
 .../common/functions/GenericCollectorMap.java   |  33 --
 .../base/CollectorMapOperatorBase.java          |  60 ---
 .../java/org/apache/flink/types/Record.java     |   2 +-
 .../java/record/functions/CoGroupFunction.java  |  57 ---
 .../java/record/functions/CrossFunction.java    |  51 --
 .../record/functions/FunctionAnnotation.java    | 482 -------------------
 .../api/java/record/functions/JoinFunction.java |  44 --
 .../api/java/record/functions/MapFunction.java  |  53 --
 .../record/functions/MapPartitionFunction.java  |  54 ---
 .../java/record/functions/ReduceFunction.java   |  83 ----
 .../java/record/io/CollectionInputFormat.java   |  97 ----
 .../api/java/record/io/CsvInputFormat.java      | 414 ----------------
 .../api/java/record/io/CsvOutputFormat.java     | 479 ------------------
 .../java/record/io/DelimitedInputFormat.java    |  49 --
 .../java/record/io/DelimitedOutputFormat.java   | 319 ------------
 .../ExternalProcessFixedLengthInputFormat.java  | 227 ---------
 .../record/io/ExternalProcessInputFormat.java   | 143 ------
 .../record/io/ExternalProcessInputSplit.java    |  58 ---
 .../api/java/record/io/FileInputFormat.java     |  31 --
 .../api/java/record/io/FileOutputFormat.java    |  32 --
 .../java/record/io/FixedLengthInputFormat.java  | 245 ----------
 .../api/java/record/io/GenericInputFormat.java  |  29 --
 .../api/java/record/io/TextInputFormat.java     | 142 ------
 .../java/record/operators/BulkIteration.java    |  49 --
 .../java/record/operators/CoGroupOperator.java  | 399 ---------------
 .../record/operators/CollectionDataSource.java  | 225 ---------
 .../java/record/operators/CrossOperator.java    | 241 ----------
 .../operators/CrossWithLargeOperator.java       |  98 ----
 .../operators/CrossWithSmallOperator.java       |  98 ----
 .../java/record/operators/DeltaIteration.java   |  99 ----
 .../api/java/record/operators/FileDataSink.java | 236 ---------
 .../java/record/operators/FileDataSource.java   |  78 ---
 .../java/record/operators/GenericDataSink.java  | 250 ----------
 .../record/operators/GenericDataSource.java     |  77 ---
 .../api/java/record/operators/JoinOperator.java | 326 -------------
 .../api/java/record/operators/MapOperator.java  | 201 --------
 .../record/operators/MapPartitionOperator.java  | 200 --------
 .../record/operators/OperatorInfoHelper.java    |  49 --
 .../java/record/operators/ReduceOperator.java   | 407 ----------------
 .../record/CoGroupWrappingFunctionTest.java     | 206 --------
 .../java/record/ReduceWrappingFunctionTest.java | 233 ---------
 .../api/java/record/io/CsvInputFormatTest.java  | 406 ----------------
 .../api/java/record/io/CsvOutputFormatTest.java | 465 ------------------
 ...ternalProcessFixedLengthInputFormatTest.java | 298 ------------
 .../io/ExternalProcessInputFormatTest.java      | 283 -----------
 .../record/io/FixedLenghtInputFormatTest.java   | 212 --------
 .../api/java/record/io/TextInputFormatTest.java | 158 ------
 .../flink/optimizer/costs/CostEstimator.java    |   3 +-
 .../flink/optimizer/dag/CollectorMapNode.java   |  62 ---
 .../operators/CollectorMapDescriptor.java       |  75 ---
 .../plandump/PlanJSONDumpGenerator.java         |   1 -
 .../optimizer/plantranslate/JsonMapper.java     |   1 -
 .../traversals/GraphCreatingVisitor.java        |   4 -
 .../runtime/operators/CollectorMapDriver.java   | 118 -----
 .../flink/runtime/operators/DriverStrategy.java |   3 -
 .../chaining/ChainedCollectorMapDriver.java     |  87 ----
 .../flink/test/util/RecordAPITestBase.java      | 146 ------
 59 files changed, 3 insertions(+), 8985 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index fe3edaa..71a5e26 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -3079,9 +3079,7 @@ attribute. Both the command line and the web interface support a parameter to pa
 class name manually for cases where the JAR manifest contains neither attribute.
 
 2. If the entry point class implements the `org.apache.flink.api.common.Program`, then the system
-calls the `getPlan(String...)` method to obtain the program plan to execute. The
-`getPlan(String...)` method was the only possible way of defining a program in the *Record API*
-(see [0.4 docs](http://stratosphere.eu/docs/0.4/)) and is also supported in the new Java API.
+calls the `getPlan(String...)` method to obtain the program plan to execute.
 
 3. If the entry point class does not implement the `org.apache.flink.api.common.Program` interface,
 the system will invoke the main method of the class.
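
As a quick illustration, a minimal entry point implementing this interface could look roughly as follows (DataSet API; the class name, sample data, and output path are illustrative only):

    import org.apache.flink.api.common.Plan;
    import org.apache.flink.api.common.Program;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class WordPairsProgram implements Program {
        @Override
        public Plan getPlan(String... args) {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> text = env.fromElements("a b", "b c");
            text.writeAsText(args.length > 0 ? args[0] : "/tmp/out");
            // createProgramPlan() turns the lazily assembled data flow into a Plan
            return env.createProgramPlan();
        }
    }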

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/docs/internals/fig/projects_dependencies.svg
----------------------------------------------------------------------
diff --git a/docs/internals/fig/projects_dependencies.svg b/docs/internals/fig/projects_dependencies.svg
index d537ab8..76f6276 100644
--- a/docs/internals/fig/projects_dependencies.svg
+++ b/docs/internals/fig/projects_dependencies.svg
@@ -176,12 +176,6 @@ under the License.
        y="145.73346"
        style="font-size:15.003737px;fill:#000000;font-style:italic;font-weight:normal;text-align:start;text-anchor:start;font-family:Verdana;"
        id="text3029">Java API</text>
-    <text
-       xml:space="preserve"
-       x="127.97233"
-       y="163.73794"
-       style="font-size:15.003737px;fill:#000000;font-style:italic;font-weight:normal;text-align:start;text-anchor:start;font-family:Verdana;"
-       id="text3031">Old Record API</text>
     <path
        style="fill:#ffffff;fill-rule:evenodd;fill-opacity:1;stroke:none;"
        d="  M 260.48364 198.25564   C 260.48364 195.0861 263.05303 192.51671 266.20382 192.51671   L 399.26822 192.51671   C 402.419 192.51671 404.98839 195.0861 404.98839 198.25564   L 404.98839 221.13634   C 404.98839 224.30588 402.419 226.87527 399.26822 226.87527   L 266.20382 226.87527   C 263.05303 226.87527 260.48364 224.30588 260.48364 221.13634   z"

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
deleted file mode 100644
index d335862..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.functions;
-
-import org.apache.flink.util.Collector;
-
-/**
- * Variant of the flat map that is used for backwards compatibility in the deprecated Record API.
- *
- * @param <T> The input data type.
- * @param <O> The result data type.
- */
-@Deprecated
-public interface GenericCollectorMap<T, O> extends RichFunction {
-	
-	void map(T record, Collector<O> out) throws Exception;
-}
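
For orientation: the removed collector map is, as its Javadoc says, a flat map under an older name. A rough counterpart with the current FlatMapFunction (types and class name illustrative):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    public class SplitWords implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String word : line.split(" ")) {
                out.collect(word); // zero or more outputs per input, as with the collector map
            }
        }
    }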

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
deleted file mode 100644
index b7ff2ce..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.common.operators.base;
-
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.api.common.operators.SingleInputOperator;
-import org.apache.flink.api.common.operators.UnaryOperatorInformation;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-
-/**
- * The CollectorMap is the old version of the Map operator. It is effectively a "flatMap", where the
- * UDF is called "map".
- * 
- * @see GenericCollectorMap
- */
-@Deprecated
-@SuppressWarnings("deprecation")
-public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> {
-	
-	public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-		super(udf, operatorInfo, name);
-	}
-	
-	public CollectorMapOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-		super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name);
-	}
-	
-	public CollectorMapOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) {
-		super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/types/Record.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java
index 8ef972f..7cbbc44 100644
--- a/flink-core/src/main/java/org/apache/flink/types/Record.java
+++ b/flink-core/src/main/java/org/apache/flink/types/Record.java
@@ -34,7 +34,7 @@ import org.apache.flink.util.InstantiationUtil;
 
 
 /**
- * The Record represents a multi-valued data record and forms the base of the "Record API"
+ * The Record represents a multi-valued data record.
  * The record is a tuple of arbitrary values. It implements a sparse tuple model, meaning that the record can contain
  * many fields which are actually null and not represented in the record. It has internally a bitmap marking which fields
  * are set and which are not.
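
Record itself stays (only the Record API mention in its Javadoc is dropped). A small fragment illustrating the sparse tuple model described above (values illustrative):

    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Record;
    import org.apache.flink.types.StringValue;

    Record r = new Record();
    r.setField(0, new IntValue(42));
    r.setField(5, new StringValue("sparse")); // fields 1-4 stay unset, tracked by the bitmap
    StringValue s = r.getField(5, StringValue.class);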

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
deleted file mode 100644
index 1ddf362..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}.
- */
-
-@Deprecated
-public abstract class CoGroupFunction extends AbstractRichFunction {
-
-	private static final long serialVersionUID = 1L;
-
-	/**
-	 * This method must be implemented to provide a user implementation of a
-	 * matcher. It is called once per key, with the records from both
-	 * inputs that share that key.
-	 * 
-	 * @param records1 The records from the first input which were paired with the key.
-	 * @param records2 The records from the second input which were paired with the key.
-	 * @param out A collector that collects all output pairs.
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
-	 *                   decide whether to retry the task execution.
-	 */
-	public abstract void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception;
-}
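
For orientation, a rough counterpart using the current generic org.apache.flink.api.common.functions.CoGroupFunction, which takes Iterables and arbitrary types instead of Record iterators (types illustrative):

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.util.Collector;

    public class MergeGroups implements CoGroupFunction<String, String, String> {
        @Override
        public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) {
            for (String s : first) { out.collect(s); }   // all records for one key from input 1
            for (String s : second) { out.collect(s); }  // ... and from input 2
        }
    }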

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
deleted file mode 100644
index eaf34a0..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CrossOperator}.
- */
-@Deprecated
-public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * This method must be implemented to provide a user implementation of a cross.
-	 * It is called for each element of the Cartesian product of both input sets.
-	 *
-	 * @param first The record from the first input.
-	 * @param second The record from the second input.
-	 * @return The result of the cross UDF
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the task and lets the fail-over logic
-	 *                   decide whether to retry the task execution.
-	 */
-	@Override
-	public abstract Record cross(Record first, Record second) throws Exception;
-
-}
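
The deleted class was a thin Record binding of the generic interface named in its declaration above; used directly, that interface looks like this (types illustrative):

    import org.apache.flink.api.common.functions.CrossFunction;

    public class ConcatPair implements CrossFunction<String, Integer, String> {
        @Override
        public String cross(String first, Integer second) {
            return first + "/" + second; // one result per element of the Cartesian product
        }
    }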

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
deleted file mode 100644
index 71d2a62..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java
+++ /dev/null
@@ -1,482 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.functions;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import org.apache.flink.api.common.operators.DualInputSemanticProperties;
-import org.apache.flink.api.common.operators.SingleInputSemanticProperties;
-import org.apache.flink.api.common.operators.util.FieldSet;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * 
- * This class defines the semantic assertions that can be added to functions.
- * The assertions are realized as java annotations, to be added to the class declaration of
- * the class that realized the user function. For example, to declare the <i>ConstantFieldsExcept</i> 
- * annotation for a map-type function that realizes a simple absolute function,
- * use it the following way:
- * 
- * <pre>{@code
- * {@literal @}ConstantFieldsExcept(fields={2})
- * public class MyMapper extends MapFunction
- * {
- *     public void map(Record record, Collector out)
- *     {
- *        int value = record.getField(2, IntValue.class).getValue();
- *        record.setField(2, new IntValue(Math.abs(value)));
- *
- *        out.collect(record);
- *     }
- * }
- * }</pre>
- * 
- * Be aware that some annotations should only be used for functions with a single input 
- * ({@link MapFunction}, {@link ReduceFunction}) and some only for stubs with two inputs 
- * ({@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}).
- */
-@Deprecated
-public class FunctionAnnotation {
-	
-	/**
-	 * Specifies the fields of an input record that are unchanged in the output of 
-	 * a stub with a single input ( {@link MapFunction}, {@link ReduceFunction}).
-	 * 
-	 * A field is considered to be constant if its value is not changed and copied to the same position of 
-	 * output record.
-	 * 
-	 * <b>
-	 * It is very important to follow a conservative strategy when specifying constant fields.
-	 * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be 
-	 * inserted! Otherwise, the correct execution of a program can not be guaranteed.
-	 * So if in doubt, do not add a field to this set.
-	 * </b>
-	 * 
-	 * This annotation is mutually exclusive with the {@link ConstantFieldsExcept} annotation.
-	 * 
-	 * If this annotation and the {@link ConstantFieldsExcept} annotation is not set, it is 
-	 * assumed that <i>no</i> field is constant.
-	 *
-	 */
-	@Target(ElementType.TYPE)
-	@Retention(RetentionPolicy.RUNTIME)
-	public @interface ConstantFields {
-		int[] value();
-	}
-	
-	/**
-	 * Specifies that all fields of an input record are unchanged in the output of 
-	 * a {@link MapFunction} or {@link ReduceFunction}.
-	 * 
-	 * A field is considered to be constant if its value is not changed and copied to the same position of 
-	 * output record.
-	 * 
-	 * <b>
-	 * It is very important to follow a conservative strategy when specifying constant fields.
-	 * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be 
-	 * inserted! Otherwise, the correct execution of a program can not be guaranteed.
-	 * So if in doubt, do not add a field to this set.
-	 * </b>
-	 * 
-	 * This annotation is mutually exclusive with the {@link ConstantFieldsExcept} annotation.
-	 * 
-	 * If this annotation and the {@link ConstantFieldsExcept} annotation is not set, it is 
-	 * assumed that <i>no</i> field is constant.
-	 */
-	@Target(ElementType.TYPE)
-	@Retention(RetentionPolicy.RUNTIME)
-	public @interface AllFieldsConstants {}
-	
-	/**
-	 * Specifies the fields of an input record of the first input that are unchanged in 
-	 * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction})
-	 * 
-	 * A field is considered to be constant if its value is not changed and copied to the same position of 
-	 * output record.
-	 * 
-	 * <b>
-	 * It is very important to follow a conservative strategy when specifying constant fields.
-	 * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be 
-	 * inserted! Otherwise, the correct execution of a program can not be guaranteed.
-	 * So if in doubt, do not add a field to this set.
-	 * </b>
-	 *
-	 * This annotation is mutually exclusive with the {@link ConstantFieldsFirstExcept} annotation.
-	 * 
-	 * If this annotation and the {@link ConstantFieldsFirstExcept} annotation is not set, it is 
-	 * assumed that <i>no</i> field is constant.
-	 * 
-	 *
-	 */
-	@Target(ElementType.TYPE)
-	@Retention(RetentionPolicy.RUNTIME)
-	public @interface ConstantFieldsFirst {
-		int[] value();
-	}
-	
-	/**
-	 * Specifies the fields of an input record of the second input that are unchanged in 
-	 * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction})
-	 * 
-	 * A field is considered to be constant if its value is not changed and copied to the same position of 
-	 * output record.
-	 * 
-	 * <b>
-	 * It is very important to follow a conservative strategy when specifying constant fields.
-	 * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be 
-	 * inserted! Otherwise, the correct execution of a program can not be guaranteed.
-	 * So if in doubt, do not add a field to this set.
-	 * </b>
-	 *
-	 * This annotation is mutually exclusive with the {@link ConstantFieldsSecondExcept} annotation.
-	 * 
-	 * If this annotation and the {@link ConstantFieldsSecondExcept} annotation is not set, it is 
-	 * assumed that <i>no</i> field is constant.
-	 * 
-	 */
-	@Target(ElementType.TYPE)
-	@Retention(RetentionPolicy.RUNTIME)
-	public @interface ConstantFieldsSecond {
-		int[] value();
-	}
-	
-	/**
-	 * Specifies the fields of an input record that are changed in the output of 
-	 * a stub with a single input ( {@link MapFunction}, {@link ReduceFunction}). All other 
-	 * fields are assumed to be constant.
-	 * 
-	 * A field is considered to be constant if its value is not changed and copied to the same position of 
-	 * output record.
-	 * 
-	 * <b>
-	 * It is very important to follow a conservative strategy when specifying constant fields.
-	 * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be 
-	 * inserted! Otherwise, the correct execution of a program can not be guaranteed.
-	 * So if in doubt, do not add a field to this set.
-	 * </b>
-	 * 
-	 * This annotation is mutually exclusive with the {@link ConstantFields} annotation.
-	 * 
-	 * If this annotation and the {@link ConstantFields} annotation is not set, it is 
-	 * assumed that <i>no</i> field is constant.
-	 *
-	 */
-	@Target(ElementType.TYPE)
-	@Retention(RetentionPolicy.RUNTIME)
-	public @interface ConstantFieldsExcept {
-		int[] value();
-	}
-	
-	/**
-	 * Specifies the fields of an input record of the first input that are changed in 
-	 * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction})
-	 * All other fields are assumed to be constant.
-	 * 
-	 * A field is considered to be constant if its value is not changed and copied to the same position of 
-	 * output record.
-	 * 
-	 * <b>
-	 * It is very important to follow a conservative strategy when specifying constant fields.
-	 * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be 
-	 * inserted! Otherwise, the correct execution of a program can not be guaranteed.
-	 * So if in doubt, do not add a field to this set.
-	 * </b>
-	 *
-	 * This annotation is mutually exclusive with the {@link ConstantFieldsFirst} annotation.
-	 * 
-	 * If this annotation and the {@link ConstantFieldsFirst} annotation is not set, it is 
-	 * assumed that <i>no</i> field is constant.
-	 * 
-	 */
-	@Target(ElementType.TYPE)
-	@Retention(RetentionPolicy.RUNTIME)
-	public @interface ConstantFieldsFirstExcept {
-		int[] value();
-	}
-	
-	
-	/**
-	 * Specifies the fields of an input record of the second input that are changed in 
-	 * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction})
-	 * All other fields are assumed to be constant.
-	 * 
-	 * A field is considered to be constant if its value is not changed and copied to the same position of 
-	 * output record.
-	 * 
-	 * <b>
-	 * It is very important to follow a conservative strategy when specifying constant fields.
-	 * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be 
-	 * inserted! Otherwise, the correct execution of a program can not be guaranteed.
-	 * So if in doubt, do not add a field to this set.
-	 * </b>
-	 *
-	 * This annotation is mutually exclusive with the {@link ConstantFieldsSecond} annotation.
-	 * 
-	 * If this annotation and the {@link ConstantFieldsSecond} annotation is not set, it is 
-	 * assumed that <i>no</i> field is constant.
-	 * 
-	 */
-	@Target(ElementType.TYPE)
-	@Retention(RetentionPolicy.RUNTIME)
-	public @interface ConstantFieldsSecondExcept {
-		int[] value();
-	}
-
-	/**
-	 * Private constructor to prevent instantiation. This class is intended only as a container.
-	 */
-	private FunctionAnnotation() {}
-	
-	// --------------------------------------------------------------------------------------------
-	//                                   Function Annotation Handling
-	// --------------------------------------------------------------------------------------------
-	
-	public static SingleInputSemanticProperties readSingleConstantAnnotations(UserCodeWrapper<?> udf) {
-		
-		// get constantSet annotation from stub
-		AllFieldsConstants allConstants = udf.getUserCodeAnnotation(AllFieldsConstants.class);
-		ConstantFields constantSet = udf.getUserCodeAnnotation(ConstantFields.class);
-		ConstantFieldsExcept notConstantSet = udf.getUserCodeAnnotation(ConstantFieldsExcept.class);
-
-		if (notConstantSet != null && (constantSet != null || allConstants != null)) {
-			throw new RuntimeException("Either ConstantFields or ConstantFieldsExcept can be specified, not both.");
-		}
-		
-		// extract notConstantSet from annotation
-		if (notConstantSet != null) {
-			FieldSet nonConstant = new FieldSet(notConstantSet.value());
-			return new ImplicitlyForwardingSingleInputSemanticProperties(nonConstant);
-		}
-		
-		// extract notConstantSet from annotation
-		if (allConstants != null) {
-			return new SingleInputSemanticProperties.AllFieldsForwardedProperties();
-		}
-		
-		SingleInputSemanticProperties semanticProperties = new SingleInputSemanticProperties();
-		
-		// extract constantSet from annotation
-		if (constantSet != null) {
-			for (int value: constantSet.value()) {
-				semanticProperties.addForwardedField(value,value);
-			}
-		}
-		
-		return semanticProperties;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public static DualInputSemanticProperties readDualConstantAnnotations(UserCodeWrapper<?> udf) {
-		ImplicitlyForwardingTwoInputSemanticProperties semanticProperties = new ImplicitlyForwardingTwoInputSemanticProperties();
-
-		// get readSet annotation from stub
-		ConstantFieldsFirst constantSet1Annotation = udf.getUserCodeAnnotation(ConstantFieldsFirst.class);
-		ConstantFieldsSecond constantSet2Annotation = udf.getUserCodeAnnotation(ConstantFieldsSecond.class);
-		
-		// get readSet annotation from stub
-		ConstantFieldsFirstExcept notConstantSet1Annotation = udf.getUserCodeAnnotation(ConstantFieldsFirstExcept.class);
-		ConstantFieldsSecondExcept notConstantSet2Annotation = udf.getUserCodeAnnotation(ConstantFieldsSecondExcept.class);
-		
-		
-		if (notConstantSet1Annotation != null && constantSet1Annotation != null) {
-			throw new RuntimeException("Either ConstantFieldsFirst or ConstantFieldsFirstExcept can be specified, not both.");
-		}
-		
-		if (constantSet2Annotation != null && notConstantSet2Annotation != null) {
-			throw new RuntimeException("Either ConstantFieldsSecond or ConstantFieldsSecondExcept can be specified, not both.");
-		}
-		
-		
-		// extract readSets from annotations
-		if(notConstantSet1Annotation != null) {
-			semanticProperties.setImplicitlyForwardingFirstExcept(new FieldSet(notConstantSet1Annotation.value()));
-		}
-		
-		if(notConstantSet2Annotation != null) {
-			semanticProperties.setImplicitlyForwardingSecondExcept(new FieldSet(notConstantSet2Annotation.value()));
-		}
-		
-		// extract readSets from annotations
-		if (constantSet1Annotation != null) {
-			for(int value: constantSet1Annotation.value()) {
-				semanticProperties.addForwardedField(0, value, value);
-			}
-		}
-		
-		if (constantSet2Annotation != null) {
-			for(int value: constantSet2Annotation.value()) {
-				semanticProperties.addForwardedField(1, value, value);
-			}
-		}
-		
-		return semanticProperties;
-	}
-
-
-	private static final class ImplicitlyForwardingSingleInputSemanticProperties extends SingleInputSemanticProperties {
-
-		private static final long serialVersionUID = 1l;
-
-		private FieldSet nonForwardedFields;
-
-		private ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) {
-			this.nonForwardedFields = nonForwardedFields;
-		}
-
-		@Override
-		public FieldSet getForwardingTargetFields(int input, int sourceField) {
-
-			if (input != 0) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			if (nonForwardedFields == null) {
-				return super.getForwardingTargetFields(input, sourceField);
-			} else {
-				if (this.nonForwardedFields.contains(sourceField)) {
-					return FieldSet.EMPTY_SET;
-				} else {
-					return new FieldSet(sourceField);
-				}
-			}
-		}
-
-		@Override
-		public int getForwardingSourceField(int input, int targetField) {
-
-			if (input != 0) {
-				throw new IndexOutOfBoundsException();
-			}
-
-			if (nonForwardedFields == null) {
-				return super.getForwardingSourceField(input, targetField);
-			} else {
-				if (this.nonForwardedFields.contains(targetField)) {
-					return -1;
-				} else {
-					return targetField;
-				}
-			}
-		}
-
-		@Override
-		public FieldSet getReadFields(int input) {
-			return null;
-		}
-
-		@Override
-		public void addForwardedField(int sourceField, int destinationField) {
-			if (this.nonForwardedFields == null) {
-				super.addForwardedField(sourceField, destinationField);
-			} else {
-				throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" +
-						"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
-			}
-		}
-
-	}
-
-	private static final class ImplicitlyForwardingTwoInputSemanticProperties extends DualInputSemanticProperties {
-		private static final long serialVersionUID = 1L;
-
-		private FieldSet nonForwardedFields1;
-		private FieldSet nonForwardedFields2;
-
-		private ImplicitlyForwardingTwoInputSemanticProperties() {}
-
-		public void setImplicitlyForwardingFirstExcept(FieldSet nonForwardedFields) {
-			this.nonForwardedFields1 = nonForwardedFields;
-		}
-
-		public void setImplicitlyForwardingSecondExcept(FieldSet nonForwardedFields) {
-			this.nonForwardedFields2 = nonForwardedFields;
-		}
-
-		@Override
-		public FieldSet getForwardingTargetFields(int input, int sourceField) {
-
-			if(input != 0 && input != 1) {
-				throw new IndexOutOfBoundsException();
-			} else if (input == 0) {
-
-				if (this.nonForwardedFields1 == null) {
-					return super.getForwardingTargetFields(0, sourceField);
-				}
-				else {
-					if (this.nonForwardedFields1.contains(sourceField)) {
-						return FieldSet.EMPTY_SET;
-					} else {
-						return new FieldSet(sourceField);
-					}
-				}
-			} else {
-
-				if (this.nonForwardedFields2 == null) {
-					return super.getForwardingTargetFields(1, sourceField);
-				}
-				else {
-					if (this.nonForwardedFields2.contains(sourceField)) {
-						return FieldSet.EMPTY_SET;
-					} else {
-						return new FieldSet(sourceField);
-					}
-				}
-			}
-		}
-
-		@Override
-		public void addForwardedField(int input, int sourceField, int destinationField) {
-			if (input != 0 && input != 1) {
-				throw new IndexOutOfBoundsException();
-			} else if (input == 0) {
-				if (this.nonForwardedFields1 == null) {
-					super.addForwardedField(0, sourceField, destinationField);
-				} else {
-					throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" +
-							"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
-				}
-			} else {
-				if (this.nonForwardedFields2 == null) {
-					super.addForwardedField(1, sourceField, destinationField);
-				} else {
-					throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" +
-							"(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields.");
-				}
-			}
-		}
-
-		@Override
-		public FieldSet getReadFields(int input) {
-			return null;
-		}
-
-	}
-}
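
In the current Java API, comparable intent is expressed with the forwarded-fields annotations; a minimal sketch (function and field choice illustrative):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    import org.apache.flink.api.java.tuple.Tuple2;

    @ForwardedFields("f0") // declares: field f0 is copied unchanged to the output
    public class AbsSecond implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
            value.f1 = Math.abs(value.f1);
            return value;
        }
    }

As with the removed annotations, declaring too much is unsafe: only fields that are forwarded unchanged on every invocation may be listed.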

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
deleted file mode 100644
index 3afb271..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * The JoinFunction must be extended to provide a join implementation, as invoked by a {@link org.apache.flink.api.java.record.operators.JoinOperator}.
- * It resembles an equality join of both inputs on their key fields.
- */
-
-@Deprecated
-public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	@Override
-	public abstract void join(Record value1, Record value2, Collector<Record> out) throws Exception;
-}
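
The declaration above shows this was a Record binding of FlatJoinFunction; the generic form (types illustrative):

    import org.apache.flink.api.common.functions.FlatJoinFunction;
    import org.apache.flink.util.Collector;

    public class JoinToString implements FlatJoinFunction<String, Integer, String> {
        @Override
        public void join(String first, Integer second, Collector<String> out) {
            out.collect(first + ":" + second); // called once per matching pair of the equi-join
        }
    }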

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
deleted file mode 100644
index e51c35f..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.GenericCollectorMap;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * The MapFunction must be extended to provide a mapper implementation.
- * By definition, the mapper is called for each individual input record.
- */
-@Deprecated
-public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * This method must be implemented to provide a user implementation of a mapper.
-	 * It is called for each individual record.
-	 * 
-	 * @param record The record to be mapped.
-	 * @param out A collector that collects all output records.
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the map task and lets the fail-over logic
-	 *                   decide whether to retry the mapper execution.
-	 */
-	@Override
-	public abstract void map(Record record, Collector<Record> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
deleted file mode 100644
index ac18c95..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * The MapPartitionFunction must be extended to provide a map partition implementation.
- * By definition, the map partition is called for a full input set.
- */
-
-@Deprecated
-public abstract class MapPartitionFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.MapPartitionFunction<Record, Record> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * This method must be implemented to provide a user implementation of a MapPartitionFunction.
-	 * It is called for a full input set.
-	 *
-	 * @param values all input records
-	 * @param out A collector that collects all output records.
-	 *
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the map task and lets the fail-over logic
-	 *                   decide whether to retry the mapper execution.
-	 */
-	@Override
-	public abstract void mapPartition(Iterable<Record> values, Collector<Record> out) throws Exception;
-}
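
Generic form of the interface the removed class implemented (types illustrative):

    import org.apache.flink.api.common.functions.MapPartitionFunction;
    import org.apache.flink.util.Collector;

    public class CountPartition implements MapPartitionFunction<String, Long> {
        @Override
        public void mapPartition(Iterable<String> values, Collector<Long> out) {
            long count = 0;
            for (String ignored : values) {
                count++;
            }
            out.collect(count); // one call per parallel partition, not per record
        }
    }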

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
deleted file mode 100644
index 96350aa..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.functions;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-
-/**
- * 
- * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from
- * the code at some point.
- * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b>
- * 
- * 
- * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a
- * {@link org.apache.flink.api.java.record.operators.ReduceOperator}.
- */
-@Deprecated
-public abstract class ReduceFunction extends AbstractRichFunction {
-	
-	private static final long serialVersionUID = 1L;
-	
-	/**
-	 * The central function to be implemented for a reducer. The function receives per call one
-	 * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly
-	 * one function call across all parallel instances and computing nodes.
-	 * 
-	 * @param records All records that belong to the given input key.
-	 * @param out The collector to hand results to.
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the reduce task and lets the fail-over logic
-	 *                   decide whether to retry the reduce execution.
-	 */
-	public abstract void reduce(Iterator<Record> records, Collector<Record> out) throws Exception;
-
-	/**
-	 * No default implementation provided.
-	 * This method must be overridden by reduce stubs that want to make use of the combining feature.
-	 * In addition, the ReduceFunction extending class must be annotated as Combinable.
-	 * Note that this function must be implemented, if the reducer is annotated as combinable.
-	 * <p>
-	 * The use of the combiner is typically a pre-reduction of the data. It works similarly to the reducer, except that it
-	 * is not guaranteed to see all values with the same key in one call to the combine function. Since it is called
-	 * prior to the <code>reduce()</code> method, input and output types of the combine method are the input types of
-	 * the <code>reduce()</code> method.
-	 * 
-	 * @see org.apache.flink.api.java.record.operators.ReduceOperator.Combinable
-	 * @param records
-	 *        The records to be combined. Unlike in the reduce method, these are not necessarily all records
-	 *        belonging to the given key.
-	 * @param out The collector to write the result to.
-	 * 
-	 * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the
-	 *                   runtime catches an exception, it aborts the combine task and lets the fail-over logic
-	 *                   decide whether to retry the combiner execution.
-	 */
-	public void combine(Iterator<Record> records, Collector<Record> out) throws Exception {
-		// to be implemented, if the reducer should use a combiner. Note that the combining method
-		// is only used, if the stub class is further annotated with the annotation
-		// @ReduceOperator.Combinable
-		reduce(records, out);
-	}
-}
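
A rough current-API analogue of a combinable reduce: instead of the @Combinable annotation, the function additionally implements GroupCombineFunction (types and the summing logic are illustrative):

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.util.Collector;

    public class SumGroup implements GroupReduceFunction<Integer, Integer>,
                                     GroupCombineFunction<Integer, Integer> {
        @Override
        public void reduce(Iterable<Integer> values, Collector<Integer> out) {
            int sum = 0;
            for (int v : values) {
                sum += v;
            }
            out.collect(sum);
        }

        @Override
        public void combine(Iterable<Integer> values, Collector<Integer> out) {
            reduce(values, out); // pre-aggregation; may see only part of a key's values
        }
    }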

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java
deleted file mode 100644
index c94c727..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.io.GenericInputFormat;
-import org.apache.flink.api.common.io.NonParallelInput;
-import org.apache.flink.core.io.GenericInputSplit;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.ValueUtil;
-
-/**
- * Input format for Java collection input. It can accept collection data or a serializable iterator.
- */
-public class CollectionInputFormat extends GenericInputFormat<Record> implements NonParallelInput {
-
-	private static final long serialVersionUID = 1L;
-
-	private Collection<?> dataSet; // input data as collection
-
-	private Iterator<?> serializableIter; // input data as serializable iterator
-
-	private transient Iterator<?> it;
-
-	@Override
-	public boolean reachedEnd() throws IOException {
-		return !it.hasNext();
-	}
-
-	@Override
-	public void open(GenericInputSplit split) throws IOException {
-		super.open(split);
-		if (serializableIter != null) {
-			it = serializableIter;
-		}
-		else {
-			it = this.dataSet.iterator();
-		}
-	}
-
-	@Override
-	public Record nextRecord(Record record) throws IOException {
-		if (it.hasNext()) {
-			record.clear();
-			Object b = it.next();
-			// check whether the record field is one-dimensional or multi-dimensional
-			if (b.getClass().isArray()) {
-				for (Object s : (Object[]) b) {
-					record.addField(ValueUtil.toFlinkValueType(s));
-				}
-			}
-			else if (b instanceof Collection) {
-				@SuppressWarnings("unchecked")
-				Iterator<Object> tmpIter = ((Collection<Object>) b).iterator();
-				while (tmpIter.hasNext()) {
-					Object s = tmpIter.next();
-					record.addField(ValueUtil.toFlinkValueType(s));
-				}
-			}
-			else {
-				record.setField(0, ValueUtil.toFlinkValueType(b));
-			}
-			return record;
-		} else {
-			return null;
-		}
-	}
-
-	public void setData(Collection<?> data) {
-		this.dataSet = data;
-		this.serializableIter = null;
-	}
-
-	public <T extends Iterator<?>, Serializable> void setIter(T iter) {
-		this.serializableIter = iter;
-	}
-}
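
What this format provided -- feeding a Java collection or a serializable iterator into a job -- is covered in the current API by the ExecutionEnvironment factory methods (values illustrative):

    import java.util.Arrays;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Integer> nums = env.fromCollection(Arrays.asList(1, 2, 3));
    DataSet<String> letters = env.fromElements("a", "b", "c");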

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
deleted file mode 100644
index 4e92874..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.flink.api.common.io.GenericCsvInputFormat;
-import org.apache.flink.api.common.io.ParseException;
-import org.apache.flink.api.common.operators.CompilerHints;
-import org.apache.flink.api.common.operators.Operator;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-import org.apache.flink.types.parser.FieldParser;
-
-import java.io.IOException;
-
-/**
- * Input format to parse text files and generate Records. 
- * The input file is structured by record delimiters and field delimiters (CSV files are common).
- * Record delimiters separate records from each other ('\n' is common).
- * Field delimiters separate fields within a record. 
- * Record and field delimiters must be configured using the InputFormat {@link Configuration}.
- * 
- * The number of fields to parse must be configured as well.  
- * For each field a data type must be specified using the {@link CsvInputFormat#FIELD_TYPE_PARAMETER_PREFIX} config key.
- * 
- * The position within the text record can be configured for each field using the {@link CsvInputFormat#TEXT_POSITION_PARAMETER_PREFIX} config key.
- * Either all text positions must be configured or none. If none is configured, the index of the config key is used.
- * The position of a value within the {@link Record} is the index of the config key.
- * 
- * @see Configuration
- * @see Record
- */
-@SuppressWarnings("deprecation")
-public class CsvInputFormat extends GenericCsvInputFormat<Record> {
-	
-	private static final long serialVersionUID = 1L;
-	
-	private transient Value[] parsedValues;
-	
-	private int[] targetPositions = new int[0];
-
-	private boolean configured = false;
-	
-	// To speed up readRecord() processing: used to detect Windows line endings.
-	// It is set in open() so that readRecord() does not have to re-evaluate it per record.
-	private boolean lineDelimiterIsLinebreak = false;
-	
-	// --------------------------------------------------------------------------------------------
-	//  Constructors and getters/setters for the configurable parameters
-	// --------------------------------------------------------------------------------------------
-	
-	public CsvInputFormat() {
-		super();
-	}
-	
-	public CsvInputFormat(char fieldDelimiter) {
-		super();
-		setFieldDelimiter(fieldDelimiter);
-	}
-	
-	public CsvInputFormat(Class<? extends Value>... fields) {
-		super();
-		setFieldTypes(fields);
-	}
-	
-	public CsvInputFormat(char fieldDelimiter, Class<? extends Value>... fields) {
-		super();
-		setFieldDelimiter(fieldDelimiter);
-		setFieldTypes(fields);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	public void setFieldTypesArray(Class<? extends Value>[] fieldTypes) {
-		setFieldTypes(fieldTypes);
-	}
-
-	public void setFieldTypes(Class<? extends Value>... fieldTypes) {
-		if (fieldTypes == null) {
-			throw new IllegalArgumentException("Field types must not be null.");
-		}
-		
-		// sanity check
-		for (Class<? extends Value> type : fieldTypes) {
-			if (type != null && !Value.class.isAssignableFrom(type)) {
-				throw new IllegalArgumentException("The types must be subclasses if " + Value.class.getName());
-			}
-		}
-		
-		setFieldTypesGeneric(fieldTypes);
-	}
-
-	public void setFields(int[] sourceFieldIndices, Class<? extends Value>[] fieldTypes) {
-		Preconditions.checkNotNull(fieldTypes);
-		
-		// sanity check
-		for (Class<? extends Value> type : fieldTypes) {
-			if (!Value.class.isAssignableFrom(type)) {
-				throw new IllegalArgumentException("The types must be subclasses if " + Value.class.getName());
-			}
-		}
-		
-		setFieldsGeneric(sourceFieldIndices, fieldTypes);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Pre-flight: Configuration
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void configure(Configuration config) {
-		super.configure(config);
-
-		if (configured) {
-			return;
-		}
-		
-		final String fieldDelimStr = config.getString(FIELD_DELIMITER_PARAMETER, null);
-		if (fieldDelimStr != null) {
-			setFieldDelimiter(fieldDelimStr);
-		}
-		
-		// read the number of fields configured via the configuration
-		int numConfigFields = config.getInteger(NUM_FIELDS_PARAMETER, -1);
-		if (numConfigFields != -1) {
-			if (numConfigFields <= 0) {
-				throw new IllegalConfigurationException("The number of fields for the CsvInputFormat is invalid.");
-			}
-			
-			if (getNumberOfNonNullFields() > 0) {
-				throw new IllegalConfigurationException("Mixing configuration via instance parameters and config parameters is not possible.");
-			}
-		
-			int[] textPosIdx = new int[numConfigFields];
-			boolean anyTextPosSet = false;
-			boolean allTextPosSet = true;
-			int maxTextPos = -1;
-			
-			// parse text positions
-			for (int i = 0; i < numConfigFields; i++) {
-				int pos = config.getInteger(TEXT_POSITION_PARAMETER_PREFIX + i, -1);
-				if (pos == -1) {
-					allTextPosSet = false;
-					textPosIdx[i] = i;
-					maxTextPos = i;
-				} else {
-					anyTextPosSet = true;
-					textPosIdx[i] = pos;
-					maxTextPos = pos > maxTextPos ? pos : maxTextPos;
-				}
-			}
-			// check if either none or all text positions have been set
-			if (anyTextPosSet && !allTextPosSet) {
-				throw new IllegalArgumentException("Invalid configuration for CsvInputFormat: " +
-						"Not all text positions set");
-			}
-			
-			// init the array of types to be set. unify the types from the config 
-			// with the types array set on the instance
-			
-			// make sure we have a sufficiently large types array
-			@SuppressWarnings("unchecked")
-			Class<? extends Value>[] types = (Class<? extends Value>[]) new Class[maxTextPos+1];
-			int[] targetPos = new int[maxTextPos+1];
-			
-			ClassLoader cl = Thread.currentThread().getContextClassLoader();
-			
-			// set the fields
-			try {
-				for (int i = 0; i < numConfigFields; i++) {
-					int pos = textPosIdx[i];
-					
-					Class<? extends Value> clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl).asSubclass(Value.class);
-					if (clazz == null) {
-						throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " +
-							"No field type class for parameter " + i);
-					}
-					
-					types[pos] = clazz;
-					targetPos[pos] = i;
-				}
-			}
-			catch (ClassNotFoundException e) {
-				throw new RuntimeException("Could not resolve type classes", e);
-			}
-			
-			// update the field types
-			setFieldTypes(types);
-			
-			// make a dense target pos array
-			this.targetPositions = new int[numConfigFields];
-			for (int i = 0, k = 0; i < targetPos.length; i++) {
-				if (types[i] != null) {
-					this.targetPositions[k++] = targetPos[i];
-				}
-			}
-		}
-		else {
-			// not configured via config parameters
-			if (this.targetPositions.length == 0) {
-				this.targetPositions = new int[getNumberOfNonNullFields()];
-				for (int i = 0; i < this.targetPositions.length; i++) {
-					this.targetPositions[i] = i;
-				}
-			}
-		}
-		
-		if (getNumberOfNonNullFields() == 0) {
-			throw new IllegalConfigurationException("No fields configured in the CsvInputFormat.");
-		}
-
-		this.configured = true;
-	}
-	
-	@Override
-	public void open(FileInputSplit split) throws IOException {
-		super.open(split);
-		
-		@SuppressWarnings("unchecked")
-		FieldParser<Value>[] fieldParsers = (FieldParser<Value>[]) getFieldParsers();
-		
-		// create the value holders
-		this.parsedValues = new Value[fieldParsers.length];
-		for (int i = 0; i < fieldParsers.length; i++) {
-			this.parsedValues[i] = fieldParsers[i].createValue();
-		}
-		
-		// Left-to-right evaluation makes the access to [0] safe.
-		// This flag speeds up readRecord(), so it does not have to check on every
-		// call whether the line delimiter is the default '\n'.
-		if (this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n') {
-			this.lineDelimiterIsLinebreak = true;
-		}
-	}
-	
-	@Override
-	public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) throws ParseException {
-		/*
-		 * Support Windows line endings in CSV input files when the standard
-		 * delimiter '\n' is used.
-		 */
-		// Detect a Windows line ending: a carriage return directly before the newline.
-		if (this.lineDelimiterIsLinebreak && bytes[offset + numBytes - 1] == '\r') {
-			// reduce the number of bytes so that the carriage return is not taken as data
-			numBytes--;
-		}
-		
-		if (parseRecord(parsedValues, bytes, offset, numBytes)) {
-			// valid parse, map the parsed values into the record
-			for (int i = 0; i < parsedValues.length; i++) {
-				reuse.setField(targetPositions[i], parsedValues[i]);
-			}
-			return reuse;
-		} else {
-			return null;
-		}
-	}
-	
-	// ============================================================================================
-	//  Parameterization via configuration
-	// ============================================================================================
-	
-	// ------------------------------------- Config Keys ------------------------------------------
-	
-	private static final String FIELD_DELIMITER_PARAMETER = "recordinformat.delimiter.field";
-	
-	private static final String NUM_FIELDS_PARAMETER = "recordinformat.field.number";
-	
-	private static final String FIELD_TYPE_PARAMETER_PREFIX = "recordinformat.field.type_";
-	
-	private static final String TEXT_POSITION_PARAMETER_PREFIX = "recordinformat.text.position_";
-	
-	/**
-	 * Creates a configuration builder that can be used to set the input format's
-	 * parameters on the configuration in a fluent fashion.
-	 * 
-	 * @return A config builder for setting parameters.
-	 */
-	public static ConfigBuilder configureRecordFormat(FileDataSource target) {
-		return new ConfigBuilder(target, target.getParameters());
-	}
-	
-	/**
-	 * An abstract builder used to set parameters to the input format's configuration in a fluent way.
-	 */
-	protected static class AbstractConfigBuilder<T> extends DelimitedInputFormat.AbstractConfigBuilder<T> {
-		
-		protected final RecordFormatCompilerHints hints;
-		
-		/**
-		 * Creates a new builder for the given configuration.
-		 *
-		 * @param contract The contract from which the compiler hints are used.
-		 *                 If contract is null, new compiler hints are generated.  
-		 * @param config The configuration into which the parameters will be written.
-		 */
-		protected AbstractConfigBuilder(Operator<?> contract, Configuration config) {
-			super(config);
-			
-			if (contract != null) {
-				this.hints = new RecordFormatCompilerHints(contract.getCompilerHints());
-				
-				// initialize with 2 bytes length for the header (it's actually 3, but one is skipped on the first field)
-				this.hints.addWidthRecordFormat(2);
-			}
-			else {
-				this.hints = new RecordFormatCompilerHints(new CompilerHints());
-			}
-		}
-		
-		// --------------------------------------------------------------------
-		
-		/**
-		 * Sets the delimiter that delimits the individual fields in the record's textual input representation.
-		 * 
-		 * @param delimiter The character to be used as a field delimiter.
-		 * @return The builder itself.
-		 */
-		public T fieldDelimiter(char delimiter) {
-			this.config.setString(FIELD_DELIMITER_PARAMETER, String.valueOf(delimiter));
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-		
-		public T field(Class<? extends Value> type, int textPosition) {
-			return field(type, textPosition, Float.NEGATIVE_INFINITY);
-		}
-		
-		public T field(Class<? extends Value> type, int textPosition, float avgLen) {
-			// register field
-			final int numYet = this.config.getInteger(NUM_FIELDS_PARAMETER, 0);
-			this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + numYet, type);
-			this.config.setInteger(TEXT_POSITION_PARAMETER_PREFIX + numYet, textPosition);
-			this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 1);
-			
-			// register length
-			if (avgLen == Float.NEGATIVE_INFINITY) {
-				if (type == IntValue.class) {
-					avgLen = 4f;
-				} else if (type == DoubleValue.class || type == LongValue.class) {
-					avgLen = 8f;
-				}
-			}
-			
-			if (avgLen != Float.NEGATIVE_INFINITY) {
-				// add the len, plus one byte for the offset coding
-				this.hints.addWidthRecordFormat(avgLen + 1);
-			}
-			
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-	}
-	
-	/**
-	 * A builder used to set parameters to the input format's configuration in a fluent way.
-	 */
-	public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
-		
-		protected ConfigBuilder(Operator<?> target, Configuration targetConfig) {
-			super(target, targetConfig);
-		}
-	}
-	
-	private static final class RecordFormatCompilerHints extends CompilerHints {
-		
-		private float width = 0.0f;
-		
-		private RecordFormatCompilerHints(CompilerHints parent) {
-			copyFrom(parent);
-		}
-
-		@Override
-		public float getAvgOutputRecordSize() {
-			float superWidth = super.getAvgOutputRecordSize();
-			if (superWidth > 0.0f || this.width <= 0.0f) {
-				return superWidth;
-			} else {
-				return this.width;
-			}
-		}
-
-		private void addWidthRecordFormat(float width) {
-			this.width += width;
-		}
-	}
-}
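
For reference, the fluent configuration style the deleted CsvInputFormat supported, reconstructed from configureRecordFormat(...) and the builder methods above (`source` stands for a Record-API FileDataSource; the field layout is hypothetical):

    CsvInputFormat.configureRecordFormat(source)
        .fieldDelimiter('|')
        .field(IntValue.class, 0)      // first text column -> Record field 0
        .field(DoubleValue.class, 1);  // second text column -> Record field 1

The typed DataSet API covers the same ground; a sketch assuming a two-column, comma-delimited file at a hypothetical path:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSet<Tuple2<Integer, Double>> csv = env
        .readCsvFile("hdfs:///path/to/input")
        .types(Integer.class, Double.class);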

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
deleted file mode 100644
index a5d83c3..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java
+++ /dev/null
@@ -1,479 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.api.java.record.io;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.io.Writer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.types.Record;
-import org.apache.flink.types.Value;
-
-/**
- * This is an OutputFormat to serialize {@link Record}s to text. The output is
- * structured by record delimiters and field delimiters as common in CSV files.
- * Record delimiters separate records from each other ('\n' is common). Field
- * delimiters separate fields within a record. Record and field delimiters can
- * be configured using the CsvOutputFormat {@link Configuration}.
- * 
- * The number of fields to serialize must be configured as well. For each field
- * the type of the {@link Value} must be specified using the
- * {@link CsvOutputFormat#FIELD_TYPE_PARAMETER_PREFIX} config key and an index
- * running from 0 to the number of fields.
- * 
- * The position within the {@link Record} can be configured for each field using
- * the {@link CsvOutputFormat#RECORD_POSITION_PARAMETER_PREFIX} config key.
- * Either all {@link Record} positions must be configured or none. If none is
- * configured, the index of the config key is used.
- * 
- * @see Value
- * @see Configuration
- * @see Record
- */
-@SuppressWarnings("deprecation")
-public class CsvOutputFormat extends FileOutputFormat {
-	private static final long serialVersionUID = 1L;
-
-	public static final String RECORD_DELIMITER_PARAMETER = "output.record.delimiter";
-
-	private static final String RECORD_DELIMITER_ENCODING = "output.record.delimiter-encoding";
-
-	public static final String FIELD_DELIMITER_PARAMETER = "output.record.field-delimiter";
-
-	public static final String NUM_FIELDS_PARAMETER = "output.record.num-fields";
-
-	public static final String FIELD_TYPE_PARAMETER_PREFIX = "output.record.type_";
-
-	public static final String RECORD_POSITION_PARAMETER_PREFIX = "output.record.position_";
-
-	public static final String LENIENT_PARSING = "output.record.lenient";
-
-	@SuppressWarnings("unused")
-	private static final Logger LOG = LoggerFactory.getLogger(CsvOutputFormat.class);
-
-	// --------------------------------------------------------------------------------------------
-
-	private int numFields;
-
-	private Class<? extends Value>[] classes;
-
-	private int[] recordPositions;
-
-	private Writer wrt;
-
-	private String fieldDelimiter;
-
-	private String recordDelimiter;
-
-	private String charsetName;
-
-	private boolean lenient;
-	
-	private boolean ctorInstantiation = false;
-
-	// --------------------------------------------------------------------------------------------
-	// Constructors and getters/setters for the configurable parameters
-	// --------------------------------------------------------------------------------------------
-
-	public CsvOutputFormat() {
-	}
-
-	/**
-	 * Creates an instance of CsvOutputFormat. The position of the fields in the
-	 * record is determined by the order in which the classes are given to this
-	 * constructor. The default record delimiter is '\n' and the default field
-	 * delimiter is ','.
-	 * 
-	 * @param types
-	 *            The types of the fields that are in the record.
-	 */
-	public CsvOutputFormat(Class<? extends Value>... types) {
-		this("\n", ",", types);
-	}
-
-	/**
-	 * Creates an instance of CsvOutputFormat. The position of the fields in the
-	 * record is determined by the order in which the classes are given to this
-	 * constructor. The default record delimiter is '\n'.
-	 * 
-	 * @param fieldDelimiter
-	 *            The delimiter that is used to separate the different fields in
-	 *            the record.
-	 * @param types
-	 *            The types of the fields that are in the record.
-	 */
-	public CsvOutputFormat(String fieldDelimiter, Class<? extends Value>... types) {
-		this("\n", fieldDelimiter, types);
-	}
-
-	/**
-	 * Creates an instance of CsvOutputFormat. The position of the fields in the
-	 * record is determined by the order in which the classes are given to this
-	 * constructor.
-	 * 
-	 * @param recordDelimiter
-	 *            The delimiter that is used to separate the different records.
-	 * @param fieldDelimiter
-	 *            The delimiter that is used to separate the different fields in
-	 *            the record.
-	 * @param types
-	 *            The types of the fields that are in the record.
-	 */
-	public CsvOutputFormat(String recordDelimiter, String fieldDelimiter, Class<? extends Value>... types) {
-		if (recordDelimiter == null) {
-			throw new IllegalArgumentException("RecordDelmiter shall not be null.");
-		}
-		if (fieldDelimiter == null) {
-			throw new IllegalArgumentException("FieldDelimiter shall not be null.");
-		}
-		if (types.length == 0) {
-			throw new IllegalArgumentException("No field types given.");
-		}
-
-		this.fieldDelimiter = fieldDelimiter;
-		this.recordDelimiter = recordDelimiter;
-		this.lenient = false;
-
-		setTypes(types);
-		ctorInstantiation = true;
-	}
-	
-	public void setTypes(Class<? extends Value>... types) {
-		this.classes = types;
-		this.numFields = types.length;
-		this.recordPositions = new int[types.length];
-		for (int i = 0; i < types.length; i++) {
-			if (types[i] == null) {
-				throw new IllegalArgumentException("Invalid Constructor Parameter: No type class for parameter " + (2 * i));
-			}
-			this.recordPositions[i] = i;
-		}
-		
-		if (this.fieldDelimiter == null) {
-			this.fieldDelimiter = ",";
-		}
-		
-		if (this.recordDelimiter == null) {
-			this.recordDelimiter = "\n";
-		}
-	}
-	
-	public void setLenient(boolean lenient) {
-		this.lenient = lenient;
-	}
-
-	@Override
-	public void configure(Configuration parameters) {
-		super.configure(parameters);
-
-		int configNumFields = parameters.getInteger(NUM_FIELDS_PARAMETER, -1);
-		
-		if (ctorInstantiation) {
-			if (configNumFields > 0) {
-				throw new IllegalStateException("CsvOutputFormat instantiated via both parameters and config.");
-			}
-			return; // already configured, no further actions required
-		}
-		
-		if (configNumFields < 1) {
-			throw new IllegalStateException("CsvOutputFormat not configured via parameters or config.");
-		}
-		
-		this.numFields = configNumFields;
-
-		@SuppressWarnings("unchecked")
-		Class<Value>[] arr = new Class[this.numFields];
-		this.classes = arr;
-
-		try {
-			ClassLoader cl = Thread.currentThread().getContextClassLoader();
-			
-			for (int i = 0; i < this.numFields; i++) {
-				Class<? extends Value> clazz =  parameters.<Value>getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl);
-				if (clazz == null) {
-					throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i);
-				}
-	
-				this.classes[i] = clazz;
-			}
-		}
-		catch (ClassNotFoundException e) {
-			throw new RuntimeException("Could not resolve type classes", e);
-		}
-
-		this.recordPositions = new int[this.numFields];
-		boolean anyRecordPosDefined = false;
-		boolean allRecordPosDefined = true;
-
-		for (int i = 0; i < this.numFields; i++) {
-			int pos = parameters.getInteger(RECORD_POSITION_PARAMETER_PREFIX + i, Integer.MIN_VALUE);
-			if (pos != Integer.MIN_VALUE) {
-				anyRecordPosDefined = true;
-				if (pos < 0) {
-					throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: "
-							+ "Invalid record position for parameter " + i);
-				}
-				this.recordPositions[i] = pos;
-			} else {
-				allRecordPosDefined = false;
-				this.recordPositions[i] = i;
-			}
-		}
-
-		if (anyRecordPosDefined && !allRecordPosDefined) {
-			throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: "
-					+ "Either none or all record positions must be defined.");
-		}
-
-		this.recordDelimiter = parameters.getString(RECORD_DELIMITER_PARAMETER, AbstractConfigBuilder.NEWLINE_DELIMITER);
-		if (this.recordDelimiter == null) {
-			throw new IllegalArgumentException("The delimiter in the DelimitedOutputFormat must not be null.");
-		}
-		this.charsetName = parameters.getString(RECORD_DELIMITER_ENCODING, null);
-		this.fieldDelimiter = parameters.getString(FIELD_DELIMITER_PARAMETER, ",");
-		this.lenient = parameters.getBoolean(LENIENT_PARSING, false);
-	}
-
-	@Override
-	public void open(int taskNumber, int numTasks) throws IOException {
-		super.open(taskNumber, numTasks);
-		this.wrt = this.charsetName == null ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) :
-				new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096), this.charsetName);
-	}
-
-	@Override
-	public void close() throws IOException {
-		if (wrt != null) {
-			this.wrt.close();
-		}
-		super.close();
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	public void writeRecord(Record record) throws IOException {
-		int numRecFields = record.getNumFields();
-		int readPos;
-
-		for (int i = 0; i < this.numFields; i++) {
-			readPos = this.recordPositions[i];
-			if (readPos < numRecFields) {
-				Value v = record.getField(this.recordPositions[i], this.classes[i]);
-				if (v != null) {
-					if (i != 0) {
-						this.wrt.write(this.fieldDelimiter);
-					}
-					this.wrt.write(v.toString());
-				} else {
-					if (this.lenient) {
-						if (i != 0) {
-							this.wrt.write(this.fieldDelimiter);
-						}
-					} else {
-						throw new RuntimeException("Cannot serialize record with <null> value at position: " + readPos);
-					}
-				}
-
-			} else {
-				if (this.lenient) {
-					if (i != 0) {
-						this.wrt.write(this.fieldDelimiter);
-					}
-				} else {
-					throw new RuntimeException("Cannot serialize record with out field at position: " + readPos);
-				}
-			}
-
-		}
-
-		// add the record delimiter
-		this.wrt.write(this.recordDelimiter);
-	}
-
-	// ============================================================================================
-
-	/**
-	 * Creates a configuration builder that can be used to set the output
-	 * format's parameters on the configuration in a fluent fashion.
-	 * 
-	 * @return A config builder for setting parameters.
-	 */
-	public static ConfigBuilder configureRecordFormat(FileDataSink target) {
-		return new ConfigBuilder(target.getParameters());
-	}
-
-	/**
-	 * Abstract builder used to set parameters to the output format's
-	 * configuration in a fluent way.
-	 */
-	protected static abstract class AbstractConfigBuilder<T> extends FileOutputFormat.AbstractConfigBuilder<T> {
-		private static final String NEWLINE_DELIMITER = "\n";
-
-		// --------------------------------------------------------------------
-
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param config
-		 *            The configuration into which the parameters will be
-		 *            written.
-		 */
-		protected AbstractConfigBuilder(Configuration config) {
-			super(config);
-		}
-
-		// --------------------------------------------------------------------
-
-		/**
-		 * Sets the delimiter to be a single character, namely the given one.
-		 * The character must be within the value range <code>0</code> to
-		 * <code>127</code>.
-		 * 
-		 * @param delimiter
-		 *            The delimiter character.
-		 * @return The builder itself.
-		 */
-		public T recordDelimiter(char delimiter) {
-			if (delimiter == '\n') {
-				this.config.setString(RECORD_DELIMITER_PARAMETER, NEWLINE_DELIMITER);
-			} else {
-				this.config.setString(RECORD_DELIMITER_PARAMETER, String.valueOf(delimiter));
-			}
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the delimiter to be the given string. The string is written to
-		 * the output using the platform's default charset.
-		 * 
-		 * @param delimiter
-		 *            The delimiter string.
-		 * @return The builder itself.
-		 */
-		public T recordDelimiter(String delimiter) {
-			this.config.setString(RECORD_DELIMITER_PARAMETER, delimiter);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the delimiter to be the given string. The string is written to
-		 * the output using the charset with the given name. The charset must be
-		 * available on the processing nodes, otherwise an exception is raised
-		 * at runtime.
-		 * 
-		 * @param delimiter
-		 *            The delimiter string.
-		 * @param charsetName
-		 *            The name of the encoding character set.
-		 * @return The builder itself.
-		 */
-		public T recordDelimiter(String delimiter, String charsetName) {
-			this.config.setString(RECORD_DELIMITER_PARAMETER, delimiter);
-			this.config.setString(RECORD_DELIMITER_ENCODING, charsetName);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the delimiter that delimits the individual fields in the record's
-		 * textual output representation.
-		 * 
-		 * @param delimiter
-		 *            The character to be used as a field delimiter.
-		 * @return The builder itself.
-		 */
-		public T fieldDelimiter(char delimiter) {
-			this.config.setString(FIELD_DELIMITER_PARAMETER, String.valueOf(delimiter));
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Adds a field of the record to be serialized to the output. The field
-		 * at the given position will be interpreted as the type represented by
-		 * the given class. The type's {@link Object#toString()} method will be
-		 * invoked to create a textual representation.
-		 * 
-		 * @param type
-		 *            The type of the field.
-		 * @param recordPosition
-		 *            The position in the record.
-		 * @return The builder itself.
-		 */
-		public T field(Class<? extends Value> type, int recordPosition) {
-			final int numYet = this.config.getInteger(NUM_FIELDS_PARAMETER, 0);
-			this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + numYet, type);
-			this.config.setInteger(RECORD_POSITION_PARAMETER_PREFIX + numYet, recordPosition);
-			this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 1);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-
-		/**
-		 * Sets the leniency for the serializer. A lenient serializer simply
-		 * skips missing fields and null fields in the record, while a
-		 * non-lenient one throws an exception.
-		 * 
-		 * @param lenient
-		 *            True, if the serializer should be lenient, false
-		 *            otherwise.
-		 * @return The builder itself.
-		 */
-		public T lenient(boolean lenient) {
-			this.config.setBoolean(LENIENT_PARSING, lenient);
-			@SuppressWarnings("unchecked")
-			T ret = (T) this;
-			return ret;
-		}
-	}
-
-	/**
-	 * A builder used to set parameters to the output format's configuration in a
-	 * fluent way.
-	 */
-	public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> {
-		/**
-		 * Creates a new builder for the given configuration.
-		 * 
-		 * @param targetConfig
-		 *            The configuration into which the parameters will be
-		 *            written.
-		 */
-		protected ConfigBuilder(Configuration targetConfig) {
-			super(targetConfig);
-		}
-	}
-}
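
And the corresponding output-side builder usage, reconstructed from configureRecordFormat(FileDataSink) and the builder methods above (`sink` stands for a Record-API FileDataSink; the field layout is hypothetical):

    CsvOutputFormat.configureRecordFormat(sink)
        .recordDelimiter('\n')
        .fieldDelimiter(',')
        .field(IntValue.class, 0)    // Record field 0 -> first output column
        .field(DoubleValue.class, 1) // Record field 1 -> second output column
        .lenient(true);              // skip missing/null fields instead of failing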

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java
deleted file mode 100644
index e74ee49..0000000
--- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.java.record.io;
-
-import org.apache.flink.types.Record;
-
-/**
- * Base implementation for input formats that split the input at a delimiter into records.
- * The parsing of the record bytes into the record has to be implemented in the
- * {@link #readRecord(Record, byte[], int, int)} method.
- * <p>
- * The default delimiter is the newline character {@code '\n'}.
- */
-public abstract class DelimitedInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> {
-	
-	private static final long serialVersionUID = -2297199268758915692L;
-
-	// --------------------------------------------------------------------------------------------
-	//  User-defined behavior
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * This function parses the given byte array, which represents a serialized record.
-	 * The parsed content is returned by setting the fields of the reusable record. If the
-	 * byte array contains invalid content, the record can be skipped by returning <tt>null</tt>.
-	 * 
-	 * @param reuse The optional reusable holder for the record that is read.
-	 * @param bytes The serialized record.
-	 * @return The deserialized record, or <tt>null</tt> if the record should be skipped.
-	 */
-	public abstract Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes);
-}
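
A minimal sketch of what a concrete subclass of this (now removed) base class looked like, parsing each line into a single IntValue field; the class name and field layout are hypothetical:

    import org.apache.flink.api.java.record.io.DelimitedInputFormat;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Record;

    public class IntLineInputFormat extends DelimitedInputFormat {
        private static final long serialVersionUID = 1L;

        @Override
        public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) {
            String line = new String(bytes, offset, numBytes).trim();
            try {
                reuse.setField(0, new IntValue(Integer.parseInt(line)));
                return reuse;
            } catch (NumberFormatException e) {
                return null; // invalid content: skip this record
            }
        }
    }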

