flink-commits mailing list archives

From rmetz...@apache.org
Subject flink git commit: [FLINK-3440][Kafka 0.8] Commit also offsets retrieved from Kafka into the OffsetStore (ZK)
Date Tue, 23 Feb 2016 13:47:38 GMT
Repository: flink
Updated Branches:
  refs/heads/master de2163060 -> 8626971a3


[FLINK-3440][Kafka 0.8] Commit also offsets retrieved from Kafka into the OffsetStore (ZK)

This closes #1692


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8626971a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8626971a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8626971a

Branch: refs/heads/master
Commit: 8626971a36d37fc33a077e7cadcc3f9e6911e96e
Parents: de21630
Author: Robert Metzger <rmetzger@apache.org>
Authored: Mon Feb 22 14:48:37 2016 +0100
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Feb 23 14:46:54 2016 +0100

----------------------------------------------------------------------
 .../kafka/internals/LegacyFetcher.java          |   8 ++
 .../connectors/kafka/Kafka08ITCase.java         | 130 ++++++++++++++++++-
 2 files changed, 137 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8626971a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
index c96d35f..55c94cb 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/LegacyFetcher.java
@@ -711,6 +711,14 @@ public class LegacyFetcher implements Fetcher {
 			if (partitionsToGetOffsetsFor.size() > 0) {
 				getLastOffset(consumer, partitionsToGetOffsetsFor, getInvalidOffsetBehavior(config));
 				LOG.info("No prior offsets found for some partitions. Fetched the following start offsets {}", partitionsToGetOffsetsFor);
+
+				// also set the fetched offset in the offset state.
+				// we subtract 1 from the offset, since the state stores the last
+				// read offset, while nextOffsetToRead points to the next record
+				synchronized (sourceContext.getCheckpointLock()) {
+					for (FetchPartition fp : partitionsToGetOffsetsFor) {
+						this.offsetsState.put(new KafkaTopicPartition(fp.topic, fp.partition), fp.nextOffsetToRead - 1L);
+					}
+				}
 			}
 		}
 

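A note on the hunk above: the offset state stores, per partition, the offset of the last record that was read, while nextOffsetToRead points at the next record to fetch; that is why 1 is subtracted. A minimal, self-contained sketch of this convention (hypothetical class and values, not Flink API; that a restored fetcher resumes at the stored offset plus one is an assumption derived from this convention):

import java.util.HashMap;
import java.util.Map;

// Sketch of the "last read offset" convention (hypothetical, simplified).
public class OffsetConventionSketch {

	public static void main(String[] args) {
		Map<String, Long> offsetsState = new HashMap<>();

		// "latest" on a partition holding records 0..49 yields nextOffsetToRead = 50
		long nextOffsetToRead = 50L;

		// the state stores the last *read* offset, hence the subtraction
		offsetsState.put("testKafkaOffsetToZk-0", nextOffsetToRead - 1L);

		// a restored fetcher would resume at lastRead + 1 (assumption)
		long resumeFrom = offsetsState.get("testKafkaOffsetToZk-0") + 1L;

		// prints: committed=49, resumeFrom=50
		System.out.println("committed=" + offsetsState.get("testKafkaOffsetToZk-0") + ", resumeFrom=" + resumeFrom);
	}
}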
http://git-wip-us.apache.org/repos/asf/flink/blob/8626971a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
index 51f6d6b..feeb3de 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08ITCase.java
@@ -18,12 +18,24 @@
 package org.apache.flink.streaming.connectors.kafka;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.TreeCache;
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.runtime.client.JobCancellationException;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.internals.ZookeeperOffsetHandler;
+import org.apache.flink.streaming.connectors.kafka.testutils.DiscardingSink;
+import org.apache.flink.streaming.connectors.kafka.testutils.JobManagerCommunicationUtils;
+import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertTrue;
 
@@ -246,7 +258,7 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
 		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
 		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
-
+		curatorFramework.close();
 		LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3);
 
 		// ensure that the offset has been committed
@@ -257,4 +269,120 @@ public class Kafka08ITCase extends KafkaConsumerTestBase {
 
 		deleteTestTopic(topicName);
 	}
+
+	/**
+	 * This test ensures that when the consumers retrieve some start offset from Kafka (earliest, latest),
+	 * this offset is committed to ZooKeeper, even if some partitions are not read.
+	 *
+	 * Test:
+	 * - Create one topic with 3 partitions.
+	 * - Write a sequence of 50 messages into each partition.
+	 * - Start a consumer (parallelism 3) with auto.offset.reset='latest' and wait until it has committed offsets into ZK.
+	 * - Check that the offsets in ZK are set to 49, the last read offset, for all three partitions.
+	 *
+	 * See FLINK-3440 as well.
+	 */
+	@Test(timeout = 60000)
+	public void testKafkaOffsetRetrievalToZookeeper() throws Exception {
+		final String topicName = "testKafkaOffsetToZk";
+		final int parallelism = 3;
+		createTestTopic(topicName, parallelism, 1);
+		StreamExecutionEnvironment env1 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+		env1.getConfig().disableSysoutLogging();
+		env1.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env1.setParallelism(parallelism);
+
+		// write a sequence from 0 to 49 to each of the 3 partitions.
+		writeSequence(env1, topicName, 50, parallelism);
+
+		StreamExecutionEnvironment env2 = StreamExecutionEnvironment.createRemoteEnvironment("localhost", flinkPort);
+
+		// enable checkpointing
+		env2.getConfig().disableSysoutLogging();
+		env2.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+		env2.setParallelism(parallelism);
+		env2.enableCheckpointing(200);
+
+		Properties readProps = new Properties();
+		readProps.putAll(standardProps);
+		readProps.setProperty("auto.offset.reset", "latest");
+
+		DataStream<String> stream = env2.addSource(kafkaServer.getConsumer(topicName, new SimpleStringSchema(), readProps));
+		stream.addSink(new DiscardingSink<String>());
+
+		CuratorFramework curatorFramework = ((KafkaTestEnvironmentImpl)kafkaServer).createCuratorClient();
+		String consumerGroupDir = standardProps.getProperty("group.id");
+		TreeCache tc1 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/0");
+		TreeCache tc2 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/1");
+		TreeCache tc3 = new TreeCache(curatorFramework, "/consumers/" + consumerGroupDir + "/offsets/" + topicName + "/2");
+
+		// add a listener that waits until all three partitions have been updated in ZK
+		TreeCacheListener stopListener = new TreeCacheListener() {
+			AtomicInteger counter = new AtomicInteger(0);
+			@Override
+			public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
+				LOG.info("Updated {}", event);
+				if (event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) {
+					if (counter.incrementAndGet() == 3) {
+						// all three offset nodes have been updated; cancel the job
+						LOG.info("Cancelling job after all three ZK nodes were updated");
+						JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+					}
+				}
+			}
+		};
+		tc1.getListenable().addListener(stopListener);
+		tc1.start();
+		tc2.getListenable().addListener(stopListener);
+		tc2.start();
+		tc3.getListenable().addListener(stopListener);
+		tc3.start();
+
+		// the Curator listener does not always fire reliably; as a fallback, stop the job after 10 seconds
+		final Tuple1<Throwable> error = new Tuple1<>();
+		Thread canceller = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				try {
+					Thread.sleep(10_000L);
+					LOG.info("Cancelling job after 10 seconds");
+					JobManagerCommunicationUtils.cancelCurrentJob(flink.getLeaderGateway(timeout));
+				} catch (Throwable t) {
+					if (!(t instanceof InterruptedException)) {
+						error.f0 = t;
+					}
+				}
+			}
+		});
+		canceller.start();
+
+		try {
+			env2.execute("Idling Kafka source");
+		} catch (Throwable thr) {
+			if (!(thr.getCause() instanceof JobCancellationException)) {
+				throw thr;
+			}
+		}
+		tc1.close();
+		tc2.close();
+		tc3.close();
+
+		canceller.interrupt();
+		canceller.join();
+		if (error.f0 != null) {
+			throw new RuntimeException("Delayed cancelling thread had an error", error.f0);
+		}
+
+		// check that the offsets committed to ZK are correct
+		long o1 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 0);
+		long o2 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 1);
+		long o3 = ZookeeperOffsetHandler.getOffsetFromZooKeeper(curatorFramework, standardProps.getProperty("group.id"), topicName, 2);
+		Assert.assertEquals(49L, o1);
+		Assert.assertEquals(49L, o2);
+		Assert.assertEquals(49L, o3);
+
+		curatorFramework.close();
+	}
 }
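Aside on the test above: the TreeCache paths follow the Kafka 0.8 high-level-consumer layout in ZooKeeper, /consumers/<group.id>/offsets/<topic>/<partition>, where each znode holds the committed offset as a plain string. A minimal Curator sketch for reading one such node directly (connection string, group id, and topic here are illustrative assumptions, not values from the commit):

import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Hypothetical standalone reader for a Kafka 0.8-style offset znode.
public class ZkOffsetReadSketch {

	public static void main(String[] args) throws Exception {
		CuratorFramework client = CuratorFrameworkFactory.newClient(
				"localhost:2181", new ExponentialBackoffRetry(1000, 3));
		client.start();
		try {
			// group id and topic are made-up example values
			String path = "/consumers/flink-tests/offsets/testKafkaOffsetToZk/0";
			byte[] data = client.getData().forPath(path);
			// the offset is stored as a plain string, e.g. "49"
			System.out.println("committed offset: " + Long.parseLong(new String(data, StandardCharsets.UTF_8)));
		} finally {
			client.close();
		}
	}
}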

