From: sewen@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Subject: flink git commit: [FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls
Date: Fri, 30 Sep 2016 14:47:16 +0000 (UTC)

Repository: flink
Updated Branches:
  refs/heads/release-1.1 caa0fbb21 -> 90d77594f


[FLINK-4702] [kafka connector] Commit offsets to Kafka asynchronously and don't block on polls

Letting the Kafka commit block on polls means that 'notifyCheckpointComplete()' may take very long.
This is mostly relevant for low-throughput Kafka topics.
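For orientation before the diff: the sketch below distills the hand-off pattern this commit introduces. It uses the real Kafka 0.9 consumer API (poll, commitAsync, wakeup, OffsetCommitCallback), but the class name AsyncCommitSketch, the methods commitOffsets()/runFetchLoop() and the hard-coded poll timeout are illustrative only, not the actual Kafka09Fetcher code. The checkpoint thread merely publishes the offsets into an AtomicReference and wakes the consumer up; the fetch thread, which owns the non-thread-safe KafkaConsumer, picks them up between polls and commits asynchronously, so notifyCheckpointComplete() never waits for a poll to return.

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

/** Simplified sketch of the async offset-commit hand-off; not the actual fetcher code. */
class AsyncCommitSketch {

	/** Offsets published by the checkpoint thread, picked up by the fetch thread. */
	private final AtomicReference<Map<TopicPartition, OffsetAndMetadata>> nextOffsetsToCommit =
			new AtomicReference<>();

	/** True while a commitAsync() is outstanding, so commits do not pile up. */
	private volatile boolean commitInProgress;

	private volatile boolean running = true;

	private final KafkaConsumer<byte[], byte[]> consumer;

	AsyncCommitSketch(KafkaConsumer<byte[], byte[]> consumer) {
		this.consumer = consumer;
	}

	/** Called by the checkpoint thread; returns immediately instead of blocking in commitSync(). */
	void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsets) {
		nextOffsetsToCommit.set(offsets);
		consumer.wakeup(); // interrupt a blocking poll() so the commit is picked up promptly
	}

	/** Fetch loop, run by the single thread that owns the non-thread-safe KafkaConsumer. */
	void runFetchLoop() {
		while (running) {
			// commit whatever the checkpoint thread handed over, without blocking
			Map<TopicPartition, OffsetAndMetadata> toCommit = nextOffsetsToCommit.getAndSet(null);
			if (toCommit != null && !commitInProgress) {
				commitInProgress = true;
				consumer.commitAsync(toCommit, new OffsetCommitCallback() {
					@Override
					public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
						commitInProgress = false; // a failed commit only affects Kafka's view, not Flink's checkpoint
					}
				});
			}

			final ConsumerRecords<byte[], byte[]> records;
			try {
				records = consumer.poll(100L); // poll timeout; the real fetcher uses its configured pollTimeout
			}
			catch (WakeupException e) {
				continue; // woken up to pick up a pending commit (or to shut down)
			}
			// ... deserialize and emit 'records' downstream ...
		}
	}
}

The design keeps a single-writer rule for the consumer: only the fetch thread ever calls poll(), commitAsync() or close(), which is why the old consumerLock can be dropped in the patch below.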
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90d77594
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90d77594
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90d77594

Branch: refs/heads/release-1.1
Commit: 90d77594fffda1d8d15658d363c478ea6430514e
Parents: caa0fbb
Author: Stephan Ewen
Authored: Thu Sep 29 18:09:51 2016 +0200
Committer: Stephan Ewen
Committed: Fri Sep 30 12:39:53 2016 +0200

----------------------------------------------------------------------
 .../kafka/internal/Kafka09Fetcher.java       |  73 +++--
 .../connectors/kafka/Kafka09FetcherTest.java | 304 +++++++++++++++++++
 2 files changed, 355 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
index 9c861c9..1da2259 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java
@@ -37,6 +37,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.OffsetCommitCallback;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.TopicPartition;
@@ -50,6 +51,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;

 /**
  * A fetcher that fetches data from Kafka brokers via the Kafka 0.9 consumer API.
@@ -74,18 +76,24 @@ public class Kafka09Fetcher extends AbstractFetcher implem /** The maximum number of milliseconds to wait for a fetch batch */ private final long pollTimeout; - /** Mutex to guard against concurrent access to the non-threadsafe Kafka consumer */ - private final Object consumerLock = new Object(); + /** The next offsets that the main thread should commit */ + private final AtomicReference> nextOffsetsToCommit; + + /** The callback invoked by Kafka once an offset commit is complete */ + private final OffsetCommitCallback offsetCommitCallback; /** Reference to the Kafka consumer, once it is created */ private volatile KafkaConsumer consumer; - + /** Reference to the proxy, forwarding exceptions from the fetch thread to the main thread */ private volatile ExceptionProxy errorHandler; /** Flag to mark the main work loop as alive */ private volatile boolean running = true; + /** Flag tracking whether the latest commit request has completed */ + private volatile boolean commitInProgress; + // ------------------------------------------------------------------------ public Kafka09Fetcher( @@ -105,6 +113,8 @@ public class Kafka09Fetcher extends AbstractFetcher implem this.runtimeContext = runtimeContext; this.kafkaProperties = kafkaProperties; this.pollTimeout = pollTimeout; + this.nextOffsetsToCommit = new AtomicReference<>(); + this.offsetCommitCallback = new CommitCallback(); // if checkpointing is enabled, we are not automatically committing to Kafka. kafkaProperties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, @@ -203,19 +213,23 @@ public class Kafka09Fetcher extends AbstractFetcher implem // main fetch loop while (running) { + + // check if there is something to commit + final Map toCommit = nextOffsetsToCommit.getAndSet(null); + if (toCommit != null && !commitInProgress) { + // reset the work-to-be committed, so we don't repeatedly commit the same + // also record that a commit is already in progress + commitInProgress = true; + consumer.commitAsync(toCommit, offsetCommitCallback); + } + // get the next batch of records final ConsumerRecords records; - synchronized (consumerLock) { - try { - records = consumer.poll(pollTimeout); - } - catch (WakeupException we) { - if (running) { - throw we; - } else { - continue; - } - } + try { + records = consumer.poll(pollTimeout); + } + catch (WakeupException we) { + continue; } // get the records for each topic partition @@ -252,10 +266,9 @@ public class Kafka09Fetcher extends AbstractFetcher implem } finally { try { - synchronized (consumerLock) { - consumer.close(); - } - } catch (Throwable t) { + consumer.close(); + } + catch (Throwable t) { LOG.warn("Error while closing Kafka 0.9 consumer", t); } } @@ -283,10 +296,14 @@ public class Kafka09Fetcher extends AbstractFetcher implem } } - if (this.consumer != null) { - synchronized (consumerLock) { - this.consumer.commitSync(offsetsToCommit); - } + // record the work to be committed by the main consumer thread and make sure the consumer notices that + if (nextOffsetsToCommit.getAndSet(offsetsToCommit) != null) { + LOG.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " + + "Skipping commit of previous offsets because newer complete checkpoint offsets are available. 
" + + "This does not compromise Flink's checkpoint integrity."); + } + if (consumer != null) { + consumer.wakeup(); } } @@ -301,4 +318,16 @@ public class Kafka09Fetcher extends AbstractFetcher implem } return result; } + + private class CommitCallback implements OffsetCommitCallback { + + @Override + public void onComplete(Map offsets, Exception ex) { + commitInProgress = false; + + if (ex != null) { + LOG.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex); + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/90d77594/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java new file mode 100644 index 0000000..4fd6c9f --- /dev/null +++ b/flink-streaming-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09FetcherTest.java @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.core.testutils.MultiShotLatch; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; +import org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher; +import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; +import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; +import org.apache.flink.streaming.util.serialization.SimpleStringSchema; + +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; +import org.apache.kafka.common.TopicPartition; + +import org.junit.Test; +import org.junit.runner.RunWith; + +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.powermock.api.mockito.PowerMockito.doAnswer; +import static org.powermock.api.mockito.PowerMockito.mock; +import static org.powermock.api.mockito.PowerMockito.when; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +/** + * Unit tests for the {@link Kafka09Fetcher}. 
+ */ +@RunWith(PowerMockRunner.class) +@PrepareForTest(Kafka09Fetcher.class) +public class Kafka09FetcherTest { + + @Test + public void testCommitDoesNotBlock() throws Exception { + + // test data + final KafkaTopicPartition testPartition = new KafkaTopicPartition("test", 42); + final Map testCommitData = new HashMap<>(); + testCommitData.put(testPartition, 11L); + + // to synchronize when the consumer is in its blocking method + final OneShotLatch sync = new OneShotLatch(); + + // ----- the mock consumer with blocking poll calls ---- + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + sync.trigger(); + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext sourceContext = mock(SourceContext.class); + List topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + + final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( + sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + + // ----- run the fetcher ----- + + final AtomicReference error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // wait until the fetcher has reached the method of interest + sync.await(); + + // ----- trigger the offset commit ----- + + final AtomicReference commitError = new AtomicReference<>(); + final Thread committer = new Thread("committer runner") { + @Override + public void run() { + try { + fetcher.commitSpecificOffsetsToKafka(testCommitData); + } catch (Throwable t) { + commitError.set(t); + } + } + }; + committer.start(); + + // ----- ensure that the committer finishes in time ----- + committer.join(30000); + assertFalse("The committer did not finish in time", committer.isAlive()); + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable fetcherError = error.get(); + if (fetcherError != null) { + throw new Exception("Exception in the fetcher", fetcherError); + } + final Throwable committerError = commitError.get(); + if (committerError != null) { + throw new Exception("Exception in the committer", committerError); + } + } + + @Test + public void ensureOffsetsGetCommitted() throws Exception { + + // test data + final KafkaTopicPartition testPartition1 = new KafkaTopicPartition("test", 42); + final KafkaTopicPartition testPartition2 = new KafkaTopicPartition("another", 99); + + final Map testCommitData1 = new HashMap<>(); + testCommitData1.put(testPartition1, 11L); + testCommitData1.put(testPartition2, 18L); + + final Map 
testCommitData2 = new HashMap<>(); + testCommitData2.put(testPartition1, 19L); + testCommitData2.put(testPartition2, 28L); + + final BlockingQueue> commitStore = new LinkedBlockingQueue<>(); + + + // ----- the mock consumer with poll(), wakeup(), and commit(A)sync calls ---- + + final MultiShotLatch blockerLatch = new MultiShotLatch(); + + KafkaConsumer mockConsumer = mock(KafkaConsumer.class); + + when(mockConsumer.poll(anyLong())).thenAnswer(new Answer>() { + @Override + public ConsumerRecords answer(InvocationOnMock invocation) throws InterruptedException { + blockerLatch.await(); + return ConsumerRecords.empty(); + } + }); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + blockerLatch.trigger(); + return null; + } + }).when(mockConsumer).wakeup(); + + doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) { + @SuppressWarnings("unchecked") + Map offsets = + (Map) invocation.getArguments()[0]; + + OffsetCommitCallback callback = (OffsetCommitCallback) invocation.getArguments()[1]; + + commitStore.add(offsets); + callback.onComplete(offsets, null); + + return null; + } + }).when(mockConsumer).commitAsync( + Mockito.>any(), any(OffsetCommitCallback.class)); + + // make sure the fetcher creates the mock consumer + whenNew(KafkaConsumer.class).withAnyArguments().thenReturn(mockConsumer); + + // ----- create the test fetcher ----- + + @SuppressWarnings("unchecked") + SourceContext sourceContext = mock(SourceContext.class); + List topics = Collections.singletonList(new KafkaTopicPartition("test", 42)); + KeyedDeserializationSchema schema = new KeyedDeserializationSchemaWrapper<>(new SimpleStringSchema()); + StreamingRuntimeContext context = mock(StreamingRuntimeContext.class); + + final Kafka09Fetcher fetcher = new Kafka09Fetcher<>( + sourceContext, topics, null, null, context, schema, new Properties(), 0L, false); + + // ----- run the fetcher ----- + + final AtomicReference error = new AtomicReference<>(); + final Thread fetcherRunner = new Thread("fetcher runner") { + + @Override + public void run() { + try { + fetcher.runFetchLoop(); + } catch (Throwable t) { + error.set(t); + } + } + }; + fetcherRunner.start(); + + // ----- trigger the first offset commit ----- + + fetcher.commitSpecificOffsetsToKafka(testCommitData1); + Map result1 = commitStore.take(); + + for (Entry entry : result1.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(11L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(18L, entry.getValue().offset()); + } + } + + // ----- trigger the second offset commit ----- + + fetcher.commitSpecificOffsetsToKafka(testCommitData2); + Map result2 = commitStore.take(); + + for (Entry entry : result2.entrySet()) { + TopicPartition partition = entry.getKey(); + if (partition.topic().equals("test")) { + assertEquals(42, partition.partition()); + assertEquals(19L, entry.getValue().offset()); + } + else if (partition.topic().equals("another")) { + assertEquals(99, partition.partition()); + assertEquals(28L, entry.getValue().offset()); + } + } + + // ----- test done, wait till the fetcher is done for a clean shutdown ----- + fetcher.cancel(); + fetcherRunner.join(); + + // check that there were no errors in the fetcher + final Throwable caughtError = error.get(); + if (caughtError != null) { + throw new 
Exception("Exception in the fetcher", caughtError); + } + } +}