kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Remove deprecated streams config (#4906)
Date Thu, 26 Apr 2018 20:16:57 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8725e36  MINOR: Remove deprecated streams config (#4906)
8725e36 is described below

commit 8725e3604b31d0bf822959fc6d870512411c7a05
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu Apr 26 13:16:51 2018 -0700

    MINOR: Remove deprecated streams config (#4906)
    
    Removed the following deprecated configs: "zookeeper.connect", "key.serde", "value.serde", "timestamp.extractor", along with the deprecated keySerde() and valueSerde() accessors.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>, Jason Gustafson <jason@confluent.io>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 105 ++-------------------
 .../apache/kafka/streams/StreamsConfigTest.java    |  47 +--------
 2 files changed, 8 insertions(+), 144 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index e46d6d0..23e69c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -279,14 +279,6 @@ public class StreamsConfig extends AbstractConfig {
     public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor";
     private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp
extractor class that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code>
interface.";
 
-    /**
-     * {@code key.serde}
-     * @deprecated Use {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG} instead.
-     */
-    @Deprecated
-    public static final String KEY_SERDE_CLASS_CONFIG = "key.serde";
-    private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for
key that implements the <code>org.apache.kafka.common.serialization.Serde</code>
interface. This config is deprecated, use <code>" + DEFAULT_KEY_SERDE_CLASS_CONFIG +
"</code> instead";
-
     /** {@code metadata.max.age.ms} */
     public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
 
@@ -363,40 +355,16 @@ public class StreamsConfig extends AbstractConfig {
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
 
-    /**
-     * {@code timestamp.extractor}
-     * @deprecated Use {@link #DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG} instead.
-     */
-    @Deprecated
-    public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor";
-    private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class
that implements the <code>org.apache.kafka.streams.processor.TimestampExtractor</code>
interface. This config is deprecated, use <code>" + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG
+ "</code> instead";
-
     /** {@code upgrade.from} */
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
     public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1
to version 1.2 (or newer) in a backward compatible way. " +
         "When upgrading from 1.2 to a newer version it is not required to specify this config."
+
         "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101
+ "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10
+ "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version).";
 
-    /**
-     * {@code value.serde}
-     * @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead.
-     */
-    @Deprecated
-    public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde";
-    private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class
for value that implements the <code>org.apache.kafka.common.serialization.Serde</code>
interface. This config is deprecated, use <code>" + DEFAULT_VALUE_SERDE_CLASS_CONFIG
+ "</code> instead";
-
     /** {@code windowstore.changelog.additional.retention.ms} */
     public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms";
     private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added
to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for
clock drift. Default is 1 day";
 
-    /**
-     * {@code zookeeper.connect}
-     * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be
ignored.
-     */
-    @Deprecated
-    public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
-    private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka
topics management. This config is deprecated and will be ignored as Streams API does not use
Zookeeper anymore.";
-
     private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = new String[]
{ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG};
     private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG};
     private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = new String[] {ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
@@ -609,30 +577,7 @@ public class StreamsConfig extends AbstractConfig {
                     Type.LONG,
                     24 * 60 * 60 * 1000L,
                     Importance.LOW,
-                    WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
-
-            // @deprecated
-
-            .define(KEY_SERDE_CLASS_CONFIG,
-                    Type.CLASS,
-                    null,
-                    Importance.LOW,
-                    KEY_SERDE_CLASS_DOC)
-            .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                    Type.CLASS,
-                    null,
-                    Importance.LOW,
-                    TIMESTAMP_EXTRACTOR_CLASS_DOC)
-            .define(VALUE_SERDE_CLASS_CONFIG,
-                    Type.CLASS,
-                    null,
-                    Importance.LOW,
-                    VALUE_SERDE_CLASS_DOC)
-            .define(ZOOKEEPER_CONNECT_CONFIG,
-                    Type.STRING,
-                    "",
-                    Importance.LOW,
-                    ZOOKEEPER_CONNECT_DOC);
+                    WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC);
     }
 
     // this is the list of configs for underlying clients
@@ -773,8 +718,6 @@ public class StreamsConfig extends AbstractConfig {
 
         // bootstrap.servers should be from StreamsConfig
         consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG));
-        // remove deprecate ZK config
-        consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG);
 
         return consumerProps;
     }
