From commits-return-8810-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Tue Jan 30 19:19:00 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 420A718061A for ; Tue, 30 Jan 2018 19:19:00 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 31B4D160C53; Tue, 30 Jan 2018 18:19:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 54499160C2A for ; Tue, 30 Jan 2018 19:18:59 +0100 (CET) Received: (qmail 89421 invoked by uid 500); 30 Jan 2018 18:18:58 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 89412 invoked by uid 99); 30 Jan 2018 18:18:58 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 30 Jan 2018 18:18:58 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6A7458205D; Tue, 30 Jan 2018 18:18:57 +0000 (UTC) Date: Tue, 30 Jan 2018 18:18:57 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <151733633701.18254.1273756771430695915@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 710aa678b7a6409296fb3c852a47c1876d8fa8e9 X-Git-Newrev: cb93d764613d801a1185989f09ce2d6b76009020 X-Git-Rev: cb93d764613d801a1185989f09ce2d6b76009020 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new cb93d76 KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434) cb93d76 is described below commit cb93d764613d801a1185989f09ce2d6b76009020 Author: Filipe Agapito AuthorDate: Tue Jan 30 18:18:51 2018 +0000 KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434) * Implement method to get custom properties * Add custom properties to getConsumerConfigs and getProducerConfigs * Add tests Reviewers: Matthias J. Sax , Guozhang Wang --- .../org/apache/kafka/streams/StreamsConfig.java | 28 ++++++++++++++++- .../apache/kafka/streams/StreamsConfigTest.java | 36 +++++++++++++++++++--- 2 files changed, 59 insertions(+), 5 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 0feb48d..1393223 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -693,6 +693,7 @@ public class StreamsConfig extends AbstractConfig { checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS); final Map consumerProps = new HashMap<>(eosEnabled ? CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES); + consumerProps.putAll(getClientCustomProps()); consumerProps.putAll(clientProvidedProps); // bootstrap.servers should be from StreamsConfig @@ -832,6 +833,7 @@ public class StreamsConfig extends AbstractConfig { // generate producer configs from original properties and overridden maps final Map props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES : PRODUCER_DEFAULT_OVERRIDES); + props.putAll(getClientCustomProps()); props.putAll(clientProvidedProps); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); @@ -847,7 +849,11 @@ public class StreamsConfig extends AbstractConfig { * @return Map of the admin client configuration. */ public Map getAdminConfigs(final String clientId) { - final Map props = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()); + final Map clientProvidedProps = getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames()); + + final Map props = new HashMap<>(); + props.putAll(getClientCustomProps()); + props.putAll(clientProvidedProps); // add client id with stream client id prefix props.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-admin"); @@ -863,6 +869,26 @@ public class StreamsConfig extends AbstractConfig { } /** + * Get a map of custom configs by removing from the originals all the Streams, Consumer, Producer, and AdminClient configs. + * Prefixed properties are also removed because they are already added by {@link #getClientPropsWithPrefix(String, Set)}. + * This allows to set a custom property for a specific client alone if specified using a prefix, or for all + * when no prefix is used. + * + * @return a map with the custom properties + */ + private Map getClientCustomProps() { + final Map props = originals(); + props.keySet().removeAll(CONFIG.names()); + props.keySet().removeAll(ConsumerConfig.configNames()); + props.keySet().removeAll(ProducerConfig.configNames()); + props.keySet().removeAll(AdminClientConfig.configNames()); + props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, false).keySet()); + props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, false).keySet()); + props.keySet().removeAll(originalsWithPrefix(ADMIN_CLIENT_PREFIX, false).keySet()); + return props; + } + + /** * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead. * diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 1d6b5a5..cc072d5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -45,6 +45,7 @@ import java.util.Properties; import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED; import static org.apache.kafka.common.requests.IsolationLevel.READ_UNCOMMITTED; import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; +import static org.apache.kafka.streams.StreamsConfig.adminClientPrefix; import static org.apache.kafka.streams.StreamsConfig.consumerPrefix; import static org.apache.kafka.streams.StreamsConfig.producerPrefix; import static org.apache.kafka.test.StreamsTestUtils.minimalStreamsConfig; @@ -66,7 +67,6 @@ public class StreamsConfigTest { props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); - props.put("DUMMY", "dummy"); props.put("key.deserializer.encoding", "UTF8"); props.put("value.deserializer.encoding", "UTF-16"); streamsConfig = new StreamsConfig(props); @@ -90,7 +90,6 @@ public class StreamsConfigTest { final Map returnedProps = streamsConfig.getProducerConfigs(clientId); assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer"); assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100"); - assertNull(returnedProps.get("DUMMY")); } @Test @@ -101,7 +100,6 @@ public class StreamsConfigTest { assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer"); assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId); assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000"); - assertNull(returnedProps.get("DUMMY")); } @Test @@ -148,7 +146,6 @@ public class StreamsConfigTest { final Map returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId); assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer"); assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG)); - assertNull(returnedProps.get("DUMMY")); } @Test @@ -265,6 +262,37 @@ public class StreamsConfigTest { } @Test + public void shouldForwardCustomConfigsWithNoPrefixToAllClients() { + final StreamsConfig streamsConfig = new StreamsConfig(props); + props.put("custom.property.host", "host"); + final Map consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId"); + final Map restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map adminConfigs = streamsConfig.getAdminConfigs("clientId"); + assertEquals("host", consumerConfigs.get("custom.property.host")); + assertEquals("host", restoreConsumerConfigs.get("custom.property.host")); + assertEquals("host", producerConfigs.get("custom.property.host")); + assertEquals("host", adminConfigs.get("custom.property.host")); + } + + @Test + public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() { + final StreamsConfig streamsConfig = new StreamsConfig(props); + props.put("custom.property.host", "host0"); + props.put(consumerPrefix("custom.property.host"), "host1"); + props.put(producerPrefix("custom.property.host"), "host2"); + props.put(adminClientPrefix("custom.property.host"), "host3"); + final Map consumerConfigs = streamsConfig.getConsumerConfigs("groupId", "clientId"); + final Map restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId"); + final Map producerConfigs = streamsConfig.getProducerConfigs("clientId"); + final Map adminConfigs = streamsConfig.getAdminConfigs("clientId"); + assertEquals("host1", consumerConfigs.get("custom.property.host")); + assertEquals("host1", restoreConsumerConfigs.get("custom.property.host")); + assertEquals("host2", producerConfigs.get("custom.property.host")); + assertEquals("host3", adminConfigs.get("custom.property.host")); + } + + @Test public void shouldSupportNonPrefixedAdminConfigs() { props.put(AdminClientConfig.RETRIES_CONFIG, 10); final StreamsConfig streamsConfig = new StreamsConfig(props); -- To stop receiving notification emails like this one, please contact guozhang@apache.org.