flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/6] incubator-flink git commit: [FLINK-1040] Make types() call in projections optional
Date Wed, 10 Dec 2014 15:50:52 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master 6d69f697f -> 799ff8ae9


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a9ecaba7/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
index c40f75b..c310c6e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/CrossOperatorTest.java
@@ -69,6 +69,22 @@ public class CrossOperatorTest {
 			Assert.fail();
 		}
 	}
+	
+	@Test
+	public void testCrossProjection21() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.cross(ds2)
+				.projectFirst(0);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
 
 	@Test
 	public void testCrossProjection2() {
@@ -86,6 +102,23 @@ public class CrossOperatorTest {
 			Assert.fail();
 		}
 	}
+	
+	@Test
+	public void testCrossProjection22() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.cross(ds2)
+				.projectFirst(0,3)
+				.types(Integer.class, Long.class);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
 
 	@Test
 	public void testCrossProjection3() {
@@ -104,6 +137,23 @@ public class CrossOperatorTest {
 			Assert.fail();
 		}
 	}
+	
+	@Test
+	public void testCrossProjection23() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.cross(ds2)
+				.projectFirst(0)
+				.projectSecond(3);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
 
 	@Test
 	public void testCrossProjection4() {
@@ -124,6 +174,25 @@ public class CrossOperatorTest {
 		}
 
 	}
+	
+	@Test
+	public void testCrossProjection24() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.cross(ds2)
+				.projectFirst(0,2)
+				.projectSecond(1,4)
+				.projectFirst(1);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+
+	}
 
 	@Test
 	public void testCrossProjection5() {
@@ -143,6 +212,24 @@ public class CrossOperatorTest {
 			Assert.fail();
 		}
 	}
+	
+	@Test
+	public void testCrossProjection25() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.cross(ds2)
+				.projectSecond(0,2)
+				.projectFirst(1,4)
+				.projectFirst(1);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
 
 	@Test
 	public void testCrossProjection6() {
@@ -161,6 +248,23 @@ public class CrossOperatorTest {
 			Assert.fail();
 		}
 	}
+	
+	@Test
+	public void testCrossProjection26() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+
+		// should work
+		try {
+			ds1.cross(ds2)
+				.projectFirst()
+				.projectSecond();
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
 
 	@Test
 	public void testCrossProjection7() {
@@ -179,6 +283,23 @@ public class CrossOperatorTest {
 			Assert.fail();
 		}
 	}
+	
+	@Test
+	public void testCrossProjection27() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.cross(ds2)
+				.projectSecond()
+				.projectFirst(1,4);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
 
 	@Test(expected=IndexOutOfBoundsException.class)
 	public void testCrossProjection8() {
@@ -192,6 +313,18 @@ public class CrossOperatorTest {
 			.projectFirst(5)
 			.types(Integer.class);
 	}
+	
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testCrossProjection28() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should not work, index out of range
+		ds1.cross(ds2)
+			.projectFirst(5);
+	}
 
 	@Test(expected=IndexOutOfBoundsException.class)
 	public void testCrossProjection9() {
@@ -205,45 +338,88 @@ public class CrossOperatorTest {
 			.projectSecond(5)
 			.types(Integer.class);
 	}
+	
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testCrossProjection29() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should not work, index out of range
+		ds1.cross(ds2)
+			.projectSecond(5);
+	}
 
-	@Test(expected=IllegalArgumentException.class)
 	public void testCrossProjection10() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 
+		// should work
+		ds1.cross(ds2)
+			.projectFirst(2);
+	}
+	
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testCrossProjection30() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
 		// should not work, type does not match
 		ds1.cross(ds2)
-			.projectFirst(2)
-			.types(Integer.class);
+			.projectFirst(-1);
 	}
 
-	@Test(expected=IllegalArgumentException.class)
 	public void testCrossProjection11() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 
-		// should not work, type does not match
+		// should work
 		ds1.cross(ds2)
-			.projectSecond(2)
-			.types(Integer.class);
+			.projectSecond(2);
 	}
 
-	@Test(expected=IllegalArgumentException.class)
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testCrossProjection31() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should not work, index out of range
+		ds1.cross(ds2)
+			.projectSecond(-1);
+	}
+	
 	public void testCrossProjection12() {
 
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 
+		// should work
+		ds1.cross(ds2)
+			.projectSecond(2)
+			.projectFirst(1);
+	}
+	
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testCrossProjection32() {
+
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
 		// should not work, number of types and fields does not match
 		ds1.cross(ds2)
 			.projectSecond(2)
-			.projectFirst(1)
-			.types(String.class);
+			.projectFirst(-1);
 	}
 
 	@Test(expected=IndexOutOfBoundsException.class)

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a9ecaba7/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
index de50fd8..3f74e2e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/JoinOperatorTest.java
@@ -595,6 +595,42 @@ public class JoinOperatorTest {
 	}
 	
 	@Test
