flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [6/6] incubator-flink git commit: [FLINK-1040] Updated JavaDocs and removed deprecated types() calls from examples and tests
Date Wed, 10 Dec 2014 15:50:57 GMT
[FLINK-1040] Updated JavaDocs and removed deprecated types() calls from examples and tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/799ff8ae
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/799ff8ae
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/799ff8ae

Branch: refs/heads/master
Commit: 799ff8ae974d2f90d993cbcce947ed6dbdcfb078
Parents: a9ecaba
Author: Fabian Hueske <fhueske@apache.org>
Authored: Wed Dec 10 16:27:29 2014 +0100
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Dec 10 16:29:56 2014 +0100

----------------------------------------------------------------------
 docs/dataset_transformations.md                 |   2 +-
 .../flink/compiler/IterationsCompilerTest.java  |   6 +-
 .../flink/compiler/NestedIterationsTest.java    |   2 +-
 ...ustomPartitioningGlobalOptimizationTest.java |   5 +-
 .../java/DeltaIterationDependenciesTest.java    |   4 +-
 .../flink/compiler/java/OpenIterationTest.java  |   2 +-
 .../examples/java/relational/TPCHQuery10.java   |   8 +-
 .../java/org/apache/flink/api/java/DataSet.java |  18 ++--
 .../flink/api/java/operators/CrossOperator.java |  90 ++++++++++++----
 .../flink/api/java/operators/JoinOperator.java  | 102 +++++++++++++++----
 .../api/java/operators/ProjectOperator.java     |  38 ++++++-
 .../SemanticPropertiesProjectionTest.java       |   6 +-
 .../api/java/operator/CrossOperatorTest.java    |  38 +++----
 .../api/java/operator/JoinOperatorTest.java     | 102 ++++++++-----------
 .../java/operator/ProjectionOperatorTest.java   |   4 +-
 .../DeltaIterationTranslationTest.java          |   2 +-
 .../examples/java8/relational/TPCHQuery10.java  |  13 +--
 .../test/javaApiOperators/AggregateITCase.java  |   6 +-
 .../test/javaApiOperators/CrossITCase.java      |   6 +-
 .../test/javaApiOperators/DistinctITCase.java   |   8 +-
 .../test/javaApiOperators/FirstNITCase.java     |   2 +-
 .../flink/test/javaApiOperators/JoinITCase.java |   6 +-
 .../test/javaApiOperators/ProjectITCase.java    |   2 +-
 .../test/javaApiOperators/SumMinMaxITCase.java  |   6 +-
 24 files changed, 293 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/docs/dataset_transformations.md
----------------------------------------------------------------------
diff --git a/docs/dataset_transformations.md b/docs/dataset_transformations.md
index bcd5679..94933d0 100644
--- a/docs/dataset_transformations.md
+++ b/docs/dataset_transformations.md
@@ -985,7 +985,7 @@ DataSet<Tuple3<Integer, Integer, String>>
                   // hint that the second DataSet is very large
             input1.crossWithHuge(input2)
                   // apply a projection (or any Cross function)
-                  .projectFirst(0,1).projectSecond(1).types(Integer.class, String.class, String.class)
+                  .projectFirst(0,1).projectSecond(1);
 ~~~
 
 </div>

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
index 23b7cfe..87af91b 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/IterationsCompilerTest.java
@@ -69,7 +69,7 @@ public class IterationsCompilerTest extends CompilerTestBase {
 			DataSet<Tuple2<Long, Long>> result =
 				invariantInput
 					.map(new IdentityMapper<Tuple2<Long, Long>>()).withBroadcastSet(iter.getWorkset(), "bc data")
-					.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1).types(Long.class, Long.class);
+					.join(iter.getSolutionSet()).where(0).equalTo(1).projectFirst(1).projectSecond(1);
 			
 			iter.closeWith(result.map(new IdentityMapper<Tuple2<Long,Long>>()), result).print();
 			
@@ -330,12 +330,12 @@ public class IterationsCompilerTest extends CompilerTestBase {
 		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> depIteration = vertices.iterateDelta(vertices, 100, 0);
 				
 		DataSet<Tuple1<Long>> candidates = depIteration.getWorkset().join(edges).where(0).equalTo(0)
-				.projectSecond(1).types(Long.class);
+				.projectSecond(1);
 		
 		DataSet<Tuple1<Long>> grouped = candidates.groupBy(0).reduceGroup(new Reduce101());
 		
 		DataSet<Tuple2<Long, Long>> candidatesDependencies = 
-				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1).types(Long.class, Long.class);
+				grouped.join(edges).where(0).equalTo(1).projectSecond(0, 1);
 		
 		DataSet<Tuple2<Long, Long>> verticesWithNewComponents = 
 				candidatesDependencies.join(depIteration.getSolutionSet()).where(0).equalTo(0)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
index 88069fd..e38c821 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/NestedIterationsTest.java
@@ -159,7 +159,7 @@ public class NestedIterationsTest extends CompilerTestBase {
 			DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> mainIteration = data2.iterateDelta(data2, 100, 0);
 			
 			DataSet<Tuple2<Long, Long>> joined = mainIteration.getWorkset().join(firstResult).where(0).equalTo(0)
-							.projectFirst(0).projectSecond(0).types(Long.class, Long.class);
+							.projectFirst(0).projectSecond(0);
 			
 			DataSet<Tuple2<Long, Long>> mainResult = mainIteration.closeWith(joined, joined);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
index a03633d..f4d4e77 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/custompartition/CustomPartitioningGlobalOptimizationTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.compiler.custompartition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import org.junit.Test;
 
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.functions.Partitioner;
@@ -40,8 +41,6 @@ import org.apache.flink.compiler.plan.SingleInputPlanNode;
 import org.apache.flink.compiler.plan.SinkPlanNode;
 import org.apache.flink.compiler.testfunctions.IdentityGroupReducer;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.junit.Test;
-
 
 @SuppressWarnings({"serial", "unchecked"})
 public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
@@ -65,7 +64,7 @@ public class CustomPartitioningGlobalOptimizationTest extends CompilerTestBase {
 			joined.groupBy(1).withPartitioner(partitioner)
 				.reduceGroup(new IdentityGroupReducer<Tuple3<Long,Long,Long>>())
 				.print();
-			
+
 			Plan p = env.createProgramPlan();
 			OptimizedPlan op = compileNoStats(p);
 			

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java
index 2ef016b..86b1c80 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/DeltaIterationDependenciesTest.java
@@ -45,11 +45,11 @@ public class DeltaIterationDependenciesTest extends CompilerTestBase {
 
 			DataSet<Tuple2<Long, Long>> delta = deltaIteration.getSolutionSet().join(deltaIteration.getWorkset())
 														.where(0).equalTo(0)
-														.projectFirst(1).projectSecond(1).types(Long.class, Long.class);
+														.projectFirst(1).projectSecond(1);
 
 			DataSet<Tuple2<Long, Long>> nextWorkset = deltaIteration.getSolutionSet().join(input)
 														.where(0).equalTo(0)
-														.projectFirst(1).projectSecond(1).types(Long.class, Long.class);
+														.projectFirst(1).projectSecond(1);
 			
 
 			DataSet<Tuple2<Long, Long>> result = deltaIteration.closeWith(delta, nextWorkset);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java
----------------------------------------------------------------------
diff --git a/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java b/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java
index b142a23..5d3c903 100644
--- a/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java
+++ b/flink-compiler/src/test/java/org/apache/flink/compiler/java/OpenIterationTest.java
@@ -161,7 +161,7 @@ public class OpenIterationTest extends CompilerTestBase {
 			DataSet<Tuple2<Long, Long>> mapped = iteration.getSolutionSet().map(new IdentityMapper<Tuple2<Long, Long>>());
 			
 			DataSet<Tuple2<Long, Long>> joined = iteration.getWorkset().join(mapped)
-												.where(0).equalTo(0).projectFirst(1).projectSecond(0).types(Long.class, Long.class);
+												.where(0).equalTo(0).projectFirst(1).projectSecond(0);
 			
 			iteration.closeWith(joined, joined)
 				.print();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
index 4ad4187..436a3d4 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/relational/TPCHQuery10.java
@@ -122,7 +122,7 @@ public class TPCHQuery10 {
 									}
 								})
 				// project fields out that are no longer required
-				.project(0,1).types(Integer.class, Integer.class);
+				.project(0,1);
 
 		// lineitems filtered by flag: (orderkey, revenue)
 		DataSet<Tuple2<Integer, Double>> lineitemsFilteredByFlag = 
@@ -154,15 +154,13 @@ public class TPCHQuery10 {
 		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
 						.joinWithTiny(nations)
 						.where(3).equalTo(0)
-						.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
-						.types(Integer.class, String.class, String.class, String.class, Double.class);
+						.projectFirst(0,1,2).projectSecond(1).projectFirst(4);
 
 		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
 		DataSet<Tuple6<Integer, String, String, String, Double, Double>> result = 
 				customerWithNation.join(revenueByCustomer)
 				.where(0).equalTo(0)
-				.projectFirst(0,1,2,3,4).projectSecond(1)
-				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
+				.projectFirst(0,1,2,3,4).projectSecond(1);
 
 		// emit result
 		result.writeAsCsv(outputPath, "\n", "|");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index e4b2d4d..ebfd67f 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -230,18 +230,20 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Initiates a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
-	 * <b>Note: Only Tuple DataSets can be projected.</b></br>
+	 * Applies a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
+	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b><br/>
 	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.</br>
-	 * This method returns a {@link ProjectOperator} to complete the transformation.
-	 * 
-	 * @param fieldIndexes The field indexes of the input tuples that are retained.
+	 * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
+	 *
+	 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+	 *
+	 * @param fieldIndexes The field indexes of the input tuple that are retained.
 	 * 					   The order of fields in the output tuple corresponds to the order of field indexes.
-	 * @return A ProjectOperator to complete the Project transformation.
-	 * 
+	 * @return A ProjectOperator that represents the projected DataSet.
+	 *
 	 * @see Tuple
 	 * @see DataSet
-	 * @see org.apache.flink.api.java.operators.ProjectOperator
+	 * @see ProjectOperator
 	 */
 	public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
 		return new Projection<T>(this, fieldIndexes).projectTupleX();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
index 83db56c..58b48da 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/CrossOperator.java
@@ -150,14 +150,15 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
 		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A CrossProjection to complete the Cross transformation.
+		 * @return A ProjectCross which represents the projected cross result.
 		 *
 		 * @see Tuple
 		 * @see DataSet
-		 * @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
 		 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
 		 */
 		public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
@@ -173,14 +174,15 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
 		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
 		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A CrossProjection complete the Cross transformation by calling.
+		 * @return A ProjectCross which represents the projected cross result.
 		 *
 		 * @see Tuple
 		 * @see DataSet
-		 * @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
 		 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
 		 */
 		public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
@@ -218,7 +220,27 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			
 			this.crossProjection = crossProjection;
 		}
-		
+
+		/**
+		 * Continues a ProjectCross transformation and adds fields of the first cross input to the projection.<br/>
+		 * If the first cross input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the first cross input is not a Tuple DataSet, no parameters should be passed.<br/>
+		 *
+		 * Additional fields of the first and second input can be added by chaining the method calls of
+		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
+		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
+		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
+		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
+		 * 					   For a non-Tuple DataSet, do not provide parameters.
+		 * 					   The order of fields in the output tuple is defined by the order of field indexes.
+		 * @return A ProjectCross which represents the projected cross result.
+		 *
+		 * @see Tuple
+		 * @see DataSet
+		 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
+		 */
 		@SuppressWarnings("hiding")
 		public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
 			crossProjection = crossProjection.projectFirst(firstFieldIndexes);
@@ -226,17 +248,55 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 			return crossProjection.projectTupleX();
 		}
 
+		/**
+		 * Continues a ProjectCross transformation and adds fields of the second cross input to the projection.<br/>
+		 * If the second cross input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the second cross input is not a Tuple DataSet, no parameters should be passed.<br/>
+		 *
+		 * Additional fields of the first and second input can be added by chaining the method calls of
+		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectFirst(int...)} and
+		 * {@link org.apache.flink.api.java.operators.CrossOperator.ProjectCross#projectSecond(int...)}.
+		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
+		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
+		 * 					   For a non-Tuple DataSet, do not provide parameters.
+		 * 					   The order of fields in the output tuple is defined by the order of field indexes.
+		 * @return A ProjectCross which represents the projected cross result.
+		 *
+		 * @see Tuple
+		 * @see DataSet
+		 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
+		 */
 		@SuppressWarnings("hiding")
 		public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
 			crossProjection = crossProjection.projectSecond(secondFieldIndexes);
 			
 			return crossProjection.projectTupleX();
 		}
-		
+
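+		/**
+		 * Deprecated method only kept for compatibility.
+		 *
+		 * @param types The field types expected for the projected tuple, in output order; they are checked against the projection's result type.
+		 *
+		 * @return This operator, once the provided types have been verified to match the projection.
+		 */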
 		@SuppressWarnings({ "hiding", "unchecked" })
 		@Deprecated
-		public <OUT extends Tuple> ProjectCross<I1, I2, OUT> types(Class<?>... types) {
-			return (ProjectCross<I1, I2, OUT>) this;
+		public <OUT extends Tuple> CrossOperator<I1, I2, OUT> types(Class<?>... types) {
+			TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+
+			if(types.length != typeInfo.getArity()) {
+				throw new InvalidProgramException("Provided types do not match projection.");
+			}
+			for (int i=0; i<types.length; i++) {
+				Class<?> typeClass = types[i];
+				if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) {
+					throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
+				}
+			}
+			return (CrossOperator<I1, I2, OUT>) this;
 		}
 
 		@Override
@@ -410,16 +470,14 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A CrossProjection that needs to be converted into a {@link ProjectOperator} to complete the
-		 *           Cross transformation by calling
-		 *           {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#types(Class)}.
+		 * @return An extended CrossProjection.
 		 *
 		 * @see Tuple
 		 * @see DataSet
 		 * @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
 		 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
 		 */
-		public CrossProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
+		protected CrossProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
 
 			boolean isFirstTuple;
 
@@ -478,16 +536,14 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A CrossProjection that needs to be converted into a {@link ProjectOperator} to complete the
-		 *           Cross transformation by calling
-		 *           {@link org.apache.flink.api.java.operators.CrossOperator.CrossProjection#types(Class)}.
+		 * @return An extended CrossProjection.
 		 *
 		 * @see Tuple
 		 * @see DataSet
 		 * @see org.apache.flink.api.java.operators.CrossOperator.CrossProjection
 		 * @see org.apache.flink.api.java.operators.CrossOperator.ProjectCross
 		 */
-		public CrossProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
+		protected CrossProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
 
 			boolean isSecondTuple;
 
@@ -545,7 +601,7 @@ public class CrossOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1, I2, OUT,
 		 * 
 		 * @return The projected DataSet.
 		 * 
-		 * @see Projection
+		 * @see ProjectCross
 		 */
 		@SuppressWarnings("unchecked")
 		public <OUT extends Tuple> ProjectCross<I1, I2, OUT> projectTupleX() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
index 39d0f18..1310152 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/JoinOperator.java
@@ -547,22 +547,23 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 
 		/**
-		 * Initiates a ProjectJoin transformation and projects the first join input<br/>
+		 * Applies a ProjectJoin transformation and projects the first join input<br/>
 		 * If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
 		 * If the first join input is not a Tuple DataSet, no parameters should be passed.<br/>
 		 * 
 		 * Fields of the first and second input can be added by chaining the method calls of
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
 		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
-		 * 
+		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A ProjectJoin to complete the Join transformation.
+		 * @return A ProjectJoin which represents the projected join result.
 		 * 
 		 * @see Tuple
 		 * @see DataSet
-		 * @see org.apache.flink.api.java.operators.JoinOperator.JoinProjection
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {
@@ -572,22 +573,23 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		}
 		
 		/**
-		 * Initiates a ProjectJoin transformation and projects the second join input<br/>
+		 * Applies a ProjectJoin transformation and projects the second join input<br/>
 		 * If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
 		 * If the second join input is not a Tuple DataSet, no parameters should be passed.<br/>
 		 * 
 		 * Fields of the first and second input can be added by chaining the method calls of
-		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectionFirst(int...)} and
-		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectionSecond(int...)}.
-		 * 
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
+		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
 		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields. 
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A ProjectJoin to complete the Join transformation.
+		 * @return A ProjectJoin which represents the projected join result.
 		 * 
 		 * @see Tuple
 		 * @see DataSet
-		 * @see org.apache.flink.api.java.operators.JoinOperator.JoinProjection
 		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
 		 */
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
@@ -645,14 +647,54 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			
 			this.joinProj = joinProj;
 		}
-		
+
+		/**
+		 * Continues a ProjectJoin transformation and adds fields of the first join input to the projection.<br/>
+		 * If the first join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the first join input is not a Tuple DataSet, no parameters should be passed.<br/>
+		 *
+		 * Additional fields of the first and second input can be added by chaining the method calls of
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
+		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
+		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
+		 * 					   For a non-Tuple DataSet, do not provide parameters.
+		 * 					   The order of fields in the output tuple is defined by the order of field indexes.
+		 * @return A ProjectJoin which represents the projected join result.
+		 *
+		 * @see Tuple
+		 * @see DataSet
+		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
+		 */
 		@SuppressWarnings("hiding")
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectFirst(int... firstFieldIndexes) {	
 			joinProj = joinProj.projectFirst(firstFieldIndexes);
 			
 			return joinProj.projectTupleX();
 		}
-		
+
+		/**
+		 * Continues a ProjectJoin transformation and adds fields of the second join input to the projection.<br/>
+		 * If the second join input is a {@link Tuple} {@link DataSet}, fields can be selected by their index.
+		 * If the second join input is not a Tuple DataSet, no parameters should be passed.<br/>
+		 *
+		 * Additional fields of the first and second input can be added by chaining the method calls of
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectFirst(int...)} and
+		 * {@link org.apache.flink.api.java.operators.JoinOperator.ProjectJoin#projectSecond(int...)}.
+		 *
+		 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+		 *
+		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields.
+		 * 					   For a non-Tuple DataSet, do not provide parameters.
+		 * 					   The order of fields in the output tuple is defined by the order of field indexes.
+		 * @return A ProjectJoin which represents the projected join result.
+		 *
+		 * @see Tuple
+		 * @see DataSet
+		 * @see org.apache.flink.api.java.operators.JoinOperator.ProjectJoin
+		 */
 		@SuppressWarnings("hiding")
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectSecond(int... secondFieldIndexes) {
 			joinProj = joinProj.projectSecond(secondFieldIndexes);
@@ -660,12 +702,30 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 			return joinProj.projectTupleX();
 		}
 
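+		/**
+		 * Deprecated method only kept for compatibility.
+		 *
+		 * @param types The field types expected for the projected tuple, in output order; they are checked against the projection's result type.
+		 *
+		 * @return This operator, once the provided types have been verified to match the projection.
+		 */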
 		@SuppressWarnings({ "unchecked", "hiding" })
 		@Deprecated
-		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> types(Class<?>... types) {
-			return (ProjectJoin<I1, I2, OUT>) this;
+		public <OUT extends Tuple> JoinOperator<I1, I2, OUT> types(Class<?>... types) {
+			TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+
+			if(types.length != typeInfo.getArity()) {
+				throw new InvalidProgramException("Provided types do not match projection.");
+			}
+			for (int i=0; i<types.length; i++) {
+				Class<?> typeClass = types[i];
+				if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) {
+					throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
+				}
+			}
+			return (JoinOperator<I1, I2, OUT>) this;
 		}
-		
+
 		@Override
 		public JoinOperator<I1, I2, OUT> withConstantSetFirst(String... constantSetFirst) {
 			throw new InvalidProgramException("The semantic properties (constant fields and forwarded fields) are automatically calculated.");
@@ -1099,13 +1159,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @param firstFieldIndexes If the first input is a Tuple DataSet, the indexes of the selected fields.
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A JoinProjection that needs to be converted into a {@link ProjectOperator} to complete the 
-		 *           ProjectJoin transformation by calling the corresponding {@code types()} function.
+		 * @return An extended JoinProjection.
 		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
-		public JoinProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
+		protected JoinProjection<I1, I2> projectFirst(int... firstFieldIndexes) {
 			
 			boolean isFirstTuple;
 			
@@ -1164,13 +1223,12 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * @param secondFieldIndexes If the second input is a Tuple DataSet, the indexes of the selected fields. 
 		 * 					   For a non-Tuple DataSet, do not provide parameters.
 		 * 					   The order of fields in the output tuple is defined by to the order of field indexes.
-		 * @return A JoinProjection that needs to be converted into a {@link ProjectOperator} to complete the 
-		 *           ProjectJoin transformation by calling the corresponding {@code types()} function.
+		 * @return An extended JoinProjection.
 		 * 
 		 * @see Tuple
 		 * @see DataSet
 		 */
-		public JoinProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
+		protected JoinProjection<I1, I2> projectSecond(int... secondFieldIndexes) {
 			
 			boolean isSecondTuple;
 			
@@ -1228,7 +1286,7 @@ public abstract class JoinOperator<I1, I2, OUT> extends TwoInputUdfOperator<I1,
 		 * 
 		 * @return The projected DataSet.
 		 * 
-		 * @see Projection
+		 * @see ProjectJoin
 		 */
 		@SuppressWarnings("unchecked")
 		public <OUT extends Tuple> ProjectJoin<I1, I2, OUT> projectTupleX() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
index f761b81..daf129a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/ProjectOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.api.java.operators;
 
 import java.util.Arrays;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.Operator;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -75,7 +76,23 @@ public class ProjectOperator<IN, OUT extends Tuple>
 
 		return ppo;
 	}
-	
+
+	/**
+	 * Continues a Project transformation on a {@link Tuple} {@link DataSet}.<br/>
+	 * <b>Note: Only Tuple DataSets can be projected using field indexes.</b><br/>
+	 * The transformation projects each Tuple of the DataSet onto a (sub)set of fields.<br/>
+	 * Additional fields can be added to the projection by calling {@link ProjectOperator#project(int[])}.
+	 *
+	 * <b>Note: With the current implementation, the Project transformation loses type information.</b>
+	 *
+	 * @param fieldIndexes The field indexes which are added to the Project transformation.
+	 * 					   The order of fields in the output tuple corresponds to the order of field indexes.
+	 * @return A ProjectOperator that represents the projected DataSet.
+	 *
+	 * @see Tuple
+	 * @see DataSet
+	 * @see ProjectOperator
+	 */
 	@SuppressWarnings("hiding")
 	public <OUT extends Tuple> ProjectOperator<?, OUT> project(int... fieldIndexes) {
 		proj.acceptAdditionalIndexes(fieldIndexes);
@@ -83,14 +100,27 @@ public class ProjectOperator<IN, OUT extends Tuple>
 		return proj.projectTupleX();
 	}
 	/**
-	 * A fake types() call to make codes compatible
+	 * Deprecated method only kept for compatibility.
+	 *
 	 * @param types
+	 *
 	 * @return
 	 */
 	@SuppressWarnings({ "unchecked", "hiding" })
 	@Deprecated
-	public <OUT extends Tuple> ProjectOperator<?, OUT> types(Class<?>... types) {
-		return (ProjectOperator<?, OUT>) this;
+	public <OUT extends Tuple> ProjectOperator<IN, OUT> types(Class<?>... types) {
+		TupleTypeInfo<OUT> typeInfo = (TupleTypeInfo<OUT>)this.getResultType();
+
+		if(types.length != typeInfo.getArity()) {
+			throw new InvalidProgramException("Provided types do not match projection.");
+		}
+		for (int i=0; i<types.length; i++) {
+			Class<?> typeClass = types[i];
+			if (!typeClass.equals(typeInfo.getTypeAt(i).getTypeClass())) {
+				throw new InvalidProgramException("Provided type "+typeClass.getSimpleName()+" at position "+i+" does not match projection");
+			}
+		}
+		return (ProjectOperator<IN, OUT>) this;
 	}
 	
 	public static class Projection<T> {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
index 69ef5af..e5d7155 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/functions/SemanticPropertiesProjectionTest.java
@@ -63,7 +63,7 @@ public class SemanticPropertiesProjectionTest {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-			tupleDs.project(1, 3, 2).types(Long.class, Long.class, String.class).print();
+			tupleDs.project(1, 3, 2).print();
 
 			Plan plan = env.createProgramPlan();
 
@@ -121,7 +121,7 @@ public class SemanticPropertiesProjectionTest {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-			tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2, 3).projectSecond(1, 4).types(String.class, Long.class, Long.class, Integer.class).print();
+			tupleDs.join(tupleDs).where(0).equalTo(0).projectFirst(2, 3).projectSecond(1, 4).print();
 
 			Plan plan = env.createProgramPlan();
 
@@ -181,7 +181,7 @@ public class SemanticPropertiesProjectionTest {
 			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);
 
-			DataSet<Tuple4<String, Long, Long, Integer>> result = tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4).types(String.class, Long.class, Long.class, Integer.class);
+			DataSet<Tuple4<String, Long, Long, Integer>> result = tupleDs.cross(tupleDs).projectFirst(2, 3).projectSecond(1, 4);
 			result.print();
 
 			Plan plan = env.createProgramPlan();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
index c310c6e..474563d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
@@ -63,8 +63,7 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectFirst(0)
-				.types(Integer.class);
+				.projectFirst(0);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -96,8 +95,7 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectFirst(0,3)
-				.types(Integer.class, Long.class);
+				.projectFirst(0,3);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -113,8 +111,7 @@ public class CrossOperatorTest {
 		// should work
 		try {
 			ds1.cross(ds2)
-				.projectFirst(0,3)
-				.types(Integer.class, Long.class);
+				.projectFirst(0,3);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -131,8 +128,7 @@ public class CrossOperatorTest {
 		try {
 			ds1.cross(ds2)
 				.projectFirst(0)
-				.projectSecond(3)
-				.types(Integer.class, Long.class);
+				.projectSecond(3);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -167,8 +163,7 @@ public class CrossOperatorTest {
 			ds1.cross(ds2)
 				.projectFirst(0,2)
 				.projectSecond(1,4)
-				.projectFirst(1)
-				.types(Integer.class, String.class, Long.class, Integer.class, Long.class);
+				.projectFirst(1);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -206,8 +201,7 @@ public class CrossOperatorTest {
 			ds1.cross(ds2)
 				.projectSecond(0,2)
 				.projectFirst(1,4)
-				.projectFirst(1)
-				.types(Integer.class, String.class, Long.class, Integer.class, Long.class);
+				.projectFirst(1);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -242,8 +236,7 @@ public class CrossOperatorTest {
 		try {
 			ds1.cross(ds2)
 				.projectFirst()
-				.projectSecond()
-				.types(CustomType.class, CustomType.class);
+				.projectSecond();
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -277,8 +270,7 @@ public class CrossOperatorTest {
 		try {
 			ds1.cross(ds2)
 				.projectSecond()
-				.projectFirst(1,4)
-				.types(Tuple5.class, Long.class, Integer.class);
+				.projectFirst(1,4);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -310,8 +302,7 @@ public class CrossOperatorTest {
 
 		// should not work, index out of range
 		ds1.cross(ds2)
-			.projectFirst(5)
-			.types(Integer.class);
+			.projectFirst(5);
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)
@@ -335,10 +326,9 @@ public class CrossOperatorTest {
 
 		// should not work, index out of range
 		ds1.cross(ds2)
-			.projectSecond(5)
-			.types(Integer.class);
+			.projectSecond(5);
 	}
-	
+
 	@Test(expected=IndexOutOfBoundsException.class)
 	public void testCrossProjection29() {
 
@@ -432,8 +422,7 @@ public class CrossOperatorTest {
 		// should not work, index out of range
 		ds1.cross(ds2)
 			.projectSecond(0)
-			.projectFirst(5)
-			.types(Integer.class);
+			.projectFirst(5);
 	}
 
 	@Test(expected=IndexOutOfBoundsException.class)
@@ -446,8 +435,7 @@ public class CrossOperatorTest {
 		// should not work, index out of range
 		ds1.cross(ds2)
 			.projectFirst(0)
-			.projectSecond(5)
-			.types(Integer.class);
+			.projectSecond(5);
 	}
 	
 	/*

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index 3f74e2e..3d4551d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -587,8 +587,7 @@ public class JoinOperatorTest {
 		// should work
 		try {
 			ds1.join(ds2).where(0).equalTo(0)
-			.projectFirst(0)
-			.types(Integer.class);
+			.projectFirst(0);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -640,8 +639,7 @@ public class JoinOperatorTest {
 		// should work
 		try {
 			ds1.join(ds2).where(0).equalTo(0)
-			.projectFirst(0,3)
-			.types(Integer.class, Long.class);
+			.projectFirst(0,3);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -658,8 +656,7 @@ public class JoinOperatorTest {
 		try {
 			ds1.join(ds2).where(0).equalTo(0)
 			.projectFirst(0)
-			.projectSecond(3)
-			.types(Integer.class, Long.class);
+			.projectSecond(3);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -677,8 +674,7 @@ public class JoinOperatorTest {
 			ds1.join(ds2).where(0).equalTo(0)
 			.projectFirst(0,2)
 			.projectSecond(1,4)
-			.projectFirst(1)
-			.types(Integer.class, String.class, Long.class, Integer.class, Long.class);
+			.projectFirst(1);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -697,8 +693,7 @@ public class JoinOperatorTest {
 			ds1.join(ds2).where(0).equalTo(0)
 			.projectSecond(0,2)
 			.projectFirst(1,4)
-			.projectFirst(1)
-			.types(Integer.class, String.class, Long.class, Integer.class, Long.class);
+			.projectFirst(1);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -714,26 +709,23 @@ public class JoinOperatorTest {
 		try {
 			ds1.join(ds2)
 			.where(
-					new KeySelector<CustomType, Long>() {
-							
-							@Override
-							public Long getKey(CustomType value) {
-								return value.myLong;
-							}
-						}
-					)
+				new KeySelector<CustomType, Long>() {
+					@Override
+					public Long getKey(CustomType value) {
+						return value.myLong;
+					}
+				}
+			)
 			.equalTo(
-					new KeySelector<CustomType, Long>() {
-							
-							@Override
-							public Long getKey(CustomType value) {
-								return value.myLong;
-							}
-						}
-					)
-				.projectFirst()
-				.projectSecond()
-				.types(CustomType.class, CustomType.class);
+				new KeySelector<CustomType, Long>() {
+					@Override
+					public Long getKey(CustomType value) {
+						return value.myLong;
+					}
+				}
+			)
+			.projectFirst()
+			.projectSecond();
 		} catch(Exception e) {
 			System.out.println("FAILED: " + e);
 			e.printStackTrace();
@@ -751,25 +743,23 @@ public class JoinOperatorTest {
 		try {
 			ds1.join(ds2)
 			.where(
-					new KeySelector<CustomType, Long>() {
-							
-							@Override
-							public Long getKey(CustomType value) {
-								return value.myLong;
-							}
-						}
-					)
+				new KeySelector<CustomType, Long>() {
+					@Override
+					public Long getKey(CustomType value) {
+						return value.myLong;
+					}
+				}
+			)
 			.equalTo(
-					new KeySelector<CustomType, Long>() {
-							
-							@Override
-							public Long getKey(CustomType value) {
-								return value.myLong;
-							}
-						}
-					)
-				.projectFirst()
-				.projectSecond();
+				new KeySelector<CustomType, Long>() {
+					@Override
+					public Long getKey(CustomType value) {
+						return value.myLong;
+					}
+				}
+			)
+			.projectFirst()
+			.projectSecond();
 		} catch(Exception e) {
 			System.out.println("FAILED: " + e);
 			e.printStackTrace();
@@ -788,8 +778,7 @@ public class JoinOperatorTest {
 		try {
 			ds1.join(ds2).where(0).equalTo(0)
 			.projectSecond()
-			.projectFirst(1,4)
-			.types(Tuple5.class, Long.class, Integer.class);
+			.projectFirst(1,4);
 		} catch(Exception e) {
 			Assert.fail();
 		}
@@ -821,8 +810,7 @@ public class JoinOperatorTest {
 
 		// should not work, index out of range
 		ds1.join(ds2).where(0).equalTo(0)
-		.projectFirst(5)
-		.types(Integer.class);
+		.projectFirst(5);
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)
@@ -846,8 +834,7 @@ public class JoinOperatorTest {
 
 		// should not work, index out of range
 		ds1.join(ds2).where(0).equalTo(0)
-		.projectSecond(5)
-		.types(Integer.class);
+		.projectSecond(5);
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)
@@ -905,8 +892,7 @@ public class JoinOperatorTest {
 		// should  work
 		ds1.join(ds2).where(0).equalTo(0)
 		.projectSecond(2)
-		.projectFirst(1)
-		.types(String.class);
+		.projectFirst(1);
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)
@@ -919,8 +905,7 @@ public class JoinOperatorTest {
 		// should not work, index out of range
 		ds1.join(ds2).where(0).equalTo(0)
 		.projectSecond(0)
-		.projectFirst(5)
-		.types(Integer.class);
+		.projectFirst(5);
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)
@@ -946,8 +931,7 @@ public class JoinOperatorTest {
 		// should not work, index out of range
 		ds1.join(ds2).where(0).equalTo(0)
 		.projectFirst(0)
-		.projectSecond(5)
-		.types(Integer.class);
+		.projectSecond(5);
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
index 9c5504b..048c311 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
@@ -103,14 +103,14 @@ public class ProjectionOperatorTest {
 
 		// should work
 		try {
-			tupleDs.project(0).types(Integer.class);
+			tupleDs.project(0);
 		} catch(Exception e) {
 			Assert.fail();
 		}
 		
 		// should work: dummy types() here
 		try {
-			tupleDs.project(2,1,4).types(String.class, Long.class);
+			tupleDs.project(2,1,4);
 		} catch(Exception e) {
 			Assert.fail();
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
index 3390c27..55a2aff 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operators/translation/DeltaIterationTranslationTest.java
@@ -83,7 +83,7 @@ public class DeltaIterationTranslationTest implements java.io.Serializable {
 				DataSet<Tuple2<Double, String>> worksetSelfJoin = 
 					iteration.getWorkset()
 						.map(new IdentityMapper<Tuple2<Double,String>>())
-						.join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1).types(Double.class, String.class);
+						.join(iteration.getWorkset()).where(1).equalTo(1).projectFirst(0, 1);
 				
 				DataSet<Tuple3<Double, Long, String>> joined = worksetSelfJoin.join(iteration.getSolutionSet()).where(1).equalTo(2).with(new SolutionWorksetJoin());
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
----------------------------------------------------------------------
diff --git a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
index 4fd8b39..1e05a31 100644
--- a/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
+++ b/flink-java8/src/main/java/org/apache/flink/examples/java8/relational/TPCHQuery10.java
@@ -111,21 +111,20 @@ public class TPCHQuery10 {
 				// filter by year
 				orders.filter(order -> Integer.parseInt(order.f2.substring(0, 4)) > 1990)
 				// project fields out that are no longer required
-				.project(0,1).types(Integer.class, Integer.class);
+				.project(0,1);
 
 		// lineitems filtered by flag: (orderkey, extendedprice, discount)
 		DataSet<Tuple3<Integer, Double, Double>> lineitemsFilteredByFlag = 
 				// filter by flag
 				lineitems.filter(lineitem -> lineitem.f3.equals("R"))
 				// project fields out that are no longer required
-				.project(0,1,2).types(Integer.class, Double.class, Double.class);
+				.project(0,1,2);
 
 		// join orders with lineitems: (custkey, extendedprice, discount)
 		DataSet<Tuple3<Integer, Double, Double>> lineitemsOfCustomerKey = 
 				ordersFilteredByYear.joinWithHuge(lineitemsFilteredByFlag)
 									.where(0).equalTo(0)
-									.projectFirst(1).projectSecond(1,2)
-									.types(Integer.class, Double.class, Double.class);
+									.projectFirst(1).projectSecond(1,2);
 
 		// aggregate for revenue: (custkey, revenue)
 		DataSet<Tuple2<Integer, Double>> revenueOfCustomerKey = lineitemsOfCustomerKey
@@ -139,15 +138,13 @@ public class TPCHQuery10 {
 		DataSet<Tuple5<Integer, String, String, String, Double>> customerWithNation = customers
 						.joinWithTiny(nations)
 						.where(3).equalTo(0)
-						.projectFirst(0,1,2).projectSecond(1).projectFirst(4)
-						.types(Integer.class, String.class, String.class, String.class, Double.class);
+						.projectFirst(0,1,2).projectSecond(1).projectFirst(4);
 
 		// join customer (with nation) with revenue (custkey, name, address, nationname, acctbal, revenue)
 		DataSet<Tuple6<Integer, String, String, String, Double, Double>> customerWithRevenue = 
 				customerWithNation.join(revenueOfCustomerKey)
 				.where(0).equalTo(0)
-				.projectFirst(0,1,2,3,4).projectSecond(1)
-				.types(Integer.class, String.class, String.class, String.class, Double.class, Double.class);
+				.projectFirst(0,1,2,3,4).projectSecond(1);
 
 		// emit result
 		customerWithRevenue.writeAsCsv(outputPath);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
index 9263726..61ad863 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/AggregateITCase.java
@@ -94,7 +94,7 @@ public class AggregateITCase extends JavaProgramTestBase {
 				DataSet<Tuple2<Integer, Long>> aggregateDs = ds
 						.aggregate(Aggregations.SUM, 0)
 						.and(Aggregations.MAX, 1)
-						.project(0, 1).types(Integer.class, Long.class);
+						.project(0, 1);
 				
 				aggregateDs.writeAsCsv(resultPath);
 				env.execute();
@@ -112,7 +112,7 @@ public class AggregateITCase extends JavaProgramTestBase {
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 				DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
 						.aggregate(Aggregations.SUM, 0)
-						.project(1, 0).types(Long.class, Integer.class);
+						.project(1, 0);
 				
 				aggregateDs.writeAsCsv(resultPath);
 				env.execute();
@@ -136,7 +136,7 @@ public class AggregateITCase extends JavaProgramTestBase {
 				DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
 						.aggregate(Aggregations.MIN, 0)
 						.aggregate(Aggregations.MIN, 0)
-						.project(0).types(Integer.class);
+						.project(0);
 				
 				aggregateDs.writeAsCsv(resultPath);
 				env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
index 4b84bbf..7d79ea5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/CrossITCase.java
@@ -262,8 +262,7 @@ public class CrossITCase extends JavaProgramTestBase {
 					.projectFirst(2, 1)
 					.projectSecond(3)
 					.projectFirst(0)
-					.projectSecond(4,1)
-					.types(String.class, Long.class, String.class, Integer.class, Long.class, Long.class);
+					.projectSecond(4,1);
 
 				crossDs.writeAsCsv(resultPath);
 				env.execute();
@@ -294,8 +293,7 @@ public class CrossITCase extends JavaProgramTestBase {
 						.projectSecond(3)
 						.projectFirst(2, 1)
 						.projectSecond(4,1)
-						.projectFirst(0)
-						.types(String.class, String.class, Long.class, Long.class, Long.class, Integer.class);
+						.projectFirst(0);
 
 					crossDs.writeAsCsv(resultPath);
 					env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
index b183a57..fb62459 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/DistinctITCase.java
@@ -115,7 +115,7 @@ public class DistinctITCase extends JavaProgramTestBase {
 				final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 				
 				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
-				DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0).types(Integer.class);
+				DataSet<Tuple1<Integer>> distinctDs = ds.union(ds).distinct(0).project(0);
 				
 				distinctDs.writeAsCsv(resultPath);
 				env.execute();
@@ -140,7 +140,7 @@ public class DistinctITCase extends JavaProgramTestBase {
 									public Integer getKey(Tuple5<Integer, Long,  Integer, String, Long> in) {
 										return in.f0;
 									}
-								}).project(0).types(Integer.class);
+								}).project(0);
 				
 				reduceDs.writeAsCsv(resultPath);
 				env.execute();
@@ -222,7 +222,7 @@ public class DistinctITCase extends JavaProgramTestBase {
 										return new Tuple2<Integer, Long>(t.f0, t.f4);
 									}
 								})
-						.project(0,4).types(Integer.class, Long.class);
+						.project(0,4);
 				
 				reduceDs.writeAsCsv(resultPath);
 				env.execute();
@@ -249,7 +249,7 @@ public class DistinctITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple5<Integer, Long,  Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
 				DataSet<Tuple1<Integer>> reduceDs = ds.union(ds)
-						.distinct("f0").project(0).types(Integer.class);
+						.distinct("f0").project(0);
 				
 				reduceDs.writeAsCsv(resultPath);
 				env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
index 358b81b..770cf88 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/FirstNITCase.java
@@ -126,7 +126,7 @@ public class FirstNITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 				DataSet<Tuple2<Long, Integer>> first = ds.groupBy(1).sortGroup(0, Order.DESCENDING).first(3)
-															.project(1,0).types(Long.class, Integer.class);
+															.project(1,0);
 				
 				first.writeAsText(resultPath);
 				env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
index 7cfb867..5f8de8a 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/JoinITCase.java
@@ -343,8 +343,7 @@ public class JoinITCase extends JavaProgramTestBase {
 						   .projectFirst(2,1)
 						   .projectSecond(3)
 						   .projectFirst(0)
-						   .projectSecond(4,1)
-						   .types(String.class, Long.class, String.class, Integer.class, Long.class, Long.class);
+						   .projectSecond(4,1);
 				
 				joinDs.writeAsCsv(resultPath);
 				env.execute();
@@ -372,8 +371,7 @@ public class JoinITCase extends JavaProgramTestBase {
 						   .projectSecond(3)
 						   .projectFirst(2,1)
 						   .projectSecond(4,1)
-						   .projectFirst(0)
-						   .types(String.class, String.class, Long.class, Long.class, Long.class, Integer.class);
+						   .projectFirst(0);
 				
 				joinDs.writeAsCsv(resultPath);
 				env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
index 1796dd0..7bde1a4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/ProjectITCase.java
@@ -92,7 +92,7 @@ public class ProjectITCase extends JavaProgramTestBase {
 				
 				DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.get5TupleDataSet(env);
 				DataSet<Tuple3<String, Long, Integer>> projDs = ds.
-						project(3,4,2).types(String.class, Long.class, Integer.class);
+						project(3,4,2);
 				projDs.writeAsCsv(resultPath);
 				
 				env.execute();

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/799ff8ae/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
index 61ba722..d63d08c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/SumMinMaxITCase.java
@@ -92,7 +92,7 @@ public class SumMinMaxITCase extends JavaProgramTestBase  {
 					DataSet<Tuple2<Integer, Long>> sumDs = ds
 							.sum(0)
 							.andMax(1)
-							.project(0, 1).types(Integer.class, Long.class);
+							.project(0, 1);
 
 					sumDs.writeAsCsv(resultPath);
 					env.execute();
@@ -110,7 +110,7 @@ public class SumMinMaxITCase extends JavaProgramTestBase  {
 					DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
 					DataSet<Tuple2<Long, Integer>> aggregateDs = ds.groupBy(1)
 							.sum(0)
-							.project(1, 0).types(Long.class, Integer.class);
+							.project(1, 0);
 
 					aggregateDs.writeAsCsv(resultPath);
 					env.execute();
@@ -134,7 +134,7 @@ public class SumMinMaxITCase extends JavaProgramTestBase  {
 					DataSet<Tuple1<Integer>> aggregateDs = ds.groupBy(1)
 							.min(0)
 							.min(0)
-							.project(0).types(Integer.class);
+							.project(0);
 
 					aggregateDs.writeAsCsv(resultPath);
 					env.execute();


Mime
View raw message