kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5096; Log invalid user configs and use defaults
Date Wed, 02 Aug 2017 13:11:28 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk edcefccfd -> b7684b47b


KAFKA-5096; Log invalid user configs and use defaults

Kafka Streams does not allow users to modify some consumer configurations.
Currently, it does not allow modifying the value of 'enable.auto.commit'.
Before this patch, an exception was thrown if the user set this property.
The following changes were made in this patch:
- Defined a new array 'NON_CONFIGURABLE_CONSUMER_CONFIGS' to hold the names
  of the configuration parameters that are not allowed to be modified
- Defined a new method 'checkIfUnexpectedUserSpecifiedConsumerConfig' to
  check if the user overwrote the values of any of the non-configurable configuration
  parameters. If so, log a warning message and fall back to the Streams default values
- Updated the javadoc to include the configuration parameters that cannot be
  modified by users.
- Updated the corresponding tests in StreamsConfigTest.java to reflect the changes
  made in StreamsConfig.java

Author: Mariam John <mariamj@us.ibm.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Eno Thereska <eno.thereska@gmail.com>,
Guozhang Wang <wangguoz@gmail.com>, Damian Guy <damian.guy@gmail.com>

Closes #2990 from johnma14/bug/kafka-5096


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b7684b47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b7684b47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b7684b47

