flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [5/5] flink git commit: [FLINK-8538][table]Add a Kafka table source factory with JSON format support
Date Tue, 27 Feb 2018 19:25:20 GMT
[FLINK-8538][table]Add a Kafka table source factory with JSON format support


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1d26062d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1d26062d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1d26062d

Branch: refs/heads/release-1.5
Commit: 1d26062de130c05fdbe7701b55766b4a8d433418
Parents: a269f85
Author: Xingcan Cui <xingcanc@gmail.com>
Authored: Mon Feb 12 18:11:36 2018 +0800
Committer: Timo Walther <twalthr@apache.org>
Committed: Tue Feb 27 20:23:00 2018 +0100

----------------------------------------------------------------------
 .../kafka/Kafka010JsonTableSourceFactory.java   |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka010TableSourceFactoryTest.java   |  41 ++++
 .../kafka/Kafka011JsonTableSourceFactory.java   |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka011TableSourceFactoryTest.java   |  41 ++++
 .../kafka/Kafka08JsonTableSourceFactory.java    |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka08TableSourceFactoryTest.java    |  42 ++++
 .../kafka/Kafka09JsonTableSourceFactory.java    |  36 +++
 ...pache.flink.table.sources.TableSourceFactory |  16 ++
 .../resources/tableSourceConverter.properties   |  29 +++
 .../kafka/Kafka09TableSourceFactoryTest.java    |  41 ++++
 .../flink-connector-kafka-base/pom.xml          |  14 ++
 .../connectors/kafka/KafkaJsonTableSource.java  |  17 ++
 .../kafka/KafkaJsonTableSourceFactory.java      | 227 +++++++++++++++++++
 .../connectors/kafka/KafkaTableSource.java      |  30 +++
 .../apache/flink/table/descriptors/Kafka.java   | 199 ++++++++++++++++
 .../flink/table/descriptors/KafkaValidator.java | 193 ++++++++++++++++
 .../KafkaJsonTableFromDescriptorTestBase.java   | 127 +++++++++++
 .../src/test/resources/kafka-json-schema.json   |  35 +++
 .../apache/flink/table/api/TableSchema.scala    |   5 +
 25 files changed, 1336 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
