Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E9DB4200C44 for ; Mon, 27 Mar 2017 19:30:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E88E1160B85; Mon, 27 Mar 2017 17:30:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 8EEB5160B7B for ; Mon, 27 Mar 2017 19:30:46 +0200 (CEST) Received: (qmail 11457 invoked by uid 500); 27 Mar 2017 17:30:45 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 11448 invoked by uid 99); 27 Mar 2017 17:30:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Mar 2017 17:30:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 980E3DFE1E; Mon, 27 Mar 2017 17:30:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-4881: add internal leave.group.on.close config to consumer Date: Mon, 27 Mar 2017 17:30:45 +0000 (UTC) archived-at: Mon, 27 Mar 2017 17:30:48 -0000 Repository: kafka Updated Branches: refs/heads/trunk d27e09e60 -> 1abed91bd KAFKA-4881: add internal leave.group.on.close config to consumer Author: Damian Guy Reviewers: Ismael Juma, Guozhang Wang Closes #2650 from dguy/consumer-leave-group-config Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: 
http://git-wip-us.apache.org/repos/asf/kafka/commit/1abed91b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1abed91b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1abed91b Branch: refs/heads/trunk Commit: 1abed91bd2915226cf6320395e1a5877ea0705d7 Parents: d27e09e Author: Damian Guy Authored: Mon Mar 27 10:30:38 2017 -0700 Committer: Guozhang Wang Committed: Mon Mar 27 10:30:38 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/consumer/ConsumerConfig.java | 18 ++++- .../kafka/clients/consumer/KafkaConsumer.java | 31 ++++----- .../consumer/internals/AbstractCoordinator.java | 9 ++- .../consumer/internals/ConsumerCoordinator.java | 20 +++--- .../apache/kafka/common/config/ConfigDef.java | 33 ++++++++-- .../clients/consumer/KafkaConsumerTest.java | 3 +- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 69 +++++++++++--------- .../kafka/common/config/ConfigDefTest.java | 22 +++++++ .../runtime/distributed/WorkerCoordinator.java | 17 ++--- 10 files changed, 152 insertions(+), 72 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index 26a7d5d..abb8255 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -222,7 +222,18 @@ public class ConsumerConfig extends AbstractConfig { private static final String EXCLUDE_INTERNAL_TOPICS_DOC = "Whether records from internal topics (such as offsets) should be exposed to the consumer. 
" + "If set to true the only way to receive records from an internal topic is subscribing to it."; public static final boolean DEFAULT_EXCLUDE_INTERNAL_TOPICS = true; - + + /** + * internal.leave.group.on.close + * Whether or not the consumer should leave the group on close. If set to false then a rebalance + * won't occur until session.timeout.ms expires. + * + *

