flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tzuli...@apache.org
Subject [2/4] flink git commit: [hotfix] [kafka, tests] Commit read offsets in Kafka integration tests
Date Mon, 24 Jul 2017 03:55:15 GMT
[hotfix] [kafka, tests] Commit read offsets in Kafka integration tests

Previously offsets were not committed, so the same records could be read more than once.
It was not a big issue, because so far these methods were used only for at-least-once tests.

This closes #4310.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c65afdbe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c65afdbe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c65afdbe

Branch: refs/heads/master
Commit: c65afdbe6a711c8df0677c11285dc317dac3046e
Parents: 58b5374
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Wed Jul 12 14:01:49 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Committed: Mon Jul 24 10:25:21 2017 +0800

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         | 33 +++++++++++---------
 .../kafka/KafkaTestEnvironmentImpl.java         | 24 +++++++-------
 .../kafka/KafkaTestEnvironmentImpl.java         | 26 ++++++++-------
 3 files changed, 45 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c65afdbe/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index f437060..d3b45a9 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -127,23 +127,26 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties
properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
-
-		while (true) {
-			boolean processedAtLeastOneRecord = false;
-
-			// wait for new records with timeout and break the loop if we didn't get any
-			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-			while (iterator.hasNext()) {
-				ConsumerRecord<K, V> record = iterator.next();
-				result.add(record);
-				processedAtLeastOneRecord = true;
-			}
 
-			if (!processedAtLeastOneRecord) {
-				break;
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
+
+				// wait for new records with timeout and break the loop if we didn't get any
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
+
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
 			}
+			consumer.commitSync();
 		}
 
 		return UnmodifiableList.decorate(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/c65afdbe/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 4791716..ab976e1 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -118,19 +118,21 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties
properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.subscribe(new TopicPartition(topic, partition));
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.subscribe(new TopicPartition(topic, partition));
 
-		while (true) {
-			Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
-			if (topics == null || !topics.containsKey(topic)) {
-				break;
-			}
-			List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
-			result.addAll(records);
-			if (records.size() == 0) {
-				break;
+			while (true) {
+				Map<String, ConsumerRecords<K, V>> topics = consumer.poll(timeout);
+				if (topics == null || !topics.containsKey(topic)) {
+					break;
+				}
+				List<ConsumerRecord<K, V>> records = topics.get(topic).records(partition);
+				result.addAll(records);
+				if (records.size() == 0) {
+					break;
+				}
 			}
+			consumer.commit(true);
 		}
 
 		return UnmodifiableList.decorate(result);

http://git-wip-us.apache.org/repos/asf/flink/blob/c65afdbe/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index 710d917..df95420 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -110,22 +110,24 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 	@Override
 	public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties
properties, String topic, int partition, long timeout) {
 		List<ConsumerRecord<K, V>> result = new ArrayList<>();
-		KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties);
-		consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
+		try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
+			consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
 
-		while (true) {
-			boolean processedAtLeastOneRecord = false;
+			while (true) {
+				boolean processedAtLeastOneRecord = false;
 
-			Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
-			while (iterator.hasNext()) {
-				ConsumerRecord<K, V> record = iterator.next();
-				result.add(record);
-				processedAtLeastOneRecord = true;
-			}
+				Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
+				while (iterator.hasNext()) {
+					ConsumerRecord<K, V> record = iterator.next();
+					result.add(record);
+					processedAtLeastOneRecord = true;
+				}
 
-			if (!processedAtLeastOneRecord) {
-				break;
+				if (!processedAtLeastOneRecord) {
+					break;
+				}
 			}
+			consumer.commitSync();
 		}
 
 		return UnmodifiableList.decorate(result);


Mime
View raw message