flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From twal...@apache.org
Subject [2/2] flink git commit: [FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources
Date Thu, 23 Nov 2017 13:49:44 GMT
[FLINK-8118] [table] Allow to specify the offsets of KafkaTableSources

This closes #5056.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4083c70d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4083c70d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4083c70d

Branch: refs/heads/master
Commit: 4083c70dc88e0022daaab807d67b922d426fb533
Parents: 458c909
Author: Xingcan Cui <xingcanc@gmail.com>
Authored: Thu Nov 23 00:00:39 2017 +0800
Committer: twalthr <twalthr@apache.org>
Committed: Thu Nov 23 14:49:14 2017 +0100

----------------------------------------------------------------------
 docs/dev/table/sourceSinks.md                   |  50 +++++++-
 .../kafka/Kafka010AvroTableSource.java          |   2 +-
 .../kafka/Kafka010JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka010TableSource.java   |   2 +-
 .../kafka/Kafka011AvroTableSource.java          |   2 +-
 .../kafka/Kafka011JsonTableSource.java          |   2 +-
 .../connectors/kafka/Kafka011TableSource.java   |   2 +-
 .../kafka/Kafka08AvroTableSource.java           |   2 +-
 .../kafka/Kafka08JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka08TableSource.java    |   2 +-
 .../kafka/Kafka09AvroTableSource.java           |   2 +-
 .../kafka/Kafka09JsonTableSource.java           |   2 +-
 .../connectors/kafka/Kafka09TableSource.java    |   2 +-
 .../connectors/kafka/KafkaAvroTableSource.java  |   2 +-
 .../connectors/kafka/KafkaJsonTableSource.java  |   2 +-
 .../connectors/kafka/KafkaTableSource.java      | 126 ++++++++++++++++++-
 .../kafka/KafkaTableSourceTestBase.java         |  44 +++++++
 17 files changed, 230 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/docs/dev/table/sourceSinks.md
----------------------------------------------------------------------
diff --git a/docs/dev/table/sourceSinks.md b/docs/dev/table/sourceSinks.md
index 7387358..aaf23bc 100644
--- a/docs/dev/table/sourceSinks.md
+++ b/docs/dev/table/sourceSinks.md
@@ -145,7 +145,7 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
 </div>
 </div>
 
-* **Missing Field Handling** By default, a missing JSON field is set to `null`. You can enable
strict JSON parsing that will cancel the source (and query) if a field is missing.
+* **Missing Field Handling:** By default, a missing JSON field is set to `null`. You can
enable strict JSON parsing that will cancel the source (and query) if a field is missing.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -169,6 +169,30 @@ val source: TableSource[_] = Kafka010JsonTableSource.builder()
 </div>
 </div>
 
+* **Specify the start reading position:** By default, the table source will start reading
data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other
start positions via the builder's methods, which correspond to the configurations in section
[Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010JsonTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
 {% top %}
 
 ### KafkaAvroTableSource
@@ -265,6 +289,30 @@ val source: TableSource[_] = Kafka010AvroTableSource.builder()
 </div>
 </div>
 
+* **Specify the start reading position:** By default, the table source will start reading
data from the committed group offsets in Zookeeper or Kafka brokers. You can specify other
start positions via the builder's methods, which correspond to the configurations in section
[Kafka Consumers Start Position Configuration](../connectors/kafka.html#kafka-consumers-start-position-configuration).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+TableSource source = Kafka010AvroTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val source: TableSource[_] = Kafka010AvroTableSource.builder()
+  // ...
+  // start reading from the earliest offset
+  .fromEarliest()
+  .build()
+{% endhighlight %}
+</div>
+</div>
+
 {% top %}
 
 ### Configuring a Processing Time Attribute

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
index fbc58ea..660162a 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka010AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
index bbdb32f..5f9984e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka010JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
index bc675eb..379c562 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka010TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer010<>(topic, deserializationSchema, properties);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
index af3b5af..a9a109c 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka011AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
index 71158f6..cee7c61 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka011JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
index dbf980b..8c40318 100644
--- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka011TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer011<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
index 8f45881..9105c73 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka08AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
index b3b37c6..639093d 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka08JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
index 8270b78..3bb6a94 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka08TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer08<>(topic, deserializationSchema, properties);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
index 808be01..fb8496a 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java
@@ -94,7 +94,7 @@ public class Kafka09AvroTableSource extends KafkaAvroTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
index a699d65..ded23b0 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java
@@ -93,7 +93,7 @@ public class Kafka09JsonTableSource extends KafkaJsonTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
index 1d2c028..df15452 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java
@@ -61,7 +61,7 @@ public abstract class Kafka09TableSource extends KafkaTableSource {
 	}
 
 	@Override
-	FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties,
DeserializationSchema<Row> deserializationSchema) {
+	protected FlinkKafkaConsumerBase<Row> createKafkaConsumer(String topic, Properties
properties, DeserializationSchema<Row> deserializationSchema) {
 		return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
index 8cea36c..055b679 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java
@@ -44,7 +44,7 @@ import java.util.Properties;
  * A version-agnostic Kafka Avro {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
  */
 public abstract class KafkaAvroTableSource extends KafkaTableSource implements DefinedFieldMapping
{
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
index 9a6525c..6806673 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java
@@ -32,7 +32,7 @@ import java.util.Properties;
  * A version-agnostic Kafka JSON {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
  *
  * <p>The field names are used to parse the JSON file and so are the types.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
index 3291f7d..d0ee7de 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
+import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
@@ -37,6 +39,7 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import scala.Option;
@@ -45,7 +48,7 @@ import scala.Option;
  * A version-agnostic Kafka {@link StreamTableSource}.
  *
  * <p>The version-specific Kafka consumers need to extend this class and
- * override {@link #getKafkaConsumer(String, Properties, DeserializationSchema)}}.
+ * override {@link #createKafkaConsumer(String, Properties, DeserializationSchema)}.
  */
 public abstract class KafkaTableSource
 	implements StreamTableSource<Row>, DefinedProctimeAttribute, DefinedRowtimeAttributes
{
@@ -68,6 +71,12 @@ public abstract class KafkaTableSource
 	/** Descriptor for a rowtime attribute. */
 	private List<RowtimeAttributeDescriptor> rowtimeAttributeDescriptors;
 
+	/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
*/
+	private StartupMode startupMode;
+
+	/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*/
+	private Map<KafkaTopicPartition, Long> specificStartupOffsets;
+
 	/**
 	 * Creates a generic Kafka {@link StreamTableSource}.
 	 *
@@ -121,6 +130,37 @@ public abstract class KafkaTableSource
 		return rowtimeAttributeDescriptors;
 	}
 
+	/**
+	 * Returns a version-specific Kafka consumer with the start position configured.
+	 *
+	 * @param topic                 Kafka topic to consume.
+	 * @param properties            Properties for the Kafka consumer.
+	 * @param deserializationSchema Deserialization schema to use for Kafka records.
+	 * @return The version-specific Kafka consumer
+	 */
+	protected FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+			String topic,
+			Properties properties,
+			DeserializationSchema<Row> deserializationSchema) {
+		FlinkKafkaConsumerBase<Row> kafkaConsumer =
+				createKafkaConsumer(topic, properties, deserializationSchema);
+		switch (startupMode) {
+			case EARLIEST:
+				kafkaConsumer.setStartFromEarliest();
+				break;
+			case LATEST:
+				kafkaConsumer.setStartFromLatest();
+				break;
+			case GROUP_OFFSETS:
+				kafkaConsumer.setStartFromGroupOffsets();
+				break;
+			case SPECIFIC_OFFSETS:
+				kafkaConsumer.setStartFromSpecificOffsets(specificStartupOffsets);
+				break;
+		}
+		return kafkaConsumer;
+	}
+
 	//////// SETTERS FOR OPTIONAL PARAMETERS
 
 	/**
@@ -160,17 +200,35 @@ public abstract class KafkaTableSource
 		this.rowtimeAttributeDescriptors = rowtimeAttributeDescriptors;
 	}
 
+	/**
+	 * Sets the startup mode of the TableSource.
+	 *
+	 * @param startupMode The startup mode.
+	 */
+	protected void setStartupMode(StartupMode startupMode) {
+		this.startupMode = startupMode;
+	}
+
+	/**
+	 * Sets the startup offsets of the TableSource; only relevant when the startup mode is {@link
StartupMode#SPECIFIC_OFFSETS}.
+	 *
+	 * @param specificStartupOffsets The startup offsets for different partitions.
+	 */
+	protected void setSpecificStartupOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
{
+		this.specificStartupOffsets = specificStartupOffsets;
+	}
+
 	//////// ABSTRACT METHODS FOR SUBCLASSES
 
 	/**
-	 * Returns the version-specific Kafka consumer.
+	 * Creates a version-specific Kafka consumer.
 	 *
 	 * @param topic                 Kafka topic to consume.
 	 * @param properties            Properties for the Kafka consumer.
 	 * @param deserializationSchema Deserialization schema to use for Kafka records.
 	 * @return The version-specific Kafka consumer
 	 */
-	abstract FlinkKafkaConsumerBase<Row> getKafkaConsumer(
+	protected abstract FlinkKafkaConsumerBase<Row> createKafkaConsumer(
 			String topic,
 			Properties properties,
 			DeserializationSchema<Row> deserializationSchema);
@@ -201,6 +259,13 @@ public abstract class KafkaTableSource
 
 		private RowtimeAttributeDescriptor rowtimeAttributeDescriptor;
 
+		/** The startup mode for the contained consumer (default is {@link StartupMode#GROUP_OFFSETS}).
*/
+		private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
+
+		/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}.
*/
+		private Map<KafkaTopicPartition, Long> specificStartupOffsets = null;
+
+
 		/**
 		 * Sets the topic from which the table is read.
 		 *
@@ -310,6 +375,51 @@ public abstract class KafkaTableSource
 		}
 
 		/**
+		 * Configures the TableSource to start reading from the earliest offset for all partitions.
+		 *
+		 * @see FlinkKafkaConsumerBase#setStartFromEarliest()
+		 */
+		public B fromEarliest() {
+			this.startupMode = StartupMode.EARLIEST;
+			this.specificStartupOffsets = null;
+			return builder();
+		}
+
+		/**
+		 * Configures the TableSource to start reading from the latest offset for all partitions.
+		 *
+		 * @see FlinkKafkaConsumerBase#setStartFromLatest()
+		 */
+		public B fromLatest() {
+			this.startupMode = StartupMode.LATEST;
+			this.specificStartupOffsets = null;
+			return builder();
+		}
+
+		/**
+		 * Configures the TableSource to start reading from any committed group offsets found in
Zookeeper / Kafka brokers.
+		 *
+		 * @see FlinkKafkaConsumerBase#setStartFromGroupOffsets()
+		 */
+		public B fromGroupOffsets() {
+			this.startupMode = StartupMode.GROUP_OFFSETS;
+			this.specificStartupOffsets = null;
+			return builder();
+		}
+
+		/**
+		 * Configures the TableSource to start reading partitions from specific offsets, set independently
for each partition.
+		 *
+		 * @param specificStartupOffsets the specified offsets for partitions
+		 * @see FlinkKafkaConsumerBase#setStartFromSpecificOffsets(Map)
+		 */
+		public B fromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets)
{
+			this.startupMode = StartupMode.SPECIFIC_OFFSETS;
+			this.specificStartupOffsets = Preconditions.checkNotNull(specificStartupOffsets);
+			return builder();
+		}
+
+		/**
 		 * Returns the configured topic.
 		 *
 		 * @return the configured topic.
@@ -357,6 +467,16 @@ public abstract class KafkaTableSource
 			} else {
 				tableSource.setRowtimeAttributeDescriptors(Collections.singletonList(rowtimeAttributeDescriptor));
 			}
+			tableSource.setStartupMode(startupMode);
+			switch (startupMode) {
+				case EARLIEST:
+				case LATEST:
+				case GROUP_OFFSETS:
+					break;
+				case SPECIFIC_OFFSETS:
+					tableSource.setSpecificStartupOffsets(specificStartupOffsets);
+					break;
+			}
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4083c70d/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
index 7a882f4..64dac06 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.types.Row;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
@@ -44,6 +45,7 @@ import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Abstract test base for all Kafka table sources.
@@ -188,6 +190,48 @@ public abstract class KafkaTableSourceTestBase {
 		}
 	}
 
+	@Test
+	public void testKafkaTSSetConsumeOffsets() {
+		KafkaTableSource.Builder b = getBuilder();
+		configureBuilder(b);
+
+		// test the default behavior
+		KafkaTableSource source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
+
+		// test reading from earliest
+		b.fromEarliest();
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromEarliest();
+
+		// test reading from latest
+		b.fromLatest();
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromLatest();
+
+		// test reading from group offsets
+		b.fromGroupOffsets();
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromGroupOffsets();
+
+		// test reading from given offsets
+		b.fromSpecificOffsets(mock(Map.class));
+		source = spy(b.build());
+		when(source.createKafkaConsumer(TOPIC, PROPS, null))
+				.thenReturn(mock(getFlinkKafkaConsumer()));
+		verify(source.getKafkaConsumer(TOPIC, PROPS, null)).setStartFromSpecificOffsets(any(Map.class));
+	}
+
 	protected abstract KafkaTableSource.Builder getBuilder();
 
 	protected abstract Class<DeserializationSchema<Row>> getDeserializationSchema();


Mime
View raw message