Branch: refs/heads/trunk
Commit: b7684b47b80a2a7d604632bb325fde49e570480f
Parents: edcefcc
Author: Mariam John <mariamj@us.ibm.com>
Authored: Wed Aug 2 14:11:23 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Wed Aug 2 14:11:23 2017 +0100

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 103 ++++++++++++-------
 .../apache/kafka/streams/StreamsConfigTest.java |  48 +++++----
 2 files changed, 96 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b7684b47/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 000076e..6b0e245 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -52,7 +51,7 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
 
 /**
  * Configuration for a {@link KafkaStreams} instance.
- * Can also be use to configure the Kafka Streams internal {@link KafkaConsumer} and {@link
KafkaProducer}.
+ * Can also be used to configure the Kafka Streams internal {@link KafkaConsumer} and {@link
KafkaProducer}.
  * To avoid consumer/producer property conflicts, you should prefix those properties using
  * {@link #consumerPrefix(String)} and {@link #producerPrefix(String)}, respectively.
  * <p>
@@ -73,10 +72,25 @@ import static org.apache.kafka.common.requests.IsolationLevel.READ_COMMITTED;
  *
  * StreamsConfig streamsConfig = new StreamsConfig(streamsProperties);
  * }</pre>
- * Kafka Streams required to set at least properties {@link #APPLICATION_ID_CONFIG "application.id"}
and
- * {@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"}.
- * Furthermore, it is not allowed to enable {@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG
"enable.auto.commit"} that
- * is disabled by Kafka Streams by default.
+ * 
+ * Kafka Streams requires at least the following properties to be set:
+ * <ul>
+ *  <li>{@link #APPLICATION_ID_CONFIG "application.id"}</li>
+ *  <li>{@link #BOOTSTRAP_SERVERS_CONFIG "bootstrap.servers"}</li>
+ * </ul>
+ * 
+ * By default, Kafka Streams does not allow users to overwrite the following properties (Streams
setting shown in parentheses):
+ * <ul>
+ *   <li>{@link ConsumerConfig#ENABLE_AUTO_COMMIT_CONFIG "enable.auto.commit"} (false)
- Streams client will always disable/turn off auto committing</li>
+ * </ul>
+ * 
+ * If {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} is set to {@link #EXACTLY_ONCE
"exactly_once"}, Kafka Streams does not allow users to overwrite the following properties
(Streams setting shown in parentheses):
+ * <ul>
+ *   <li>{@link ConsumerConfig#ISOLATION_LEVEL_CONFIG "isolation.level"} (read_committed)
- Consumers will always read committed data only</li>
+ *   <li>{@link ProducerConfig#ENABLE_IDEMPOTENCE_CONFIG "enable.idempotence"} (true)
- Producer will always have idempotency enabled</li>
+ *   <li>{@link ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION "max.in.flight.requests.per.connection"}
(1) - Producer will always have one in-flight request per connection</li>
+ * </ul>
+ *
  *
  * @see KafkaStreams#KafkaStreams(org.apache.kafka.streams.Topology, StreamsConfig)
  * @see ConsumerConfig
@@ -285,6 +299,11 @@ public class StreamsConfig extends AbstractConfig {
     public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
     private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka
topics management. This config is deprecated and will be ignored as Streams API does not use
Zookeeper anymore.";
 
+    private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = new String[]
{ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
+    private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
+    private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = new String[] {ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
+                                                                                        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION};
+
     static {
         CONFIG = new ConfigDef()
 
@@ -617,22 +636,12 @@ public class StreamsConfig extends AbstractConfig {
         return configUpdates;
     }
 
-    private Map<String, Object> getCommonConsumerConfigs() throws ConfigException {
+    private Map<String, Object> getCommonConsumerConfigs() {
         final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(CONSUMER_PREFIX,
ConsumerConfig.configNames());
 
-        // disable auto commit and throw exception if there is user overridden values,
-        // this is necessary for streams commit semantics
-        if (clientProvidedProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
-            throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
-                + ", as the streams client will always turn off auto committing.");
-        }
-        if (eosEnabled) {
-            if (clientProvidedProps.containsKey(ConsumerConfig.ISOLATION_LEVEL_CONFIG)) {
-                throw new ConfigException("Unexpected user-specified consumer config " +
ConsumerConfig.ISOLATION_LEVEL_CONFIG
-                    + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE
+ "' consumers will always read committed data only.");
-            }
-        }
-
+        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS);
+        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS);
+        
         final Map<String, Object> consumerProps = new HashMap<>(eosEnabled ?
CONSUMER_EOS_OVERRIDES : CONSUMER_DEFAULT_OVERRIDES);
         consumerProps.putAll(clientProvidedProps);
 
@@ -643,7 +652,43 @@ public class StreamsConfig extends AbstractConfig {
 
         return consumerProps;
     }
+                 
+    private void checkIfUnexpectedUserSpecifiedConsumerConfig(final Map<String, Object>
clientProvidedProps, final String[] nonConfigurableConfigs) {
+        // Streams does not allow users to configure certain consumer/producer configurations,
for example,
+        // enable.auto.commit. In cases where user tries to override such non-configurable
+        // consumer/producer configurations, log a warning and remove the user defined value
from the Map.
+        // Thus the default values for these consumer/producer configurations that are suitable
for
+        // Streams will be used instead.
+        for (final String config: nonConfigurableConfigs) {
+            if (clientProvidedProps.containsKey(config)) {
+                final String eosMessage =  PROCESSING_GUARANTEE_CONFIG + " is set to " +
EXACTLY_ONCE + ". Hence, ";
+                final String nonConfigurableConfigMessage = "Unexpected user-specified %s
config: %s found. %sUser setting (%s) will be ignored and the Streams default setting (%s)
will be used ";
+
+                if (CONSUMER_DEFAULT_OVERRIDES.containsKey(config)) {
+                    if (!clientProvidedProps.get(config).equals(CONSUMER_DEFAULT_OVERRIDES.get(config)))
{
+                        log.warn(String.format(nonConfigurableConfigMessage, "consumer",
config, "", clientProvidedProps.get(config),  CONSUMER_DEFAULT_OVERRIDES.get(config)));
+                        clientProvidedProps.remove(config);
+                    }
+                } else if (eosEnabled) {
+                    if (CONSUMER_EOS_OVERRIDES.containsKey(config)) {
+                        if (!clientProvidedProps.get(config).equals(CONSUMER_EOS_OVERRIDES.get(config)))
{
+                            log.warn(String.format(nonConfigurableConfigMessage,
+                                    "consumer", config, eosMessage, clientProvidedProps.get(config),
CONSUMER_EOS_OVERRIDES.get(config)));
+                            clientProvidedProps.remove(config);
+                        }
+                    } else if (PRODUCER_EOS_OVERRIDES.containsKey(config)) {
+                        if (!clientProvidedProps.get(config).equals(PRODUCER_EOS_OVERRIDES.get(config)))
{
+                            log.warn(String.format(nonConfigurableConfigMessage,
+                                    "producer", config, eosMessage, clientProvidedProps.get(config),
PRODUCER_EOS_OVERRIDES.get(config)));
+                            clientProvidedProps.remove(config);
+                        }
+                    }
+                }
+            }
 
+        }
+    }
+    
     /**
      * Get the configs to the {@link KafkaConsumer consumer}.
      * Properties using the prefix {@link #CONSUMER_PREFIX} will be used in favor over their
non-prefixed versions
@@ -654,11 +699,10 @@ public class StreamsConfig extends AbstractConfig {
      * @param groupId      consumer groupId
      * @param clientId     clientId
      * @return Map of the consumer configuration.
-     * @throws ConfigException if {@code "enable.auto.commit"} was set to {@code false} by
the user
      */
     public Map<String, Object> getConsumerConfigs(final StreamThread streamThread,
                                                   final String groupId,
-                                                  final String clientId) throws ConfigException
{
+                                                  final String clientId) {
         final Map<String, Object> consumerProps = getCommonConsumerConfigs();
 
         // add client id with stream client id prefix, and group id
@@ -685,9 +729,8 @@ public class StreamsConfig extends AbstractConfig {
      *
      * @param clientId clientId
      * @return Map of the consumer configuration.
-     * @throws ConfigException if {@code "enable.auto.commit"} was set to {@code false} by
the user
      */
-    public Map<String, Object> getRestoreConsumerConfigs(final String clientId) throws
ConfigException {
+    public Map<String, Object> getRestoreConsumerConfigs(final String clientId) {
         final Map<String, Object> consumerProps = getCommonConsumerConfigs();
 
         // no need to set group id for a restore consumer
@@ -710,17 +753,7 @@ public class StreamsConfig extends AbstractConfig {
     public Map<String, Object> getProducerConfigs(final String clientId) {
         final Map<String, Object> clientProvidedProps = getClientPropsWithPrefix(PRODUCER_PREFIX,
ProducerConfig.configNames());
 
-        if (eosEnabled) {
-            if (clientProvidedProps.containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
{
-                throw new ConfigException("Unexpected user-specified consumer config " +
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG
-                    + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE
+ "' producer will always have idempotency enabled.");
-            }
-
-            if (clientProvidedProps.containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION))
{
-                throw new ConfigException("Unexpected user-specified consumer config " +
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION
-                    + "; because " + PROCESSING_GUARANTEE_CONFIG + " is set to '" + EXACTLY_ONCE
+ "' producer will always have only one in-flight request per connection.");
-            }
-        }
+        checkIfUnexpectedUserSpecifiedConsumerConfig(clientProvidedProps, NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS);
 
         // generate producer configs from original properties and overridden maps
         final Map<String, Object> props = new HashMap<>(eosEnabled ? PRODUCER_EOS_OVERRIDES
: PRODUCER_DEFAULT_OVERRIDES);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b7684b47/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 3bbd69e..69a44a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -271,18 +271,20 @@ public class StreamsConfigTest {
         assertEquals("10", consumerConfigs.get(ConsumerConfig.MAX_POLL_RECORDS_CONFIG));
     }
 
-    @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+    @Test
+    public void shouldResetToDefaultIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
"true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getConsumerConfigs(null, "a", "b");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null,
"a", "b");
+        assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
-    @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception
{
+    @Test
+    public void shouldResetToDefaultIfRestoreConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
"true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getRestoreConsumerConfigs("client");
+        final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("client");
+        assertEquals("false", consumerConfigs.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
     @Test
@@ -312,50 +314,56 @@ public class StreamsConfigTest {
         new StreamsConfig(props);
     }
 
-    @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
+    @Test
+    public void shouldResetToDefaultIfConsumerIsolationLevelIsOverriddenIfEosEnabled() {
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null,
"groupId", "clientId");
+        assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_COMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
     @Test
     public void shouldAllowSettingConsumerIsolationLevelIfEosDisabled() {
         props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT));
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
+        final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null,
"groupId", "clientrId");
+        assertThat((String) consumerConfigs.get(ConsumerConfig.ISOLATION_LEVEL_CONFIG), equalTo(READ_UNCOMMITTED.name().toLowerCase(Locale.ROOT)));
     }
 
 
-    @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfProducerEnableIdempotenceIsOverriddenIfEosEnabled()
{
+    @Test
+    public void shouldResetToDefaultIfProducerEnableIdempotenceIsOverriddenIfEosEnabled()
{
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        assertTrue((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG));
     }
 
     @Test
     public void shouldAllowSettingProducerEnableIdempotenceIfEosDisabled() {
-        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
+        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        assertThat((Boolean) producerConfigs.get(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG),
equalTo(false));
     }
 
-    @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled()
{
+    @Test
+    public void shouldResetToDefaultIfProducerMaxInFlightRequestPerConnectionsIsOverriddenIfEosEnabled()
{
         props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, EXACTLY_ONCE);
         props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(1));
     }
 
     @Test
     public void shouldAllowSettingProducerMaxInFlightRequestPerConnectionsWhenEosDisabled()
{
-        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "anyValue");
+        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 2);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
-        streamsConfig.getProducerConfigs("clientId");
+        final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
+        assertThat((Integer) producerConfigs.get(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
equalTo(2));
     }
 
     @Test


Mime
View raw message