flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject flink git commit: [FLINK-8854] [table] Fix schema mapping with time attributes
Date Fri, 09 Mar 2018 12:56:42 GMT
Repository: flink
Updated Branches:
  refs/heads/master 69b329962 -> c53148628


[FLINK-8854] [table] Fix schema mapping with time attributes

This closes #5662.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c5314862
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c5314862
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c5314862

Branch: refs/heads/master
Commit: c531486288caf5241cdf7f0f00f087f3ce82239f
Parents: 69b3299
Author: Timo Walther <twalthr@apache.org>
Authored: Thu Mar 8 11:51:38 2018 +0100
Committer: Timo Walther <twalthr@apache.org>
Committed: Fri Mar 9 13:54:15 2018 +0100

----------------------------------------------------------------------
 .../KafkaJsonTableSourceFactoryTestBase.java    | 20 +++++--
 .../table/descriptors/SchemaValidator.scala     | 19 ++++++-
 .../table/sources/definedTimeAttributes.scala   | 19 ++++++-
 .../sources/tsextractors/ExistingField.scala    | 10 +++-
 .../BoundedOutOfOrderTimestamps.scala           | 10 ++++
 .../table/descriptors/SchemaValidatorTest.scala | 58 +++++++++++++++++++-
 6 files changed, 120 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c5314862/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
index 2b081a9..583b71d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactoryTestBase.java
@@ -25,10 +25,13 @@ import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.descriptors.FormatDescriptor;
 import org.apache.flink.table.descriptors.Json;
 import org.apache.flink.table.descriptors.Kafka;
+import org.apache.flink.table.descriptors.Rowtime;
 import org.apache.flink.table.descriptors.Schema;
 import org.apache.flink.table.descriptors.TestTableSourceDescriptor;
 import org.apache.flink.table.sources.TableSource;
 import org.apache.flink.table.sources.TableSourceFactoryService;
+import org.apache.flink.table.sources.tsextractors.ExistingField;
+import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
 
 import org.junit.Test;
 
@@ -55,9 +58,11 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 		"      'type': 'integer'" +
 		"    }," +
 		"    'time': {" +
-		"      'description': 'Age in years'," +
-		"      'type': 'number'" +
-		"    }" + "  }," +
+		"      'description': 'row time'," +
+		"      'type': 'string'," +
+		"      'format': 'date-time'" +
+		"    }" +
+		"  }," +
 		"  'required': ['name', 'count', 'time']" +
 		"}";
 
@@ -89,9 +94,10 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 		// construct table source using a builder
 
 		final Map<String, String> tableJsonMapping = new HashMap<>();
+		tableJsonMapping.put("name", "name");
 		tableJsonMapping.put("fruit-name", "name");
 		tableJsonMapping.put("count", "count");
-		tableJsonMapping.put("event-time", "time");
+		tableJsonMapping.put("time", "time");
 
 		final Properties props = new Properties();
 		props.put("group.id", "test-group");
@@ -112,10 +118,11 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 					TableSchema.builder()
 						.field("fruit-name", Types.STRING)
 						.field("count", Types.BIG_INT)
-						.field("event-time", Types.BIG_DEC)
+						.field("event-time", Types.SQL_TIMESTAMP)
 						.field("proc-time", Types.SQL_TIMESTAMP)
 						.build())
 				.withProctimeAttribute("proc-time")
+				.withRowtimeAttribute("event-time", new ExistingField("time"), PreserveWatermarks.INSTANCE())
 				.build();
 
 		// construct table source using descriptors and table source factory
@@ -135,7 +142,8 @@ public abstract class KafkaJsonTableSourceFactoryTestBase {
 				new Schema()
 						.field("fruit-name", Types.STRING).from("name")
 						.field("count", Types.BIG_INT) // no from so it must match with the input
-						.field("event-time", Types.BIG_DEC).from("time")
+						.field("event-time", Types.SQL_TIMESTAMP).rowtime(
+							new Rowtime().timestampsFromField("time").watermarksFromSource())
 						.field("proc-time", Types.SQL_TIMESTAMP).proctime());
 
 		final TableSource<?> factorySource = TableSourceFactoryService.findAndCreateTableSource(testDesc);