new file mode 100644
index 0000000..1d03f6c
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
+
+/**
+ * Factory for creating configured instances of {@link Kafka010JsonTableSource}.
+ */
+public class Kafka010JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+	@Override
+	protected KafkaJsonTableSource.Builder createBuilder() {
+		return new Kafka010JsonTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return KAFKA_VERSION_VALUE_010;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..9ef54fc
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# The config file is used to specify the packages of current module where
+# to find TableSourceConverter implementation class annotated with TableType.
+# If there are multiple packages to scan, put those packages together into a
+# string separated with ',', for example, org.package1,org.package2.
+# Please notice:
+# It's better to have a tableSourceConverter.properties in each connector Module
+# which offers converters instead of put all information into the
+# tableSourceConverter.properties of flink-table module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
new file mode 100644
index 0000000..15b89e8
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_010;
+
+/**
+ * Tests for {@link Kafka010JsonTableSourceFactory}.
+ */
+public class Kafka010TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+	protected String versionForTest() {
+		return KAFKA_VERSION_VALUE_010;
+	}
+
+	protected KafkaJsonTableSource.Builder builderForTest() {
+		return Kafka010JsonTableSource.builder();
+	}
+
+	@Override
+	protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+		// no extra settings
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
new file mode 100644
index 0000000..ca4d6ce
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
+
+/**
+ * Factory for creating configured instances of {@link Kafka011JsonTableSource}.
+ */
+public class Kafka011JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+	@Override
+	protected KafkaJsonTableSource.Builder createBuilder() {
+		return new Kafka011JsonTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return KAFKA_VERSION_VALUE_011;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..75135e5
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka011JsonTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# The config file is used to specify the packages of current module where
+# to find TableSourceConverter implementation class annotated with TableType.
+# If there are multiple packages to scan, put those packages together into a
+# string separated with ',', for example, org.package1,org.package2.
+# Please notice:
+# It's better to have a tableSourceConverter.properties in each connector Module
+# which offers converters instead of put all information into the
+# tableSourceConverter.properties of flink-table module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
new file mode 100644
index 0000000..84ac39b
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_011;
+
+/**
+ * Tests for {@link Kafka011JsonTableSourceFactory}.
+ */
+public class Kafka011TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+	protected String versionForTest() {
+		return KAFKA_VERSION_VALUE_011;
+	}
+
+	protected KafkaJsonTableSource.Builder builderForTest() {
+		return Kafka011JsonTableSource.builder();
+	}
+
+	@Override
+	protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+		// no extra settings
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
new file mode 100644
index 0000000..e4e5096
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
+
+/**
+ * Factory for creating configured instances of {@link Kafka08JsonTableSource}.
+ */
+public class Kafka08JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+	@Override
+	protected KafkaJsonTableSource.Builder createBuilder() {
+		return new Kafka08JsonTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return KAFKA_VERSION_VALUE_08;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..9092955
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka08JsonTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# The config file is used to specify the packages of current module where
+# to find TableSourceConverter implementation class annotated with TableType.
+# If there are multiple packages to scan, put those packages together into a
+# string separated with ',', for example, org.package1,org.package2.
+# Please notice:
+# It's better to have a tableSourceConverter.properties in each connector Module
+# which offers converters instead of put all information into the
+# tableSourceConverter.properties of flink-table module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
new file mode 100644
index 0000000..a2edc09
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceFactoryTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_08;
+
+/**
+ * Tests for {@link Kafka08JsonTableSourceFactory}.
+ */
+public class Kafka08TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+	protected String versionForTest() {
+		return KAFKA_VERSION_VALUE_08;
+	}
+
+	protected KafkaJsonTableSource.Builder builderForTest() {
+		return Kafka08JsonTableSource.builder();
+	}
+
+	@Override
+	protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+		builder.getKafkaProps().put("zookeeper.connect", "localhost:1111");
+		kafka.zookeeperConnect("localhost:1111");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
new file mode 100644
index 0000000..bbda4ae
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09JsonTableSource}.
+ */
+public class Kafka09JsonTableSourceFactory extends KafkaJsonTableSourceFactory {
+	@Override
+	protected KafkaJsonTableSource.Builder createBuilder() {
+		return new Kafka09JsonTableSource.Builder();
+	}
+
+	@Override
+	protected String kafkaVersion() {
+		return KAFKA_VERSION_VALUE_09;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
new file mode 100644
index 0000000..2f38bd0
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/META-INF/services/org.apache.flink.table.sources.TableSourceFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.streaming.connectors.kafka.Kafka09JsonTableSourceFactory

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
new file mode 100644
index 0000000..5409b49
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/resources/tableSourceConverter.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+# The config file is used to specify the packages of current module where
+# to find TableSourceConverter implementation class annotated with TableType.
+# If there are multiple packages to scan, put those packages together into a
+# string separated with ',', for example, org.package1,org.package2.
+# Please notice:
+# It's better to have a tableSourceConverter.properties in each connector Module
+# which offers converters instead of put all information into the
+# tableSourceConverter.properties of flink-table module.
+################################################################################
+scan.packages=org.apache.flink.streaming.connectors.kafka

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
new file mode 100644
index 0000000..fc85ea7
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceFactoryTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.table.descriptors.Kafka;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION_VALUE_09;
+
+/**
+ * Factory for creating configured instances of {@link Kafka09JsonTableSource}.
+ */
+public class Kafka09TableSourceFactoryTest extends KafkaJsonTableFromDescriptorTestBase {
+	protected String versionForTest() {
+		return KAFKA_VERSION_VALUE_09;
+	}
+
+	protected KafkaJsonTableSource.Builder builderForTest() {
+		return Kafka09JsonTableSource.builder();
+	}
+
+	@Override
+	protected void extraSettings(KafkaTableSource.Builder builder, Kafka kafka) {
+		// no extra settings
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/pom.xml b/flink-connectors/flink-connector-kafka-base/pom.xml
index e7412cf..2ccaa2e 100644
--- a/flink-connectors/flink-connector-kafka-base/pom.xml
+++ b/flink-connectors/flink-connector-kafka-base/pom.xml
@@ -205,6 +205,20 @@ under the License.
 			<scope>test</scope>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 	</dependencies>
 
 	<dependencyManagement>

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index f581e89..d2dafe7 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -27,6 +27,7 @@ import org.apache.flink.table.sources.DefinedFieldMapping;
 import org.apache.flink.table.sources.StreamTableSource;
 
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 /**
@@ -86,6 +87,22 @@ public abstract class KafkaJsonTableSource extends KafkaTableSource implements D
 		return "KafkaJSONTableSource";
 	}
 
+	@Override
+	public boolean equals(Object other) {
+		if (super.equals(other)) {
+			KafkaJsonTableSource otherSource = (KafkaJsonTableSource) other;
+			return Objects.equals(failOnMissingField, otherSource.failOnMissingField)
+					&& Objects.equals(jsonSchema, otherSource.jsonSchema)
+					&& Objects.equals(fieldMapping, otherSource.fieldMapping);
+		}
+		return false;
+	}
+
+	@Override
+	public int hashCode() {
+		return 31 * super.hashCode() + Objects.hash(failOnMissingField, jsonSchema, fieldMapping);
+	}
+
 	//////// SETTERS FOR OPTIONAL PARAMETERS
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
new file mode 100644
index 0000000..918b833
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSourceFactory.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.json.JsonSchemaConverter;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.JsonValidator;
+import org.apache.flink.table.descriptors.KafkaValidator;
+import org.apache.flink.table.descriptors.SchemaValidator;
+import org.apache.flink.table.sources.TableSource;
+import org.apache.flink.table.sources.TableSourceFactory;
+import org.apache.flink.types.Row;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
+import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_VERSION;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_TYPE;
+import static org.apache.flink.table.descriptors.FormatDescriptorValidator.FORMAT_VERSION;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_FAIL_ON_MISSING_FIELD;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_SCHEMA_STRING;
+import static org.apache.flink.table.descriptors.JsonValidator.FORMAT_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID;
+import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION;
+import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_EARLIEST;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_GROUP_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_LATEST;
+import static org.apache.flink.table.descriptors.KafkaValidator.STARTUP_MODE_VALUE_SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING;
+import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT;
+import static org.apache.flink.table.descriptors.SchemaValidator.PROCTIME;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA;
+import static org.apache.flink.table.descriptors.SchemaValidator.SCHEMA_VERSION;
+
+import scala.Option;
+import scala.collection.JavaConversions;
+
+/**
+ * Factory for creating configured instances of {@link KafkaJsonTableSource}.
+ */
+public abstract class KafkaJsonTableSourceFactory implements TableSourceFactory<Row> {
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE); // kafka connector
+		context.put(FORMAT_TYPE(), FORMAT_TYPE_VALUE()); // Json format
+		context.put(KAFKA_VERSION, kafkaVersion()); // for different implementations
+		context.put(CONNECTOR_VERSION(), "1");
+		context.put(FORMAT_VERSION(), "1");
+		context.put(SCHEMA_VERSION(), "1");
+		return context;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		List<String> properties = new ArrayList<>();
+
+		// kafka
+		properties.add(KAFKA_VERSION);
+		properties.add(BOOTSTRAP_SERVERS);
+		properties.add(GROUP_ID);
+		properties.add(ZOOKEEPER_CONNECT);
+		properties.add(TOPIC);
+		properties.add(STARTUP_MODE);
+		properties.add(SPECIFIC_OFFSETS + ".#." + PARTITION);
+		properties.add(SPECIFIC_OFFSETS + ".#." + OFFSET);
+
+		// json format
+		properties.add(FORMAT_SCHEMA_STRING());
+		properties.add(FORMAT_FAIL_ON_MISSING_FIELD());
+
+		// table json mapping
+		properties.add(TABLE_JSON_MAPPING + ".#." + TABLE_FIELD);
+		properties.add(TABLE_JSON_MAPPING + ".#." + JSON_FIELD);
+
+		// schema
+		properties.add(SCHEMA() + ".#." + DescriptorProperties.TYPE());
+		properties.add(SCHEMA() + ".#." + DescriptorProperties.NAME());
+
+		// time attributes
+		properties.add(SCHEMA() + ".#." + PROCTIME());
+//		properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_CLASS());
+//		properties.add(SCHEMA() + ".#." + ROWTIME() + ".#." + TIMESTAMPS_TYPE());
+
+		return properties;
+	}
+
+	@Override
+	public TableSource<Row> create(Map<String, String> properties) {
+		DescriptorProperties params = new DescriptorProperties(true);
+		params.putProperties(properties);
+
+		// validate
+		new KafkaValidator().validate(params);
+		new JsonValidator().validate(params);
+		new SchemaValidator(true).validate(params);
+
+		// build
+		KafkaJsonTableSource.Builder builder = createBuilder();
+		Properties kafkaProps = new Properties();
+
+		// Set the required parameters.
+		String topic = params.getString(TOPIC).get();
+		TableSchema tableSchema = params.getTableSchema(SCHEMA()).get();
+
+		kafkaProps.put(BOOTSTRAP_SERVERS, params.getString(BOOTSTRAP_SERVERS).get());
+		kafkaProps.put(GROUP_ID, params.getString(GROUP_ID).get());
+
+		// Set the zookeeper connect for kafka 0.8.
+		Option<String> zkConnect = params.getString(ZOOKEEPER_CONNECT);
+		if (zkConnect.isDefined()) {
+			kafkaProps.put(ZOOKEEPER_CONNECT, zkConnect.get());
+		}
+
+		builder.withKafkaProperties(kafkaProps).forTopic(topic).withSchema(tableSchema);
+
+		// Set the startup mode.
+		String startupMode = params.getString(STARTUP_MODE).get();
+		if (null != startupMode) {
+			switch (startupMode) {
+				case STARTUP_MODE_VALUE_EARLIEST:
+					builder.fromEarliest();
+					break;
+				case STARTUP_MODE_VALUE_LATEST:
+					builder.fromLatest();
+					break;
+				case STARTUP_MODE_VALUE_GROUP_OFFSETS:
+					builder.fromGroupOffsets();
+					break;
+				case STARTUP_MODE_VALUE_SPECIFIC_OFFSETS:
+					Map<String, String> partitions = JavaConversions.
+							mapAsJavaMap(params.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION));
+					Map<KafkaTopicPartition, Long> offsetMap = new HashMap<>();
+					for (int i = 0; i < partitions.size(); i++) {
+						offsetMap.put(
+								new KafkaTopicPartition(
+										topic,
+										Integer.valueOf(params.getString(
+												SPECIFIC_OFFSETS + "" + "." + i + "." + PARTITION).get())),
+								Long.valueOf(params.getString(
+										SPECIFIC_OFFSETS + "" + "." + i + "." + OFFSET).get()));
+					}
+					builder.fromSpecificOffsets(offsetMap);
+					break;
+			}
+		}
+
+		// Set whether fail on missing JSON field.
+		Option<String> failOnMissing = params.getString(FORMAT_FAIL_ON_MISSING_FIELD());
+		if (failOnMissing.isDefined()) {
+			builder.failOnMissingField(Boolean.valueOf(failOnMissing.get()));
+		}
+
+		// Set the JSON schema.
+		Option<String> jsonSchema = params.getString(FORMAT_SCHEMA_STRING());
+		if (jsonSchema.isDefined()) {
+			TypeInformation jsonSchemaType = JsonSchemaConverter.convert(jsonSchema.get());
+			builder.forJsonSchema(TableSchema.fromTypeInfo(jsonSchemaType));
+		}
+
+		// Set the table => JSON fields mapping.
+		Map<String, String>  mappingTableFields = JavaConversions.
+				mapAsJavaMap(params.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD));
+
+		if (!mappingTableFields.isEmpty()) {
+			Map<String, String> tableJsonMapping = new HashMap<>();
+			for (int i = 0; i < mappingTableFields.size(); i++) {
+				tableJsonMapping.put(params.getString(TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD).get(),
+						params.getString(TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD).get()
+				);
+			}
+			builder.withTableToJsonMapping(tableJsonMapping);
+		}
+
+		// Set the time attributes.
+		setTimeAttributes(tableSchema, params, builder);
+
+		return builder.build();
+	}
+
+	protected abstract KafkaJsonTableSource.Builder createBuilder();
+
+	protected abstract String kafkaVersion();
+
+	private void setTimeAttributes(TableSchema schema, DescriptorProperties params, KafkaJsonTableSource.Builder builder) {
+		// TODO to deal with rowtime fields
+		Option<String> proctimeField;
+		for (int i = 0; i < schema.getColumnNum(); i++) {
+			proctimeField = params.getString(SCHEMA() + "." + i + "." + PROCTIME());
+			if (proctimeField.isDefined()) {
+				builder.withProctimeAttribute(schema.getColumnName(i).get());
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index d5cda4a..9ce3b8e 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -42,6 +42,7 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 
 import scala.Option;
@@ -138,6 +139,35 @@ public abstract class KafkaTableSource
 		return TableConnectorUtil.generateRuntimeName(this.getClass(), schema.getColumnNames());
 	}
 
+	@Override
+	public boolean equals(Object o) {
+		if (!o.getClass().equals(this.getClass())) {
+			return false;
+		}
+		KafkaTableSource other = (KafkaTableSource) o;
+		return Objects.equals(topic, other.topic)
+				&& Objects.equals(schema, other.schema)
+				&& Objects.equals(properties, other.properties)
+				&& Objects.equals(proctimeAttribute, other.proctimeAttribute)
+				&& Objects.equals(returnType, other.returnType)
+				&& Objects.equals(rowtimeAttributeDescriptors, other.rowtimeAttributeDescriptors)
+				&& Objects.equals(specificStartupOffsets, other.specificStartupOffsets)
+				&& Objects.equals(startupMode, other.startupMode);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(
+				topic,
+				schema,
+				properties,
+				proctimeAttribute,
+				returnType,
+				rowtimeAttributeDescriptors,
+				specificStartupOffsets,
+				startupMode);
+	}
+
 	/**
 	 * Returns a version-specific Kafka consumer with the start position configured.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
new file mode 100644
index 0000000..4733f6e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+
+import static org.apache.flink.table.descriptors.KafkaValidator.BOOTSTRAP_SERVERS;
+import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_TYPE_VALUE;
+import static org.apache.flink.table.descriptors.KafkaValidator.GROUP_ID;
+import static org.apache.flink.table.descriptors.KafkaValidator.JSON_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.KAFKA_VERSION;
+import static org.apache.flink.table.descriptors.KafkaValidator.OFFSET;
+import static org.apache.flink.table.descriptors.KafkaValidator.PARTITION;
+import static org.apache.flink.table.descriptors.KafkaValidator.SPECIFIC_OFFSETS;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_FIELD;
+import static org.apache.flink.table.descriptors.KafkaValidator.TABLE_JSON_MAPPING;
+import static org.apache.flink.table.descriptors.KafkaValidator.TOPIC;
+import static org.apache.flink.table.descriptors.KafkaValidator.ZOOKEEPER_CONNECT;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+
+/**
+ * Connector descriptor for the kafka message queue.
+ */
+public class Kafka extends ConnectorDescriptor {
+
+	private Optional<String> version = Optional.empty();
+	private Optional<String> bootstrapServers = Optional.empty();
+	private Optional<String> groupId = Optional.empty();
+	private Optional<String> topic = Optional.empty();
+	private Optional<String> zookeeperConnect = Optional.empty();
+	private Optional<Map<String, String>> tableJsonMapping = Optional.empty();
+
+	private Optional<StartupMode> startupMode = Optional.empty();
+	private Optional<Map<Integer, Long>> specificOffsets = Optional.empty();
+
+	public Kafka() {
+		super(CONNECTOR_TYPE_VALUE, 1);
+	}
+
+	/**
+	 * Sets the kafka version.
+	 *
+	 * @param version
+	 * Could be {@link KafkaValidator#KAFKA_VERSION_VALUE_011},
+	 * {@link KafkaValidator#KAFKA_VERSION_VALUE_010},
+	 * {@link KafkaValidator#KAFKA_VERSION_VALUE_09},
+	 * or {@link KafkaValidator#KAFKA_VERSION_VALUE_08}.
+	 */
+	public Kafka version(String version) {
+		this.version = Optional.of(version);
+		return this;
+	}
+
+	/**
+	 * Sets the bootstrap servers for kafka.
+	 */
+	public Kafka bootstrapServers(String bootstrapServers) {
+		this.bootstrapServers = Optional.of(bootstrapServers);
+		return this;
+	}
+
+	/**
+	 * Sets the consumer group id.
+	 */
+	public Kafka groupId(String groupId) {
+		this.groupId = Optional.of(groupId);
+		return this;
+	}
+
+	/**
+	 * Sets the topic to consume.
+	 */
+	public Kafka topic(String topic) {
+		this.topic = Optional.of(topic);
+		return this;
+	}
+
+	/**
+	 * Sets the startup mode.
+	 */
+	public Kafka startupMode(StartupMode startupMode) {
+		this.startupMode = Optional.of(startupMode);
+		return this;
+	}
+
+	/**
+	 * Sets the zookeeper hosts. Only required by kafka 0.8.
+	 */
+	public Kafka zookeeperConnect(String zookeeperConnect) {
+		this.zookeeperConnect = Optional.of(zookeeperConnect);
+		return this;
+	}
+
+	/**
+	 * Sets the consume offsets for the topic set with {@link Kafka#topic(String)}.
+	 * Only works in {@link StartupMode#SPECIFIC_OFFSETS} mode.
+	 */
+	public Kafka specificOffsets(Map<Integer, Long> specificOffsets) {
+		this.specificOffsets = Optional.of(specificOffsets);
+		return this;
+	}
+
+	/**
+	 * Sets the mapping from logical table schema to json schema.
+	 */
+	public Kafka tableJsonMapping(Map<String, String> jsonTableMapping) {
+		this.tableJsonMapping = Optional.of(jsonTableMapping);
+		return this;
+	}
+
+	@Override
+	public void addConnectorProperties(DescriptorProperties properties) {
+		if (version.isPresent()) {
+			properties.putString(KAFKA_VERSION, version.get());
+		}
+		if (bootstrapServers.isPresent()) {
+			properties.putString(BOOTSTRAP_SERVERS, bootstrapServers.get());
+		}
+		if (groupId.isPresent()) {
+			properties.putString(GROUP_ID, groupId.get());
+		}
+		if (topic.isPresent()) {
+			properties.putString(TOPIC, topic.get());
+		}
+		if (zookeeperConnect.isPresent()) {
+			properties.putString(ZOOKEEPER_CONNECT, zookeeperConnect.get());
+		}
+		if (startupMode.isPresent()) {
+			Map<String, String> map = KafkaValidator.normalizeStartupMode(startupMode.get());
+			for (Map.Entry<String, String> entry : map.entrySet()) {
+				properties.putString(entry.getKey(), entry.getValue());
+			}
+		}
+		if (specificOffsets.isPresent()) {
+			List<String> propertyKeys = new ArrayList<>();
+			propertyKeys.add(PARTITION);
+			propertyKeys.add(OFFSET);
+
+			List<Seq<String>> propertyValues = new ArrayList<>(specificOffsets.get().size());
+			for (Map.Entry<Integer, Long> entry : specificOffsets.get().entrySet()) {
+				List<String> partitionOffset = new ArrayList<>(2);
+				partitionOffset.add(entry.getKey().toString());
+				partitionOffset.add(entry.getValue().toString());
+				propertyValues.add(JavaConversions.asScalaBuffer(partitionOffset).toSeq());
+			}
+			properties.putIndexedFixedProperties(
+					SPECIFIC_OFFSETS,
+					JavaConversions.asScalaBuffer(propertyKeys).toSeq(),
+					JavaConversions.asScalaBuffer(propertyValues).toSeq()
+			);
+		}
+		if (tableJsonMapping.isPresent()) {
+			List<String> propertyKeys = new ArrayList<>();
+			propertyKeys.add(TABLE_FIELD);
+			propertyKeys.add(JSON_FIELD);
+
+			List<Seq<String>> mappingFields = new ArrayList<>(tableJsonMapping.get().size());
+			for (Map.Entry<String, String> entry : tableJsonMapping.get().entrySet()) {
+				List<String> singleMapping = new ArrayList<>(2);
+				singleMapping.add(entry.getKey());
+				singleMapping.add(entry.getValue());
+				mappingFields.add(JavaConversions.asScalaBuffer(singleMapping).toSeq());
+			}
+			properties.putIndexedFixedProperties(
+					TABLE_JSON_MAPPING,
+					JavaConversions.asScalaBuffer(propertyKeys).toSeq(),
+					JavaConversions.asScalaBuffer(mappingFields).toSeq()
+			);
+		}
+	}
+
+	@Override
+	public boolean needsFormat() {
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
new file mode 100644
index 0000000..a3ca22f
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.descriptors;
+
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.table.api.ValidationException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import scala.Function0;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.runtime.AbstractFunction0;
+import scala.runtime.BoxedUnit;
+
+
+/**
+ * The validator for {@link Kafka}.
+ */
+public class KafkaValidator extends ConnectorDescriptorValidator {
+	// fields
+	public static final String CONNECTOR_TYPE_VALUE = "kafka";
+	public static final String KAFKA_VERSION = "kafka.version";
+	public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
+	public static final String GROUP_ID = "group.id";
+	public static final String TOPIC = "topic";
+	public static final String STARTUP_MODE = "startup.mode";
+	public static final String SPECIFIC_OFFSETS = "specific.offsets";
+	public static final String TABLE_JSON_MAPPING = "table.json.mapping";
+
+	public static final String PARTITION = "partition";
+	public static final String OFFSET = "offset";
+
+	public static final String TABLE_FIELD = "table.field";
+	public static final String JSON_FIELD = "json.field";
+
+	public static final String ZOOKEEPER_CONNECT = "zookeeper.connect"; // only required for 0.8
+
+	// values
+	public static final String KAFKA_VERSION_VALUE_08 = "0.8";
+	public static final String KAFKA_VERSION_VALUE_09 = "0.9";
+	public static final String KAFKA_VERSION_VALUE_010 = "0.10";
+	public static final String KAFKA_VERSION_VALUE_011 = "0.11";
+
+	public static final String STARTUP_MODE_VALUE_EARLIEST = "earliest-offset";
+	public static final String STARTUP_MODE_VALUE_LATEST = "latest-offset";
+	public static final String STARTUP_MODE_VALUE_GROUP_OFFSETS = "group-offsets";
+	public static final String STARTUP_MODE_VALUE_SPECIFIC_OFFSETS = "specific-offsets";
+
+	// utils
+	public static Map<String, String> normalizeStartupMode(StartupMode startupMode) {
+		Map<String, String> mapPair = new HashMap<>();
+		switch (startupMode) {
+			case EARLIEST:
+				mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_EARLIEST);
+				break;
+			case LATEST:
+				mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_LATEST);
+				break;
+			case GROUP_OFFSETS:
+				mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_GROUP_OFFSETS);
+				break;
+			case SPECIFIC_OFFSETS:
+				mapPair.put(STARTUP_MODE, STARTUP_MODE_VALUE_SPECIFIC_OFFSETS);
+				break;
+		}
+		return mapPair;
+	}
+
+	@Override
+	public void validate(DescriptorProperties properties) {
+		super.validate(properties);
+
+		AbstractFunction0<BoxedUnit> emptyValidator = new AbstractFunction0<BoxedUnit>() {
+			@Override
+			public BoxedUnit apply() {
+				return BoxedUnit.UNIT;
+			}
+		};
+
+		properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE, false);
+
+		AbstractFunction0<BoxedUnit> version08Validator = new AbstractFunction0<BoxedUnit>() {
+			@Override
+			public BoxedUnit apply() {
+				properties.validateString(ZOOKEEPER_CONNECT, false, 0, Integer.MAX_VALUE);
+				return BoxedUnit.UNIT;
+			}
+		};
+
+		Map<String, Function0<BoxedUnit>> versionValidatorMap = new HashMap<>();
+		versionValidatorMap.put(KAFKA_VERSION_VALUE_08, version08Validator);
+		versionValidatorMap.put(KAFKA_VERSION_VALUE_09, emptyValidator);
+		versionValidatorMap.put(KAFKA_VERSION_VALUE_010, emptyValidator);
+		versionValidatorMap.put(KAFKA_VERSION_VALUE_011, emptyValidator);
+		properties.validateEnum(
+				KAFKA_VERSION,
+				false,
+				toScalaImmutableMap(versionValidatorMap)
+		);
+
+		properties.validateString(BOOTSTRAP_SERVERS, false, 1, Integer.MAX_VALUE);
+		properties.validateString(GROUP_ID, false, 1, Integer.MAX_VALUE);
+		properties.validateString(TOPIC, false, 1, Integer.MAX_VALUE);
+
+		AbstractFunction0<BoxedUnit> specificOffsetsValidator = new AbstractFunction0<BoxedUnit>() {
+			@Override
+			public BoxedUnit apply() {
+				Map<String, String> partitions = JavaConversions.mapAsJavaMap(
+						properties.getIndexedProperty(SPECIFIC_OFFSETS, PARTITION));
+
+				Map<String, String> offsets = JavaConversions.mapAsJavaMap(
+						properties.getIndexedProperty(SPECIFIC_OFFSETS, OFFSET));
+				if (partitions.isEmpty() || offsets.isEmpty()) {
+					throw new ValidationException("Offsets must be set for SPECIFIC_OFFSETS mode.");
+				}
+				for (int i = 0; i < partitions.size(); ++i) {
+					properties.validateInt(
+							SPECIFIC_OFFSETS + "." + i + "." + PARTITION,
+							false,
+							0,
+							Integer.MAX_VALUE);
+					properties.validateLong(
+							SPECIFIC_OFFSETS + "." + i + "." + OFFSET,
+							false,
+							0,
+							Long.MAX_VALUE);
+				}
+				return BoxedUnit.UNIT;
+			}
+		};
+		Map<String, Function0<BoxedUnit>> startupModeValidatorMap = new HashMap<>();
+		startupModeValidatorMap.put(STARTUP_MODE_VALUE_GROUP_OFFSETS, emptyValidator);
+		startupModeValidatorMap.put(STARTUP_MODE_VALUE_EARLIEST, emptyValidator);
+		startupModeValidatorMap.put(STARTUP_MODE_VALUE_LATEST, emptyValidator);
+		startupModeValidatorMap.put(STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, specificOffsetsValidator);
+
+		properties.validateEnum(STARTUP_MODE, true, toScalaImmutableMap(startupModeValidatorMap));
+		validateTableJsonMapping(properties);
+	}
+
+	private void validateTableJsonMapping(DescriptorProperties properties) {
+		Map<String, String> mappingTableField = JavaConversions.mapAsJavaMap(
+				properties.getIndexedProperty(TABLE_JSON_MAPPING, TABLE_FIELD));
+		Map<String, String> mappingJsonField = JavaConversions.mapAsJavaMap(
+				properties.getIndexedProperty(TABLE_JSON_MAPPING, JSON_FIELD));
+
+		if (mappingJsonField.size() != mappingJsonField.size()) {
+			throw new ValidationException("Table JSON mapping must be one to one.");
+		}
+
+		for (int i = 0; i < mappingTableField.size(); i++) {
+			properties.validateString(
+					TABLE_JSON_MAPPING + "." + i + "." + TABLE_FIELD,
+					false,
+					1,
+					Integer.MAX_VALUE);
+			properties.validateString(
+					TABLE_JSON_MAPPING + "." + i + "." + JSON_FIELD,
+					false,
+					1,
+					Integer.MAX_VALUE);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private <K, V> scala.collection.immutable.Map<K, V> toScalaImmutableMap(Map<K, V> javaMap) {
+		final java.util.List<scala.Tuple2<K, V>> list = new java.util.ArrayList<>(javaMap.size());
+		for (final java.util.Map.Entry<K, V> entry : javaMap.entrySet()) {
+			list.add(scala.Tuple2.apply(entry.getKey(), entry.getValue()));
+		}
+		final scala.collection.Seq<Tuple2<K, V>> seq =
+				scala.collection.JavaConverters.asScalaBufferConverter(list).asScala().toSeq();
+		return (scala.collection.immutable.Map<K, V>) scala.collection.immutable.Map$.MODULE$.apply(seq);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
new file mode 100644
index 0000000..964a624
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableFromDescriptorTestBase.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.descriptors.Kafka;
+
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@link KafkaJsonTableSourceFactory}.
+ */
+public abstract class KafkaJsonTableFromDescriptorTestBase {
+	private static final String GROUP_ID = "test-group";
+	private static final String BOOTSTRAP_SERVERS = "localhost:1234";
+	private static final String TOPIC = "test-topic";
+
+	protected abstract String versionForTest();
+
+	protected abstract KafkaJsonTableSource.Builder builderForTest();
+
+	protected abstract void extraSettings(KafkaTableSource.Builder builder, Kafka kafka);
+
+	private static StreamExecutionEnvironment env = Mockito.mock(StreamExecutionEnvironment.class);
+	private static StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+//	@Test
+//	public void buildJsonTableSourceTest() throws Exception {
+//		final URL url = getClass().getClassLoader().getResource("kafka-json-schema.json");
+//		Objects.requireNonNull(url);
+//		final String schema = FileUtils.readFileUtf8(new File(url.getFile()));
+//
+//		Map<String, String> tableJsonMapping = new HashMap<>();
+//		tableJsonMapping.put("fruit-name", "name");
+//		tableJsonMapping.put("fruit-count", "count");
+//		tableJsonMapping.put("event-time", "time");
+//
+//		// Construct with the builder.
+//		Properties props = new Properties();
+//		props.put("group.id", GROUP_ID);
+//		props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
+//
+//		Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
+//		specificOffsets.put(new KafkaTopicPartition(TOPIC, 0), 100L);
+//		specificOffsets.put(new KafkaTopicPartition(TOPIC, 1), 123L);
+//
+//		KafkaTableSource.Builder builder = builderForTest()
+//				.forJsonSchema(TableSchema.fromTypeInfo(JsonSchemaConverter.convert(schema)))
+//				.failOnMissingField(true)
+//				.withTableToJsonMapping(tableJsonMapping)
+//				.withKafkaProperties(props)
+//				.forTopic(TOPIC)
+//				.fromSpecificOffsets(specificOffsets)
+//				.withSchema(
+//						TableSchema.builder()
+//								.field("fruit-name", Types.STRING)
+//								.field("fruit-count", Types.INT)
+//								.field("event-time", Types.LONG)
+//								.field("proc-time", Types.SQL_TIMESTAMP)
+//								.build())
+//				.withProctimeAttribute("proc-time");
+//
+//		// Construct with the descriptor.
+//		Map<Integer, Long> offsets = new HashMap<>();
+//		offsets.put(0, 100L);
+//		offsets.put(1, 123L);
+//		Kafka kafka = new Kafka()
+//				.version(versionForTest())
+//				.groupId(GROUP_ID)
+//				.bootstrapServers(BOOTSTRAP_SERVERS)
+//				.topic(TOPIC)
+//				.startupMode(StartupMode.SPECIFIC_OFFSETS)
+//				.specificOffsets(offsets)
+//				.tableJsonMapping(tableJsonMapping);
+//		extraSettings(builder, kafka);
+//
+//		TableSource source = tEnv
+//				.from(kafka)
+//				.withFormat(
+//						new Json()
+//								.schema(schema)
+//								.failOnMissingField(true))
+//				.withSchema(new Schema()
+//						.field("fruit-name", Types.STRING)
+//						.field("fruit-count", Types.INT)
+//						.field("event-time", Types.LONG)
+//						.field("proc-time", Types.SQL_TIMESTAMP).proctime())
+//				.toTableSource();
+//
+//		Assert.assertEquals(builder.build(), source);
+//	}
+
+//	@Test(expected = TableException.class)
+//	public void buildJsonTableSourceFailTest() {
+//		tEnv.from(
+//				new Kafka()
+//						.version(versionForTest())
+//						.groupId(GROUP_ID)
+//						.bootstrapServers(BOOTSTRAP_SERVERS)
+//						.topic(TOPIC)
+//						.startupMode(StartupMode.SPECIFIC_OFFSETS)
+//						.specificOffsets(new HashMap<>()))
+//				.withFormat(
+//						new Json()
+//								.schema("")
+//								.failOnMissingField(true))
+//				.toTableSource();
+//	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
new file mode 100644
index 0000000..5167e5e
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-base/src/test/resources/kafka-json-schema.json
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+  "title": "Fruit",
+  "type": "object",
+  "properties": {
+    "name": {
+      "type": "string"
+    },
+    "count": {
+      "type": "integer"
+    },
+    "time": {
+      "description": "Age in years",
+      "type": "number"
+    }
+  },
+  "required": ["name", "count", "time"]
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1d26062d/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
index 534ef39..1e88d93 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableSchema.scala
@@ -92,6 +92,11 @@ class TableSchema(
   }
 
   /**
+    * Returns the number of columns.
+    */
+  def getColumnNum: Int = columnNames.length
+
+  /**
     * Returns all column names as an array.
     */
   def getColumnNames: Array[String] = columnNames


Mime
View raw message