@@ -969,30 +912,15 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
-     * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG
key Serde
-     * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead.
-     *
-     * @return an configured instance of key Serde class
-     */
-    @Deprecated
-    public Serde keySerde() {
-        return defaultKeySerde();
-    }
-
-    /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG
key Serde
      * class}.
      *
      * @return an configured instance of key Serde class
      */
     public Serde defaultKeySerde() {
-        Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG);
+        Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
         try {
-            Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
-            if (serde == null) {
-                keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
-                serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
-            }
+            Serde<?> serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serde.class);
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
@@ -1002,32 +930,16 @@ public class StreamsConfig extends AbstractConfig {
     }
 
     /**
-     * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #VALUE_SERDE_CLASS_CONFIG
value
-     * Serde class}. This method is deprecated. Use {@link #defaultValueSerde()} instead.
-     *
-     * @return an configured instance of value Serde class
-     */
-    @Deprecated
-    public Serde valueSerde() {
-        return defaultValueSerde();
-    }
-
-    /**
      * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG
value
      * Serde class}.
      *
      * @return an configured instance of value Serde class
      */
     public Serde defaultValueSerde() {
-        Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG);
+        Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
         try {
-            Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
-            if (serde == null) {
-                valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
-                serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
-            }
+            Serde<?> serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serde.class);
             serde.configure(originals(), false);
-
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(
@@ -1035,13 +947,8 @@ public class StreamsConfig extends AbstractConfig {
         }
     }
 
-
     public TimestampExtractor defaultTimestampExtractor() {
-        TimestampExtractor timestampExtractor = getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
TimestampExtractor.class);
-        if (timestampExtractor == null) {
-            return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
-        }
-        return timestampExtractor;
+        return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class);
     }
 
     public DeserializationExceptionHandler defaultDeserializationExceptionHandler() {
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index ef5e5a8..ac82c04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -454,20 +454,6 @@ public class StreamsConfigTest {
         assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs));
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() {
-        final Properties props = minimalStreamsConfig();
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass());
-        props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class);
-
-        final StreamsConfig config = new StreamsConfig(props);
-        assertTrue(config.defaultKeySerde() instanceof Serdes.DoubleSerde);
-        assertTrue(config.defaultValueSerde() instanceof Serdes.DoubleSerde);
-        assertTrue(config.defaultTimestampExtractor() instanceof MockTimestampExtractor);
-    }
-
     @Test
     public void shouldUseNewConfigsWhenPresent() {
         final Properties props = minimalStreamsConfig();
@@ -489,28 +475,13 @@ public class StreamsConfigTest {
         assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
     }
 
-    @SuppressWarnings("deprecation")
-    @Test
-    public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
-        final Properties props = minimalStreamsConfig();
-        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
-        final StreamsConfig config = new StreamsConfig(props);
-        try {
-            config.keySerde();
-            fail("Test should throw a StreamsException");
-        } catch (StreamsException e) {
-            assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());
-        }
-    }
-
-    @SuppressWarnings("deprecation")
     @Test
     public void shouldSpecifyCorrectKeySerdeClassOnError() {
         final Properties props = minimalStreamsConfig();
         props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig config = new StreamsConfig(props);
         try {
-            config.keySerde();
+            config.defaultKeySerde();
             fail("Test should throw a StreamsException");
         } catch (StreamsException e) {
             assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());
@@ -519,26 +490,12 @@ public class StreamsConfigTest {
 
     @SuppressWarnings("deprecation")
     @Test
-    public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
-        final Properties props = minimalStreamsConfig();
-        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
-        final StreamsConfig config = new StreamsConfig(props);
-        try {
-            config.valueSerde();
-            fail("Test should throw a StreamsException");
-        } catch (StreamsException e) {
-            assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    @Test
     public void shouldSpecifyCorrectValueSerdeClassOnError() {
         final Properties props = minimalStreamsConfig();
         props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig config = new StreamsConfig(props);
         try {
-            config.valueSerde();
+            config.defaultValueSerde();
             fail("Test should throw a StreamsException");
         } catch (StreamsException e) {
             assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message