flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-2203] handling null values for RowSerializer
Date Sat, 13 Jun 2015 19:09:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master e4b569505 -> f8e12b20d


[FLINK-2203] handling null values for RowSerializer

This closes #831


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8e12b20
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8e12b20
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8e12b20

Branch: refs/heads/master
Commit: f8e12b20d925c3f6f24769327d1da5d98affa679
Parents: e4b5695
Author: Shiti <ssaxena.ece@gmail.com>
Authored: Fri Jun 12 20:55:48 2015 +0530
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Sat Jun 13 21:07:53 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-table/pom.xml               |  8 +++
 .../api/table/typeinfo/RowSerializer.scala      | 73 +++++++++++++++-----
 .../api/table/typeinfo/RowSerializerTest.scala  | 70 +++++++++++++++++++
 3 files changed, 133 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8e12b20/flink-staging/flink-table/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/pom.xml b/flink-staging/flink-table/pom.xml
index cbd1c47..20600ff 100644
--- a/flink-staging/flink-table/pom.xml
+++ b/flink-staging/flink-table/pom.xml
@@ -94,6 +94,14 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/f8e12b20/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
index 8a8dc3d..527e2b4 100644
--- a/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
+++ b/flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowSerializer.scala
@@ -17,10 +17,13 @@
  */
 package org.apache.flink.api.table.typeinfo
 
-import org.apache.flink.api.table.Row
+import java.util
+
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.core.memory.{DataOutputView, DataInputView}
-;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer
+import org.apache.flink.api.table.Row
+import org.apache.flink.core.memory.{DataInputView, DataOutputView}
+
 
 /**
  * Serializer for [[Row]].
@@ -28,6 +31,8 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView}
 class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
   extends TypeSerializer[Row] {
 
+  private def getFieldSerializers = fieldSerializers // lets equals read another instance's serializers (a plain constructor param is object-private)
+
   override def isImmutableType: Boolean = false
 
   override def getLength: Int = -1
@@ -74,11 +79,17 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
 
   override def serialize(value: Row, target: DataOutputView) {
     val len = fieldSerializers.length
-    var i = 0
-    while (i < len) {
-      val serializer = fieldSerializers(i)
-      serializer.serialize(value.productElement(i), target)
-      i += 1
+    var index = 0
+    while (index < len) { // per field: a null flag, then the value only if non-null
+      val field: AnyRef = value.productElement(index).asInstanceOf[AnyRef]
+      if (field == null) {
+        target.writeBoolean(true)
+      } else {
+        target.writeBoolean(false)
+        val serializer = fieldSerializers(index)
+        serializer.serialize(field, target) // reuse the value fetched above
+      }
+      index += 1
     }
   }
 
@@ -89,11 +100,17 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
       throw new RuntimeException("Row arity of reuse and fields do not match.")
     }
 
-    var i = 0
-    while (i < len) {
-      val field = reuse.productElement(i).asInstanceOf[AnyRef]
-      reuse.setField(i, fieldSerializers(i).deserialize(field, source))
-      i += 1
+    var index = 0
+    while (index < len) {
+      val isNull: Boolean = source.readBoolean
+      if (isNull) {
+        reuse.setField(index, null)
+      } else {
+        val field = reuse.productElement(index).asInstanceOf[AnyRef]
+        val serializer: TypeSerializer[Any] = fieldSerializers(index)
+        reuse.setField(index, serializer.deserialize(field, source))
+      }
+      index += 1
     }
     reuse
   }
@@ -102,10 +119,17 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
 
     val result = new Row(len)
-    var i = 0
-    while (i < len) {
-      result.setField(i, fieldSerializers(i).deserialize(source))
-      i += 1
+
+    var index = 0
+    while (index < len) {
+      val isNull: Boolean = source.readBoolean()
+      if (isNull) {
+        result.setField(index, null)
+      } else {
+        val serializer: TypeSerializer[Any] = fieldSerializers(index)
+        result.setField(index, serializer.deserialize(source))
+      }
+      index += 1
     }
     result
   }
@@ -114,8 +138,21 @@ class RowSerializer(fieldSerializers: Array[TypeSerializer[Any]])
     val len = fieldSerializers.length
     var i = 0
     while (i < len) {
-      fieldSerializers(i).copy(source, target)
+      val isNull = source.readBoolean()
+      target.writeBoolean(isNull)
+      if (!isNull) {
+        fieldSerializers(i).copy(source, target)
+      }
       i += 1
     }
   }
+  // equals and hashCode both derive from the field serializers, so they stay consistent.
+  override def equals(any: scala.Any): Boolean = any match {
+    case otherRS: RowSerializer => util.Arrays.deepEquals(
+      fieldSerializers.asInstanceOf[Array[AnyRef]],
+      otherRS.getFieldSerializers.asInstanceOf[Array[AnyRef]])
+    case _ => false
+  }
+  override def hashCode(): Int =
+    util.Arrays.deepHashCode(fieldSerializers.asInstanceOf[Array[AnyRef]])
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8e12b20/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
new file mode 100644
index 0000000..cff276a
--- /dev/null
+++ b/flink-staging/flink-table/src/test/scala/org/apache/flink/api/table/typeinfo/RowSerializerTest.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.typeinfo
+
+import org.apache.flink.api.common.ExecutionConfig
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.{SerializerTestInstance, TypeSerializer}
+import org.apache.flink.api.table.Row
+import org.junit.Assert._
+import org.junit.Test
+
+class RowSerializerTest {
+  /** [[SerializerTestInstance]] for [[Row]] that compares rows field by field. */
+  class RowSerializerTestInstance(serializer: TypeSerializer[Row],
+                                  testData: Array[Row])
+    extends SerializerTestInstance(serializer, classOf[Row], -1, testData: _*) {
+
+    override protected def deepEquals(message: String, should: Row, is: Row): Unit = {
+      val arity = should.productArity
+      assertEquals(message, arity, is.productArity)
+      var index = 0
+      while (index < arity) {
+        val expected: Any = should.productElement(index)
+        val actual: Any = is.productElement(index)
+        assertEquals(message, expected, actual) // JUnit order: expected first, then actual
+        index += 1
+      }
+    }
+  }
+
+  @Test
+  def testRowSerializer(): Unit = {
+    // Schema (id: Int, name: String); row2 exercises a null field.
+    val rowInfo: TypeInformation[Row] = new RowTypeInfo(
+      Seq(BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO), Seq("id", "name"))
+
+    val row1 = new Row(2)
+    row1.setField(0, 1)
+    row1.setField(1, "a")
+
+    val row2 = new Row(2)
+    row2.setField(0, 2)
+    row2.setField(1, null)
+
+    val testData: Array[Row] = Array(row1, row2)
+
+    val rowSerializer: TypeSerializer[Row] = rowInfo.createSerializer(new ExecutionConfig)
+
+    val testInstance = new RowSerializerTestInstance(rowSerializer, testData)
+
+    testInstance.testAll()
+  }
+
+}


Mime
View raw message