http://git-wip-us.apache.org/repos/asf/flink/blob/c5314862/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
index 0a23911..9cb3258 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/SchemaValidator.scala
@@ -23,7 +23,7 @@ import java.util.Optional
 
 import org.apache.flink.table.api.{TableSchema, ValidationException}
 import org.apache.flink.table.descriptors.DescriptorProperties.{toJava, toScala}
-import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_TIMESTAMPS_TYPE}
+import org.apache.flink.table.descriptors.RowtimeValidator.{ROWTIME, ROWTIME_TIMESTAMPS_FROM, ROWTIME_TIMESTAMPS_TYPE, ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD}
 import org.apache.flink.table.descriptors.SchemaValidator._
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor
 
@@ -148,6 +148,13 @@ object SchemaValidator {
 
     val schema = properties.getTableSchema(SCHEMA)
 
+    // add all source fields first because rowtime might reference one of them
+    toScala(sourceSchema).map(_.getColumnNames).foreach { names =>
+      names.foreach { name =>
+        mapping.put(name, name)
+      }
+    }
+
     // add all schema fields first for implicit mappings
     schema.getColumnNames.foreach { name =>
       mapping.put(name, name)
@@ -198,14 +205,20 @@ object SchemaValidator {
       val isProctime = properties
         .getOptionalBoolean(s"$SCHEMA.$i.$SCHEMA_PROCTIME")
         .orElse(false)
-      val isRowtime = properties
-        .containsKey(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE")
+      val tsType = s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_TYPE"
+      val isRowtime = properties.containsKey(tsType)
       if (!isProctime && !isRowtime) {
         // check for a aliasing
         val fieldName = properties.getOptionalString(s"$SCHEMA.$i.$SCHEMA_FROM")
           .orElse(n)
         builder.field(fieldName, t)
       }
+      // only use the rowtime attribute if it references a field
+      else if (isRowtime &&
+          properties.getString(tsType) == ROWTIME_TIMESTAMPS_TYPE_VALUE_FROM_FIELD) {
+        val field = properties.getString(s"$SCHEMA.$i.$ROWTIME_TIMESTAMPS_FROM")
+        builder.field(field, t)
+      }
     }
 
     builder.build()

http://git-wip-us.apache.org/repos/asf/flink/blob/c5314862/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
index f09baa3..73b76a5 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/definedTimeAttributes.scala
@@ -19,6 +19,7 @@
 package org.apache.flink.table.sources
 
 import java.util
+import java.util.Objects
 
 import org.apache.flink.table.api.TableSchema
 import org.apache.flink.table.api.Types
@@ -65,9 +66,9 @@ trait DefinedRowtimeAttributes {
   * @param watermarkStrategy The watermark strategy associated with the attribute.
   */
 class RowtimeAttributeDescriptor(
-  attributeName: String,
-  timestampExtractor: TimestampExtractor,
-  watermarkStrategy: WatermarkStrategy) {
+  val attributeName: String,
+  val timestampExtractor: TimestampExtractor,
+  val watermarkStrategy: WatermarkStrategy) {
 
   /** Returns the name of the rowtime attribute. */
   def getAttributeName: String = attributeName
@@ -77,4 +78,16 @@ class RowtimeAttributeDescriptor(
 
   /** Returns the [[WatermarkStrategy]] for the attribute. */
   def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy
+
+  override def equals(other: Any): Boolean = other match {
+    case that: RowtimeAttributeDescriptor =>
+        Objects.equals(attributeName, that.attributeName) &&
+        Objects.equals(timestampExtractor, that.timestampExtractor) &&
+        Objects.equals(watermarkStrategy, that.watermarkStrategy)
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5314862/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
index 12cd564..866029b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
@@ -27,7 +27,7 @@ import org.apache.flink.table.expressions.{Cast, Expression, ResolvedFieldRefere
   *
   * @param field The field to convert into a rowtime attribute.
   */
-class ExistingField(field: String) extends TimestampExtractor {
+class ExistingField(val field: String) extends TimestampExtractor {
 
   override def getArgumentFields: Array[String] = Array(field)
 
@@ -65,4 +65,12 @@ class ExistingField(field: String) extends TimestampExtractor {
     }
   }
 
+  override def equals(other: Any): Boolean = other match {
+    case that: ExistingField => field == that.field
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    field.hashCode
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5314862/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
index 8f7c235..4718bad 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/wmstrategies/BoundedOutOfOrderTimestamps.scala
@@ -38,4 +38,14 @@ final class BoundedOutOfOrderTimestamps(val delay: Long) extends PeriodicWaterma
   }
 
   override def getWatermark: Watermark = new Watermark(maxTimestamp - delay)
+
+  override def equals(other: Any): Boolean = other match {
+    case that: BoundedOutOfOrderTimestamps =>
+      delay == that.delay
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    delay.hashCode()
+  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c5314862/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
index ba05dff..bf7b84b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/SchemaValidatorTest.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.descriptors
 import java.util.Optional
 
 import org.apache.flink.table.api.{TableSchema, Types}
-import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp
+import org.apache.flink.table.sources.tsextractors.{ExistingField, StreamRecordTimestamp}
 import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks
 import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
@@ -34,7 +34,7 @@ import scala.collection.JavaConverters._
 class SchemaValidatorTest {
 
   @Test
-  def testSchema(): Unit = {
+  def testSchemaWithRowtimeFromSource(): Unit = {
      val desc1 = Schema()
       .field("otherField", Types.STRING).from("csvField")
       .field("abcField", Types.STRING)
@@ -60,7 +60,11 @@ class SchemaValidatorTest {
     assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[PreserveWatermarks])
 
     // test field mapping
-    val expectedMapping = Map("otherField" -> "csvField", "abcField" -> "abcField").asJava
+    val expectedMapping = Map(
+      "otherField" -> "csvField",
+      "csvField" -> "csvField",
+      "abcField" -> "abcField",
+      "myField" -> "myField").asJava
     assertEquals(
       expectedMapping,
       SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
@@ -73,4 +77,52 @@ class SchemaValidatorTest {
       .build()
     assertEquals(expectedFormatSchema, formatSchema)
   }
+
+  @Test
+  def testSchemaWithRowtimeFromField(): Unit = {
+     val desc1 = Schema()
+      .field("otherField", Types.STRING).from("csvField")
+      .field("abcField", Types.STRING)
+      .field("p", Types.SQL_TIMESTAMP).proctime()
+      .field("r", Types.SQL_TIMESTAMP).rowtime(
+        Rowtime().timestampsFromField("myTime").watermarksFromSource())
+    val props = new DescriptorProperties()
+    desc1.addProperties(props)
+
+    val inputSchema = TableSchema.builder()
+      .field("csvField", Types.STRING)
+      .field("abcField", Types.STRING)
+      .field("myField", Types.BOOLEAN)
+      .field("myTime", Types.SQL_TIMESTAMP)
+      .build()
+
+    // test proctime
+    assertEquals(Optional.of("p"), SchemaValidator.deriveProctimeAttribute(props))
+
+    // test rowtime
+    val rowtime = SchemaValidator.deriveRowtimeAttributes(props).get(0)
+    assertEquals("r", rowtime.getAttributeName)
+    assertTrue(rowtime.getTimestampExtractor.isInstanceOf[ExistingField])
+    assertTrue(rowtime.getWatermarkStrategy.isInstanceOf[PreserveWatermarks])
+
+    // test field mapping
+    val expectedMapping = Map(
+      "otherField" -> "csvField",
+      "csvField" -> "csvField",
+      "abcField" -> "abcField",
+      "myField" -> "myField",
+      "myTime" -> "myTime").asJava
+    assertEquals(
+      expectedMapping,
+      SchemaValidator.deriveFieldMapping(props, Optional.of(inputSchema)))
+
+    // test field format
+    val formatSchema = SchemaValidator.deriveFormatFields(props)
+    val expectedFormatSchema = TableSchema.builder()
+      .field("csvField", Types.STRING) // aliased
+      .field("abcField", Types.STRING)
+      .field("myTime", Types.SQL_TIMESTAMP)
+      .build()
+    assertEquals(expectedFormatSchema, formatSchema)
+  }
 }


Mime
View raw message