<p> + * Note: this is an internal configuration and could be changed in the future in a backward incompatible way + * + */ + static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close"; + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -390,7 +401,10 @@ public class ConsumerConfig extends AbstractConfig { DEFAULT_EXCLUDE_INTERNAL_TOPICS, Importance.MEDIUM, EXCLUDE_INTERNAL_TOPICS_DOC) - + .defineInternal(LEAVE_GROUP_ON_CLOSE_CONFIG, + Type.BOOLEAN, + true, + Importance.LOW) // security support .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 3894b5f..01d9463 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -680,21 +680,22 @@ public class KafkaConsumer implements Consumer { ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, PartitionAssignor.class); this.coordinator = new ConsumerCoordinator(this.client, - config.getString(ConsumerConfig.GROUP_ID_CONFIG), - config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), - config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), - config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), - assignors, - this.metadata, - this.subscriptions, - metrics, - metricGrpPrefix, - this.time, - retryBackoffMs, - config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), - config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - this.interceptors, - config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG)); + 
config.getString(ConsumerConfig.GROUP_ID_CONFIG), + config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG), + config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), + config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG), + assignors, + this.metadata, + this.subscriptions, + metrics, + metricGrpPrefix, + this.time, + retryBackoffMs, + config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), + config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), + this.interceptors, + config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), + config.getBoolean(ConsumerConfig.LEAVE_GROUP_ON_CLOSE_CONFIG)); this.fetcher = new Fetcher<>(this.client, config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG), config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG), http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index db665b6..1ebce76 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -102,6 +102,7 @@ public abstract class AbstractCoordinator implements Closeable { protected final int rebalanceTimeoutMs; private final int sessionTimeoutMs; + private final boolean leaveGroupOnClose; private final GroupCoordinatorMetrics sensors; private final Heartbeat heartbeat; protected final String groupId; @@ -130,12 +131,14 @@ public abstract class AbstractCoordinator implements Closeable { Metrics metrics, String metricGrpPrefix, Time time, - long retryBackoffMs) { + long retryBackoffMs, + boolean leaveGroupOnClose) { this.client = client; this.time = time; 
this.groupId = groupId; this.rebalanceTimeoutMs = rebalanceTimeoutMs; this.sessionTimeoutMs = sessionTimeoutMs; + this.leaveGroupOnClose = leaveGroupOnClose; this.heartbeat = new Heartbeat(sessionTimeoutMs, heartbeatIntervalMs, rebalanceTimeoutMs, retryBackoffMs); this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix); this.retryBackoffMs = retryBackoffMs; @@ -677,7 +680,9 @@ public abstract class AbstractCoordinator implements Closeable { // Synchronize after closing the heartbeat thread since heartbeat thread // needs this lock to complete and terminate after close flag is set. synchronized (this) { - maybeLeaveGroup(); + if (leaveGroupOnClose) { + maybeLeaveGroup(); + } // At this point, there may be pending commits (async commits or sync commits that were // interrupted using wakeup) and the leave group request which have been queued, but not http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index ba19146..fa2e31a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -108,16 +108,18 @@ public final class ConsumerCoordinator extends AbstractCoordinator { boolean autoCommitEnabled, int autoCommitIntervalMs, ConsumerInterceptors interceptors, - boolean excludeInternalTopics) { + boolean excludeInternalTopics, + final boolean leaveGroupOnClose) { super(client, - groupId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - metricGrpPrefix, - time, - retryBackoffMs); + groupId, + rebalanceTimeoutMs, + sessionTimeoutMs, 
+ heartbeatIntervalMs, + metrics, + metricGrpPrefix, + time, + retryBackoffMs, + leaveGroupOnClose); this.metadata = metadata; this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch()); this.subscriptions = subscriptions; http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 3396f63..c4eac78 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -132,7 +132,7 @@ public class ConfigDef { */ public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, List dependents, Recommender recommender) { - return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender)); + return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender, false)); } /** @@ -382,6 +382,19 @@ public class ConfigDef { } /** + * Define a new internal configuration. Internal configuration won't show up in the docs and aren't + * intended for general use. 
+ * @param name The name of the config parameter + * @param type The type of the config + * @param defaultValue The default value to use if this config isn't present + * @param importance + * @return This ConfigDef so you can chain calls + */ + public ConfigDef defineInternal(final String name, final Type type, final Object defaultValue, final Importance importance) { + return define(new ConfigKey(name, type, defaultValue, null, importance, "", "", -1, Width.NONE, name, Collections.emptyList(), null, true)); + } + + /** * Get the configuration keys * @return a map containing all configuration keys */ @@ -900,11 +913,13 @@ public class ConfigDef { public final String displayName; public final List dependents; public final Recommender recommender; + public final boolean internalConfig; public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, String group, int orderInGroup, Width width, String displayName, - List dependents, Recommender recommender) { + List dependents, Recommender recommender, + boolean internalConfig) { this.name = name; this.type = type; this.defaultValue = defaultValue == NO_DEFAULT_VALUE ? 
NO_DEFAULT_VALUE : parseType(name, defaultValue, type); @@ -919,6 +934,7 @@ public class ConfigDef { this.width = width; this.displayName = displayName; this.recommender = recommender; + this.internalConfig = internalConfig; } public boolean hasDefault() { @@ -971,6 +987,9 @@ public class ConfigDef { } b.append("\n"); for (ConfigKey key : configs) { + if (key.internalConfig) { + continue; + } b.append("\n"); // print column values for (String headerName : headers()) { @@ -991,6 +1010,9 @@ public class ConfigDef { public String toRst() { StringBuilder b = new StringBuilder(); for (ConfigKey key : sortedConfigs()) { + if (key.internalConfig) { + continue; + } getConfigKeyRst(key, b); b.append("\n"); } @@ -1006,6 +1028,9 @@ public class ConfigDef { String lastKeyGroupName = ""; for (ConfigKey key : sortedConfigs()) { + if (key.internalConfig) { + continue; + } if (key.group != null) { if (!lastKeyGroupName.equalsIgnoreCase(key.group)) { b.append(key.group).append("\n"); @@ -1114,8 +1139,8 @@ public class ConfigDef { key.width, key.displayName, embeddedDependents(keyPrefix, key.dependents), - embeddedRecommender(keyPrefix, key.recommender) - )); + embeddedRecommender(keyPrefix, key.recommender), + key.internalConfig)); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 7fb3552..ec60209 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -1516,7 +1516,8 @@ public class KafkaConsumerTest { autoCommitEnabled, autoCommitIntervalMs, interceptors, - excludeInternalTopics); + excludeInternalTopics, + true); Fetcher 
fetcher = new Fetcher<>( consumerClient, http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index 45ee29a..62801d0 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -492,7 +492,7 @@ public class AbstractCoordinatorTest { Metrics metrics, Time time) { super(client, GROUP_ID, REBALANCE_TIMEOUT_MS, SESSION_TIMEOUT_MS, HEARTBEAT_INTERVAL_MS, metrics, - METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS); + METRIC_GROUP_PREFIX, time, RETRY_BACKOFF_MS, false); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 8bf8a63..f0b289f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -129,7 +129,7 @@ public class ConsumerCoordinatorTest { this.partitionAssignor.clear(); client.setNode(node); - this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled); + this.coordinator = buildCoordinator(metrics, assignors, 
ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true); } @After @@ -861,7 +861,7 @@ public class ConsumerCoordinatorTest { @Test public void testIncludeInternalTopicsConfigOption() { - coordinator = buildCoordinator(new Metrics(), assignors, false, false); + coordinator = buildCoordinator(new Metrics(), assignors, false, false, true); subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.emptySet(), time.milliseconds()); @@ -955,7 +955,7 @@ public class ConsumerCoordinatorTest { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -980,7 +980,7 @@ public class ConsumerCoordinatorTest { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -1007,7 +1007,7 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignment() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); @@ -1025,7 +1025,7 @@ public class ConsumerCoordinatorTest { @Test public void testAutoCommitManualAssignmentCoordinatorUnknown() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true); 
subscriptions.assignFromUser(singleton(t1p)); subscriptions.seek(t1p, 100); @@ -1376,7 +1376,7 @@ public class ConsumerCoordinatorTest { try (Metrics metrics = new Metrics(time)) { ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.asList(roundRobin, range), - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true); List metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(roundRobin.name(), metadata.get(0).name()); @@ -1385,7 +1385,7 @@ public class ConsumerCoordinatorTest { try (Metrics metrics = new Metrics(time)) { ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.asList(range, roundRobin), - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true); List metadata = coordinator.metadata(); assertEquals(2, metadata.size()); assertEquals(range.name(), metadata.get(0).name()); @@ -1395,19 +1395,25 @@ public class ConsumerCoordinatorTest { @Test public void testCloseDynamicAssignment() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); gracefulCloseTest(coordinator, true); } @Test public void testCloseManualAssignment() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true, true); + gracefulCloseTest(coordinator, false); + } + + @Test + public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception { + final ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, false); gracefulCloseTest(coordinator, false); } @Test public void testCloseCoordinatorNotKnownManualAssignment() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(false, true); + ConsumerCoordinator 
coordinator = prepareCoordinatorForCloseTest(false, true, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); @@ -1415,14 +1421,14 @@ public class ConsumerCoordinatorTest { @Test public void testCloseCoordinatorNotKnownNoCommits() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); } @Test public void testCloseCoordinatorNotKnownWithCommits() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR_FOR_GROUP); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); @@ -1430,14 +1436,14 @@ public class ConsumerCoordinatorTest { @Test public void testCloseCoordinatorUnavailableNoCommits() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); closeVerifyTimeout(coordinator, 1000, 60000, 0, 0); } @Test public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 1000, 60000, 1000, 1000); @@ -1445,7 +1451,7 @@ public class ConsumerCoordinatorTest { @Test 
public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); makeCoordinatorUnknown(coordinator, Errors.GROUP_COORDINATOR_NOT_AVAILABLE); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); @@ -1453,20 +1459,20 @@ public class ConsumerCoordinatorTest { @Test public void testCloseNoResponseForCommit() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); } @Test public void testCloseNoResponseForLeaveGroup() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, false, true); closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000, 60000, 60000); } @Test public void testCloseNoWait() throws Exception { - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); time.sleep(autoCommitIntervalMs); closeVerifyTimeout(coordinator, 0, 60000, 0, 0); } @@ -1474,7 +1480,7 @@ public class ConsumerCoordinatorTest { @Test public void testHeartbeatThreadClose() throws Exception { groupId = "testCloseTimeoutWithHeartbeatThread"; - ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true); + ConsumerCoordinator coordinator = prepareCoordinatorForCloseTest(true, true, true); coordinator.ensureActiveGroup(); time.sleep(heartbeatIntervalMs + 100); Thread.yield(); // Give heartbeat thread a chance to attempt heartbeat @@ -1485,10 +1491,12 @@ public class ConsumerCoordinatorTest { 
assertFalse("Heartbeat thread active after close", threads[i].getName().contains(groupId)); } - private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean useGroupManagement, boolean autoCommit) { + private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGroupManagement, + final boolean autoCommit, + final boolean leaveGroup) { final String consumerId = "consumer"; ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, - ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit); + ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommit, leaveGroup); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); coordinator.ensureCoordinatorReady(); if (useGroupManagement) { @@ -1547,7 +1555,7 @@ public class ConsumerCoordinatorTest { } } - private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean dynamicAssignment) throws Exception { + private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLeaveGroup) throws Exception { final AtomicBoolean commitRequested = new AtomicBoolean(); final AtomicBoolean leaveGroupRequested = new AtomicBoolean(); client.prepareResponse(new MockClient.RequestMatcher() { @@ -1569,14 +1577,14 @@ public class ConsumerCoordinatorTest { coordinator.close(); assertTrue("Commit not requested", commitRequested.get()); - if (dynamicAssignment) - assertTrue("Leave group not requested", leaveGroupRequested.get()); + assertEquals("leaveGroupRequested should be " + shouldLeaveGroup, shouldLeaveGroup, leaveGroupRequested.get()); } - private ConsumerCoordinator buildCoordinator(Metrics metrics, - List assignors, - boolean excludeInternalTopics, - boolean autoCommitEnabled) { + private ConsumerCoordinator buildCoordinator(final Metrics metrics, + final List assignors, + final boolean excludeInternalTopics, + final boolean autoCommitEnabled, + final boolean leaveGroup) { return new ConsumerCoordinator( consumerClient, groupId, @@ -1593,7 +1601,8 @@ public 
class ConsumerCoordinatorTest { autoCommitEnabled, autoCommitIntervalMs, null, - excludeInternalTopics); + excludeInternalTopics, + leaveGroup); } private GroupCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index 10271ca..11e5803 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -35,6 +35,7 @@ import java.util.Properties; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; public class ConfigDefTest { @@ -323,6 +324,27 @@ public class ConfigDefTest { } } + @Test + public void testCanAddInternalConfig() throws Exception { + final String configName = "internal.config"; + final ConfigDef configDef = new ConfigDef().defineInternal(configName, Type.STRING, "", Importance.LOW); + final HashMap properties = new HashMap<>(); + properties.put(configName, "value"); + final List results = configDef.validate(properties); + final ConfigValue configValue = results.get(0); + assertEquals("value", configValue.value()); + assertEquals(configName, configValue.name()); + } + + @Test + public void testInternalConfigDoesntShowUpInDocs() throws Exception { + final String name = "my.config"; + final ConfigDef configDef = new ConfigDef().defineInternal(name, Type.STRING, "", Importance.LOW); + assertFalse(configDef.toHtmlTable().contains("my.config")); + assertFalse(configDef.toEnrichedRst().contains("my.config")); + assertFalse(configDef.toRst().contains("my.config")); + } + private 
static class IntegerRecommender implements ConfigDef.Recommender { private boolean hasParent; http://git-wip-us.apache.org/repos/asf/kafka/blob/1abed91b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 0dae54b..0252ae9 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -74,14 +74,15 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos ConfigBackingStore configStorage, WorkerRebalanceListener listener) { super(client, - groupId, - rebalanceTimeoutMs, - sessionTimeoutMs, - heartbeatIntervalMs, - metrics, - metricGrpPrefix, - time, - retryBackoffMs); + groupId, + rebalanceTimeoutMs, + sessionTimeoutMs, + heartbeatIntervalMs, + metrics, + metricGrpPrefix, + time, + retryBackoffMs, + true); this.restUrl = restUrl; this.configStorage = configStorage; this.assignmentSnapshot = null;