flink-commits mailing list archives

From: j...@apache.org
Subject: [flink] 03/03: [FLINK-18029][kafka] Add more ITCases for Kafka with new formats (avro, csv, json)
Date: Mon, 08 Jun 2020 02:14:56 GMT
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9a49b7da4d7edcad8ca9c991da3fe1ab3547b9e
Author: Jark Wu <jark@apache.org>
AuthorDate: Thu Jun 4 20:02:21 2020 +0800

    [FLINK-18029][kafka] Add more ITCases for Kafka with new formats (avro, csv, json)
    
    This closes #12471
---
 .../flink-connector-kafka-0.10/pom.xml             | 13 ++++
 .../flink-connector-kafka-0.11/pom.xml             | 13 ++++
 .../connectors/kafka/table/KafkaTableTestBase.java | 81 ++++++++++++++++------
 flink-connectors/flink-connector-kafka/pom.xml     | 13 ++++
 4 files changed, 100 insertions(+), 20 deletions(-)
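For orientation before the diff: the new @Parameterized.Parameters matrix in KafkaTableTestBase expands testKafkaSourceSink into six instances, one per connector/format combination. A minimal sketch of the resulting display names, assuming JUnit 4's standard Parameterized runner naming with the name pattern used below:

    testKafkaSourceSink[legacy = false, format = json]
    testKafkaSourceSink[legacy = false, format = avro]
    testKafkaSourceSink[legacy = false, format = csv]
    testKafkaSourceSink[legacy = true, format = json]
    testKafkaSourceSink[legacy = true, format = avro]
    testKafkaSourceSink[legacy = true, format = csv]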

diff --git a/flink-connectors/flink-connector-kafka-0.10/pom.xml b/flink-connectors/flink-connector-kafka-0.10/pom.xml
index 71c17ff..7e5d375 100644
--- a/flink-connectors/flink-connector-kafka-0.10/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.10/pom.xml
@@ -204,12 +204,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 		<dependency>
 			<!-- Required for org.apache.flink.streaming.connectors.kafka.Kafka010SecuredRunITCase -->
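The same change is applied to each of the three Kafka connector poms (0.10 above, 0.11 and the universal connector below): flink-avro and flink-csv are added in test scope next to the already-present flink-json, so the shared KafkaTableTestBase can resolve all three format factories at test runtime.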
diff --git a/flink-connectors/flink-connector-kafka-0.11/pom.xml b/flink-connectors/flink-connector-kafka-0.11/pom.xml
index 534d2d8..b33a86f 100644
--- a/flink-connectors/flink-connector-kafka-0.11/pom.xml
+++ b/flink-connectors/flink-connector-kafka-0.11/pom.xml
@@ -204,12 +204,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 	</dependencies>
 
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
index c1dc7ce..49d0269 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestBase.java
@@ -46,17 +46,26 @@ import static org.junit.Assert.assertEquals;
 @RunWith(Parameterized.class)
 public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
+	private static final String JSON_FORMAT = "json";
+	private static final String AVRO_FORMAT = "avro";
+	private static final String CSV_FORMAT = "csv";
+
 	@Parameterized.Parameter
 	public boolean isLegacyConnector;
 
 	@Parameterized.Parameter(1)
-	public int topicID;
+	public String format;
 
-	@Parameterized.Parameters(name = "legacy = {0}, topicId = {1}")
+	@Parameterized.Parameters(name = "legacy = {0}, format = {1}")
 	public static Object[] parameters() {
 		return new Object[][]{
-			new Object[]{true, 0},
-			new Object[]{false, 1}
+			// cover all 3 formats for new and old connector
+			new Object[]{false, JSON_FORMAT},
+			new Object[]{false, AVRO_FORMAT},
+			new Object[]{false, CSV_FORMAT},
+			new Object[]{true, JSON_FORMAT},
+			new Object[]{true, AVRO_FORMAT},
+			new Object[]{true, CSV_FORMAT}
 		};
 	}
 
@@ -87,7 +96,9 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 
 	@Test
 	public void testKafkaSourceSink() throws Exception {
-		final String topic = "tstopic" + topicID;
+		// use a different topic name for each parameterization,
+		// to make sure the topic can always be created.
+		final String topic = "tstopic_" + format + "_" + isLegacyConnector;
 		createTestTopic(topic, 1, 1);
 
 		// ---------- Produce an event time stream into Kafka -------------------
@@ -101,6 +112,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 					"  `computed-price` as price + 1.0,\n" +
 					"  price decimal(38, 18),\n" +
 					"  currency string,\n" +
+					"  log_date date,\n" +
+					"  log_time time(3),\n" +
 					"  log_ts timestamp(3),\n" +
 					"  ts as log_ts + INTERVAL '1' SECOND,\n" +
 					"  watermark for ts as ts\n" +
@@ -110,18 +123,21 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 					"  'properties.bootstrap.servers' = '%s',\n" +
 					"  'properties.group.id' = '%s',\n" +
 					"  'scan.startup.mode' = 'earliest-offset',\n" +
-					"  'format' = 'json'\n" +
+					"  %s\n" +
 					")",
 				factoryIdentifier(),
 				topic,
 				bootstraps,
-				groupId);
+				groupId,
+				formatOptions());
 		} else {
 			createTable = String.format(
 				"create table kafka (\n" +
 					"  `computed-price` as price + 1.0,\n" +
 					"  price decimal(38, 18),\n" +
 					"  currency string,\n" +
+					"  log_date date,\n" +
+					"  log_time time(3),\n" +
 					"  log_ts timestamp(3),\n" +
 					"  ts as log_ts + INTERVAL '1' SECOND,\n" +
 					"  watermark for ts as ts\n" +
@@ -132,32 +148,36 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 					"  'connector.properties.bootstrap.servers' = '%s',\n" +
 					"  'connector.properties.group.id' = '%s',\n" +
 					"  'connector.startup-mode' = 'earliest-offset',\n" +
-					"  'format.type' = 'json',\n" +
-					"  'update-mode' = 'append'\n" +
+					"  'update-mode' = 'append',\n" +
+					"  %s\n" +
 					")",
 				kafkaVersion(),
 				topic,
 				bootstraps,
-				groupId);
+				groupId,
+				formatOptions());
 		}
 
 		tEnv.executeSql(createTable);
 
 		String initialValues = "INSERT INTO kafka\n" +
-			"SELECT CAST(price AS DECIMAL(10, 2)), currency, CAST(ts AS TIMESTAMP(3))\n" +
-			"FROM (VALUES (2.02,'Euro','2019-12-12 00:00:00.001001'), \n" +
-			"  (1.11,'US Dollar','2019-12-12 00:00:01.002001'), \n" +
-			"  (50,'Yen','2019-12-12 00:00:03.004001'), \n" +
-			"  (3.1,'Euro','2019-12-12 00:00:04.005001'), \n" +
-			"  (5.33,'US Dollar','2019-12-12 00:00:05.006001'), \n" +
-			"  (0,'DUMMY','2019-12-12 00:00:10'))\n" +
-			"  AS orders (price, currency, ts)";
+			"SELECT CAST(price AS DECIMAL(10, 2)), currency, " +
+			" CAST(d AS DATE), CAST(t AS TIME(0)), CAST(ts AS TIMESTAMP(3))\n" +
+			"FROM (VALUES (2.02,'Euro','2019-12-12', '00:00:01', '2019-12-12 00:00:01.001001'), \n"
+
+			"  (1.11,'US Dollar','2019-12-12', '00:00:02', '2019-12-12 00:00:02.002001'), \n" +
+			"  (50,'Yen','2019-12-12', '00:00:03', '2019-12-12 00:00:03.004001'), \n" +
+			"  (3.1,'Euro','2019-12-12', '00:00:04', '2019-12-12 00:00:04.005001'), \n" +
+			"  (5.33,'US Dollar','2019-12-12', '00:00:05', '2019-12-12 00:00:05.006001'), \n" +
+			"  (0,'DUMMY','2019-12-12', '00:00:10', '2019-12-12 00:00:10'))\n" +
+			"  AS orders (price, currency, d, t, ts)";
 		TableEnvUtil.execInsertSqlAndWaitResult(tEnv, initialValues);
 
 		// ---------- Consume stream from Kafka -------------------
 
 		String query = "SELECT\n" +
 			"  CAST(TUMBLE_END(ts, INTERVAL '5' SECOND) AS VARCHAR),\n" +
+			"  CAST(MAX(log_date) AS VARCHAR),\n" +
+			"  CAST(MAX(log_time) AS VARCHAR),\n" +
 			"  CAST(MAX(ts) AS VARCHAR),\n" +
 			"  COUNT(*),\n" +
 			"  CAST(MAX(price) AS DECIMAL(10, 2))\n" +
@@ -180,8 +200,8 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 		}
 
 		List<String> expected = Arrays.asList(
-			"+I(2019-12-12 00:00:05.000,2019-12-12 00:00:04.004,3,50.00)",
-			"+I(2019-12-12 00:00:10.000,2019-12-12 00:00:06.006,2,5.33)");
+			"+I(2019-12-12 00:00:05.000,2019-12-12,00:00:03,2019-12-12 00:00:04.004,3,50.00)",
+			"+I(2019-12-12 00:00:10.000,2019-12-12,00:00:05,2019-12-12 00:00:06.006,2,5.33)");
 
 		assertEquals(expected, TestingSinkFunction.rows);
 
@@ -190,6 +210,27 @@ public abstract class KafkaTableTestBase extends KafkaTestBaseWithFlink {
 		deleteTestTopic(topic);
 	}
 
+	private String formatOptions() {
+		if (!isLegacyConnector) {
+			return String.format("'format' = '%s'", format);
+		} else {
+			String formatType = String.format("'format.type' = '%s'", format);
+			if (format.equals(AVRO_FORMAT)) {
+				// the legacy connector requires the avro-schema to be specified
+				String avroSchema = "{\"type\":\"record\",\"name\":\"row_0\",\"fields\":" +
+					"[{\"name\":\"price\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\"," +
+					"\"precision\":38,\"scale\":18}},{\"name\":\"currency\",\"type\":[\"string\"," +
+					"\"null\"]},{\"name\":\"log_date\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+					"\"date\"}},{\"name\":\"log_time\",\"type\":{\"type\":\"int\",\"logicalType\":" +
+					"\"time-millis\"}},{\"name\":\"log_ts\",\"type\":{\"type\":\"long\"," +
+					"\"logicalType\":\"timestamp-millis\"}}]}";
+				return formatType + String.format(", 'format.avro-schema' = '%s'", avroSchema);
+			} else {
+				return formatType;
+			}
+		}
+	}
+
 	private static final class TestingSinkFunction implements SinkFunction<RowData> {
 
 		private static final long serialVersionUID = 455430015321124493L;
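A short sketch of the WITH-clause fragments that formatOptions() substitutes into the two DDL templates above (values taken from the code; the avro schema string is abbreviated here):

    -- new connector (isLegacyConnector == false), likewise for avro and csv:
    'format' = 'json'
    -- legacy connector, json and csv:
    'format.type' = 'json'
    -- legacy connector, avro additionally carries the schema:
    'format.type' = 'avro', 'format.avro-schema' = '{"type":"record","name":"row_0", ...}'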
diff --git a/flink-connectors/flink-connector-kafka/pom.xml b/flink-connectors/flink-connector-kafka/pom.xml
index 1d97d2d..39f5444 100644
--- a/flink-connectors/flink-connector-kafka/pom.xml
+++ b/flink-connectors/flink-connector-kafka/pom.xml
@@ -196,12 +196,25 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<!-- Kafka SQL IT test with formats -->
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-json</artifactId>
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-avro</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-csv</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 
 	</dependencies>
 

