kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read (#4434)
Date Tue, 30 Jan 2018 18:29:54 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 10312ed  KAFKA-6166: Streams configuration requires consumer. and producer. in order
to be read (#4434)
10312ed is described below

commit 10312ed805164643852a8fb21f705db81b952d8e
Author: Filipe Agapito <filipe.agapito@gmail.com>
AuthorDate: Tue Jan 30 18:18:51 2018 +0000

    KAFKA-6166: Streams configuration requires consumer. and producer. in order to be read
(#4434)
    
    * Implement method to get custom properties
    * Add custom properties to getConsumerConfigs and getProducerConfigs
    * Add tests
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 20 ++++++++++++++++
 .../apache/kafka/streams/StreamsConfigTest.java    | 28 ++++++++++++++++++----
 2 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 7fc3d5c..42e65df 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -644,6 +644,7 @@ public class StreamsConfig extends AbstractConfig {
         checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
         
         final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ?
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
+        consumerProps.putAll(getClientCustomProps());
         consumerProps.putAll(clientProvidedProps);
 
         // bootstrap.servers should be from StreamsConfig
@@ -758,6 +759,7 @@ public class StreamsConfig extends AbstractConfig {
 
         // generate producer configs from original properties and overridden maps
         final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES
: PRODUCER_DEFAULT_OVERRIDES);
+        props.putAll(getClientCustomProps());
         props.putAll(clientProvidedProps);
 
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
@@ -775,6 +777,24 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
+     * Get a map of custom configs by removing from the originals all the Streams, Consumer,
Producer, and AdminClient configs.
+     * Prefixed properties are also removed because they are already added by {@link #getClientPropsWithPrefix(String,
Set)}.
+     * This allows to set a custom property for a specific client alone if specified using
a prefix, or for all
+     * when no prefix is used.
+     *
+     * @return a map with the custom properties
+     */
+    private Map<String, Object> getClientCustomProps() {
+        final Map<String, Object> props = originals();
+        props.keySet().removeAll(CONFIG.names());
+        props.keySet().removeAll(ConsumerConfig.configNames());
+        props.keySet().removeAll(ProducerConfig.configNames());
+        props.keySet().removeAll(originalsWithPrefix(CONSUMER_PREFIX, false).keySet());
+        props.keySet().removeAll(originalsWithPrefix(PRODUCER_PREFIX, false).keySet());
+        return props;
+    }
+
+    /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG
key Serde
      * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead.
      *
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3774a8e..b217bf9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -63,7 +63,6 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        props.put("DUMMY", "dummy");
         props.put("key.deserializer.encoding", "UTF8");
         props.put("value.deserializer.encoding", "UTF-16");
         streamsConfig = new StreamsConfig(props);
@@ -87,7 +86,6 @@ public class StreamsConfigTest {
         final Map<String, Object> returnedProps = streamsConfig.getProducerConfigs(clientId);
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), clientId + "-producer");
         assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -98,7 +96,6 @@ public class StreamsConfigTest {
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), groupId);
         assertEquals(returnedProps.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "1000");
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -107,7 +104,6 @@ public class StreamsConfigTest {
         final Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs(clientId);
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), clientId + "-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
-        assertNull(returnedProps.get("DUMMY"));
     }
 
     @Test
@@ -227,7 +223,31 @@ public class StreamsConfigTest {
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
+    @Test
+    public void shouldForwardCustomConfigsWithNoPrefixToAllClients() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put("custom.property.host", "host");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null,
"groupId", "clientId");
+        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        assertEquals("host", consumerConfigs.get("custom.property.host"));
+        assertEquals("host", restoreConsumerConfigs.get("custom.property.host"));
+        assertEquals("host", producerConfigs.get("custom.property.host"));
+    }
 
+    @Test
+    public void shouldOverrideNonPrefixedCustomConfigsWithPrefixedConfigs() {
+        final StreamsConfig streamsConfig = new StreamsConfig(props);
+        props.put("custom.property.host", "host0");
+        props.put(consumerPrefix("custom.property.host"), "host1");
+        props.put(producerPrefix("custom.property.host"), "host2");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null,
"groupId", "clientId");
+        final Map<String, Object> restoreConsumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        assertEquals("host1", consumerConfigs.get("custom.property.host"));
+        assertEquals("host1", restoreConsumerConfigs.get("custom.property.host"));
+        assertEquals("host2", producerConfigs.get("custom.property.host"));
+    }
 
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message