flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject flink git commit: [FLINK-3321] TupleSerializer.getLength() can return fixed-length size
Date Tue, 08 Mar 2016 11:06:15 GMT
Repository: flink
Updated Branches:
  refs/heads/master 945fc023a -> d7aa989ee


[FLINK-3321] TupleSerializer.getLength() can return fixed-length size

This closes #1654.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d7aa989e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d7aa989e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d7aa989e

Branch: refs/heads/master
Commit: d7aa989ee1c72d7830c612554b89673bf38cf447
Parents: 945fc02
Author: zentol <s.motsu@web.de>
Authored: Tue Feb 16 12:23:41 2016 +0100
Committer: zentol <s.motsu@web.de>
Committed: Tue Mar 8 12:02:21 2016 +0100

----------------------------------------------------------------------
 .../typeutils/runtime/TupleSerializerBase.java   | 16 +++++++++++++++-
 .../typeutils/runtime/TupleSerializerTest.java   | 19 +++++++++----------
 .../api/scala/runtime/TupleSerializerTest.scala  | 18 +++++++++---------
 3 files changed, 33 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
index fc657a1..8b1d8ca 100644
--- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java
@@ -38,6 +38,8 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
 	protected final int arity;
 
+	private int length = -2;
+
 	@SuppressWarnings("unchecked")
 	public TupleSerializerBase(Class<T> tupleClass, TypeSerializer<?>[] fieldSerializers) {
 		this.tupleClass = Preconditions.checkNotNull(tupleClass);
@@ -56,7 +58,19 @@ public abstract class TupleSerializerBase<T> extends TypeSerializer<T> {
 
 	@Override
 	public int getLength() {
-		return -1;
+		if (length == -2) {
+			int sum = 0;
+			for (TypeSerializer<Object> serializer : fieldSerializers) {
+				if (serializer.getLength() > 0) {
+					sum += serializer.getLength();
+				} else {
+					length = -1;
+					return length;
+				}
+			}
+			length = sum;
+		}
+		return length;
 	}
 
 	public int getArity() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
index 017eb44..13f91b0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerTest.java
@@ -46,7 +46,7 @@ public class TupleSerializerTest {
 	public void testTuple0() {
 		Tuple0[] testTuples = new Tuple0[] { Tuple0.INSTANCE, Tuple0.INSTANCE, Tuple0.INSTANCE };
 
-		runTests(testTuples);
+		runTests(1, testTuples);
 	}
 
 	@Test
@@ -57,7 +57,7 @@ public class TupleSerializerTest {
 			new Tuple1<Integer>(Integer.MAX_VALUE), new Tuple1<Integer>(Integer.MIN_VALUE)
 		};
 		
-		runTests(testTuples);
+		runTests(4, testTuples);
 	}
 	
 	@Test
@@ -74,7 +74,7 @@ public class TupleSerializerTest {
 			new Tuple1<String>("")
 		};
 		
-		runTests(testTuples);
+		runTests(-1, testTuples);
 	}
 	
 	@Test
@@ -101,7 +101,7 @@ public class TupleSerializerTest {
 			new Tuple1<String[]>(arr2)
 		};
 		
-		runTests(testTuples);
+		runTests(-1, testTuples);
 	}
 	
 	@Test
@@ -118,7 +118,7 @@ public class TupleSerializerTest {
 				new Tuple2<String, Double>(StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble())
 			};
 		
-		runTests(testTuples);
+		runTests(-1, testTuples);
 	}
 	
 	@Test
@@ -148,7 +148,7 @@ public class TupleSerializerTest {
 			new Tuple2<String, String[]>(StringUtils.getRandomString(rnd, 30, 170), arr2)
 		};
 		
-		runTests(testTuples);
+		runTests(-1, testTuples);
 	}
 	
 
@@ -212,17 +212,16 @@ public class TupleSerializerTest {
 				new Tuple5<SimpleTypes, Book, ComplexNestedObject1, BookAuthor, ComplexNestedObject2>(g, b6, o4, ba1, co2)
 		};
 		
-		runTests(testTuples);
+		runTests(-1, testTuples);
 	}
 
-	private <T extends Tuple> void runTests(T... instances) {
+	private <T extends Tuple> void runTests(int length, T... instances) {
 		try {
 			TupleTypeInfo<T> tupleTypeInfo = (TupleTypeInfo<T>) TypeExtractor.getForObject(instances[0]);
 			TypeSerializer<T> serializer = tupleTypeInfo.createSerializer(new ExecutionConfig());
 			
 			Class<T> tupleClass = tupleTypeInfo.getTypeClass();
-			
-			int length = -1;
+
 			if(tupleClass == Tuple0.class) {
 				length = 1;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/d7aa989e/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
index 368204b..b210c99 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/runtime/TupleSerializerTest.scala
@@ -40,7 +40,7 @@ class TupleSerializerTest {
   def testTuple1Int(): Unit = {
     val testTuples = Array(Tuple1(42), Tuple1(1), Tuple1(0), Tuple1(-1), Tuple1(Int.MaxValue),
       Tuple1(Int.MinValue))
-    runTests(testTuples)
+    runTests(testTuples, 4)
   }
 
   @Test
@@ -53,7 +53,7 @@ class TupleSerializerTest {
       Tuple1(StringUtils.getRandomString(rnd, 30, 170)),
       Tuple1(StringUtils.getRandomString(rnd, 15, 50)),
       Tuple1(""))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -77,7 +77,7 @@ class TupleSerializerTest {
       StringUtils.getRandomString(rnd, 100 * 1024, 105 * 1024),
       "bar")
     val testTuples = Array(Tuple1(arr1), Tuple1(arr2))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -91,7 +91,7 @@ class TupleSerializerTest {
       ("", rnd.nextDouble),
       (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble),
       (StringUtils.getRandomString(rnd, 10, 100), rnd.nextDouble))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -106,7 +106,7 @@ class TupleSerializerTest {
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)),
       (StringUtils.getRandomString(rnd, 10, 100), new LocalDate(rnd.nextInt)))
       
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -134,7 +134,7 @@ class TupleSerializerTest {
       (StringUtils.getRandomString(rnd, 30, 170), arr1),
       (StringUtils.getRandomString(rnd, 30, 170), arr2),
       (StringUtils.getRandomString(rnd, 30, 170), arr2))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
   @Test
@@ -189,10 +189,10 @@ class TupleSerializerTest {
       (e, b4, o5, ba2, co4),
       (f, b5, o1, ba2, co4),
       (g, b6, o4, ba1, co2))
-    runTests(testTuples)
+    runTests(testTuples, -1)
   }
 
-  private final def runTests[T <: Product : TypeInformation](instances: Array[T]) {
+  private final def runTests[T <: Product : TypeInformation](instances: Array[T], length: Int) {
     try {
       // Register the custom Kryo Serializer
       val conf = new ExecutionConfig()
@@ -201,7 +201,7 @@ class TupleSerializerTest {
       val tupleTypeInfo = implicitly[TypeInformation[T]].asInstanceOf[TupleTypeInfoBase[T]]
       val serializer = tupleTypeInfo.createSerializer(conf)
       val tupleClass = tupleTypeInfo.getTypeClass
-      val test = new TupleSerializerTestInstance[T](serializer, tupleClass, -1, instances)
+      val test = new TupleSerializerTestInstance[T](serializer, tupleClass, length, instances)
       test.testAll()
     } catch {
       case e: Exception => {


Mime
View raw message