+	public void testJoinProjection21() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.join(ds2).where(0).equalTo(0)
+			.projectFirst(0);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+		
+		// should not work: field index is out of bounds of input tuple
+		try {
+			ds1.join(ds2).where(0).equalTo(0).projectFirst(-1);
+			Assert.fail();
+		} catch(IndexOutOfBoundsException iob) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail();
+		}
+		
+		// should not work: field index is out of bounds of input tuple
+		try {
+			ds1.join(ds2).where(0).equalTo(0).project(9);
+			Assert.fail();
+		} catch(IndexOutOfBoundsException iob) {
+			// we're good here
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+	
+	@Test
 	public void testJoinProjection2() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -706,6 +742,42 @@ public class JoinOperatorTest {
 	}
 	
 	@Test
+	public void testJoinProjection26() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<CustomType> ds1 = env.fromCollection(customTypeData);
+		DataSet<CustomType> ds2 = env.fromCollection(customTypeData);
+		// should work
+		try {
+			ds1.join(ds2)
+			.where(
+					new KeySelector<CustomType, Long>() {
+							
+							@Override
+							public Long getKey(CustomType value) {
+								return value.myLong;
+							}
+						}
+					)
+			.equalTo(
+					new KeySelector<CustomType, Long>() {
+							
+							@Override
+							public Long getKey(CustomType value) {
+								return value.myLong;
+							}
+						}
+					)
+				.projectFirst()
+				.projectSecond();
+		} catch(Exception e) {
+			System.out.println("FAILED: " + e);
+			e.printStackTrace();
+			Assert.fail();
+		}
+	}
+	
+	@Test
 	public void testJoinProjection7() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -723,6 +795,23 @@ public class JoinOperatorTest {
 		}
 	}
 	
+	@Test
+	public void testJoinProjection27() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			ds1.join(ds2).where(0).equalTo(0)
+			.projectSecond()
+			.projectFirst(1,4);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+	}
+	
 	@Test(expected=IndexOutOfBoundsException.class)
 	public void testJoinProjection8() {
 		
@@ -737,6 +826,18 @@ public class JoinOperatorTest {
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)
+	public void testJoinProjection28() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should not work, index out of range
+		ds1.join(ds2).where(0).equalTo(0)
+		.projectFirst(5);
+	}
+	
+	@Test(expected=IndexOutOfBoundsException.class)
 	public void testJoinProjection9() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -749,20 +850,41 @@ public class JoinOperatorTest {
 		.types(Integer.class);
 	}
 	
-	@Test(expected=IllegalArgumentException.class)
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testJoinProjection29() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should not work, index out of range
+		ds1.join(ds2).where(0).equalTo(0)
+		.projectSecond(5);
+	}
+	
 	public void testJoinProjection10() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 
+		// should work
+		ds1.join(ds2).where(0).equalTo(0)
+		.projectFirst(2);
+	}
+	
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testJoinProjection30() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
 		// should not work, type does not match
 		ds1.join(ds2).where(0).equalTo(0)
-		.projectFirst(2)
-		.types(Integer.class);
+		.projectFirst(-1);
 	}
 	
-	@Test(expected=IllegalArgumentException.class)
 	public void testJoinProjection11() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -771,18 +893,16 @@ public class JoinOperatorTest {
 
 		// should not work, type does not match
 		ds1.join(ds2).where(0).equalTo(0)
-		.projectSecond(2)
-		.types(Integer.class);
+		.projectSecond(2);
 	}
 	
-	@Test(expected=IllegalArgumentException.class)
 	public void testJoinProjection12() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
 
-		// should not work, number of types and fields does not match
+		// should work
 		ds1.join(ds2).where(0).equalTo(0)
 		.projectSecond(2)
 		.projectFirst(1)
@@ -804,6 +924,19 @@ public class JoinOperatorTest {
 	}
 	
 	@Test(expected=IndexOutOfBoundsException.class)
+	public void testJoinProjection33() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should not work, index out of range
+		ds1.join(ds2).where(0).equalTo(0)
+		.projectSecond(-1)
+		.projectFirst(3);
+	}
+	
+	@Test(expected=IndexOutOfBoundsException.class)
 	public void testJoinProjection14() {
 		
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -817,6 +950,19 @@ public class JoinOperatorTest {
 		.types(Integer.class);
 	}
 	
+	@Test(expected=IndexOutOfBoundsException.class)
+	public void testJoinProjection34() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds1 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> ds2 = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should not work, index out of range
+		ds1.join(ds2).where(0).equalTo(0)
+		.projectFirst(0)
+		.projectSecond(-1);
+	}
+	
 	/*
 	 * ####################################################################
 	 */

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/a9ecaba7/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
b/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
index 5c7b91e..9c5504b 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/ProjectionOperatorTest.java
@@ -108,21 +108,43 @@ public class ProjectionOperatorTest {
 			Assert.fail();
 		}
 		
-		// should not work: too few types
+		// should work: dummy types() here
 		try {
 			tupleDs.project(2,1,4).types(String.class, Long.class);
+		} catch(Exception e) {
 			Assert.fail();
-		} catch(IllegalArgumentException iae) {
+		}
+		
+	}
+	
+	@Test
+	public void testProjectionWithoutTypes() {
+		
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData,
tupleTypeInfo);
+
+		// should work
+		try {
+			tupleDs.project(2,0,4);
+		} catch(Exception e) {
+			Assert.fail();
+		}
+		
+		// should not work: field index is out of bounds of input tuple
+		try {
+			tupleDs.project(2,-1,4);
+			Assert.fail();
+		} catch(IndexOutOfBoundsException iob) {
 			// we're good here
 		} catch(Exception e) {
 			Assert.fail();
 		}
 		
-		// should not work: given types do not match input types
+		// should not work: field index is out of bounds of input tuple
 		try {
-			tupleDs.project(2,1,4).types(String.class, Long.class, Long.class);
+			tupleDs.project(2,1,4,5,8,9);
 			Assert.fail();
-		} catch(IllegalArgumentException iae) {
+		} catch(IndexOutOfBoundsException iob) {
 			// we're good here
 		} catch(Exception e) {
 			Assert.fail();


Mime
View raw message