From commits-return-9423-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Thu Apr 26 22:17:00 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 297A8180648 for ; Thu, 26 Apr 2018 22:16:59 +0200 (CEST) Received: (qmail 34473 invoked by uid 500); 26 Apr 2018 20:16:58 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 34464 invoked by uid 99); 26 Apr 2018 20:16:58 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Apr 2018 20:16:58 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 717C5852EE; Thu, 26 Apr 2018 20:16:57 +0000 (UTC) Date: Thu, 26 Apr 2018 20:16:57 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: MINOR: Remove deprecated streams config (#4906) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152477381689.18064.2192618759115757413@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 885abbfcd40aab57acec278d976956f07be15090 X-Git-Newrev: 8725e3604b31d0bf822959fc6d870512411c7a05 X-Git-Rev: 8725e3604b31d0bf822959fc6d870512411c7a05 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 8725e36 MINOR: Remove deprecated streams config (#4906) 8725e36 is described below commit 8725e3604b31d0bf822959fc6d870512411c7a05 Author: Guozhang Wang AuthorDate: Thu Apr 26 13:16:51 2018 -0700 MINOR: Remove deprecated streams config (#4906) Removed the following: "zookeeper.connect", "key.serde", "value.serde", "timestamp.extractor" Reviewers: Bill Bejeck , John Roesler , Jason Gustafson --- .../org/apache/kafka/streams/StreamsConfig.java | 105 ++------------------- .../apache/kafka/streams/StreamsConfigTest.java | 47 +-------- 2 files changed, 8 insertions(+), 144 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index e46d6d0..23e69c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -279,14 +279,6 @@ public class StreamsConfig extends AbstractConfig { public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor"; private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface."; - /** - * {@code key.serde} - * @deprecated Use {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG} instead. - */ - @Deprecated - public static final String KEY_SERDE_CLASS_CONFIG = "key.serde"; - private static final String KEY_SERDE_CLASS_DOC = "Serializer / deserializer class for key that implements the org.apache.kafka.common.serialization.Serde interface. This config is deprecated, use " + DEFAULT_KEY_SERDE_CLASS_CONFIG + " instead"; - /** {@code metadata.max.age.ms} */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; @@ -363,40 +355,16 @@ public class StreamsConfig extends AbstractConfig { public static final String STATE_DIR_CONFIG = "state.dir"; private static final String STATE_DIR_DOC = "Directory location for state store."; - /** - * {@code timestamp.extractor} - * @deprecated Use {@link #DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG} instead. - */ - @Deprecated - public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; - private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface. This config is deprecated, use " + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + " instead"; - /** {@code upgrade.from} */ public static final String UPGRADE_FROM_CONFIG = "upgrade.from"; public static final String UPGRADE_FROM_DOC = "Allows upgrading from versions 0.10.0/0.10.1/0.10.2/0.11.0/1.0/1.1 to version 1.2 (or newer) in a backward compatible way. " + "When upgrading from 1.2 to a newer version it is not required to specify this config." + "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" + UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" + UPGRADE_FROM_11 + "\" (for upgrading from the corresponding old version)."; - /** - * {@code value.serde} - * @deprecated Use {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG} instead. - */ - @Deprecated - public static final String VALUE_SERDE_CLASS_CONFIG = "value.serde"; - private static final String VALUE_SERDE_CLASS_DOC = "Serializer / deserializer class for value that implements the org.apache.kafka.common.serialization.Serde interface. This config is deprecated, use " + DEFAULT_VALUE_SERDE_CLASS_CONFIG + " instead"; - /** {@code windowstore.changelog.additional.retention.ms} */ public static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG = "windowstore.changelog.additional.retention.ms"; private static final String WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC = "Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift. Default is 1 day"; - /** - * {@code zookeeper.connect} - * @deprecated Kafka Streams does not use Zookeeper anymore and this parameter will be ignored. - */ - @Deprecated - public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect"; - private static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string for Kafka topics management. This config is deprecated and will be ignored as Streams API does not use Zookeeper anymore."; - private static final String[] NON_CONFIGURABLE_CONSUMER_DEFAULT_CONFIGS = new String[] {ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG}; private static final String[] NON_CONFIGURABLE_CONSUMER_EOS_CONFIGS = new String[] {ConsumerConfig.ISOLATION_LEVEL_CONFIG}; private static final String[] NON_CONFIGURABLE_PRODUCER_EOS_CONFIGS = new String[] {ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, @@ -609,30 +577,7 @@ public class StreamsConfig extends AbstractConfig { Type.LONG, 24 * 60 * 60 * 1000L, Importance.LOW, - WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC) - - // @deprecated - - .define(KEY_SERDE_CLASS_CONFIG, - Type.CLASS, - null, - Importance.LOW, - KEY_SERDE_CLASS_DOC) - .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, - Type.CLASS, - null, - Importance.LOW, - TIMESTAMP_EXTRACTOR_CLASS_DOC) - .define(VALUE_SERDE_CLASS_CONFIG, - Type.CLASS, - null, - Importance.LOW, - VALUE_SERDE_CLASS_DOC) - .define(ZOOKEEPER_CONNECT_CONFIG, - Type.STRING, - "", - Importance.LOW, - ZOOKEEPER_CONNECT_DOC); + WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC); } // this is the list of configs for underlying clients @@ -773,8 +718,6 @@ public class StreamsConfig extends AbstractConfig { // bootstrap.servers should be from StreamsConfig consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, originals().get(BOOTSTRAP_SERVERS_CONFIG)); - // remove deprecate ZK config - consumerProps.remove(ZOOKEEPER_CONNECT_CONFIG); return consumerProps; } @@ -969,30 +912,15 @@ public class StreamsConfig extends AbstractConfig { } /** - * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #KEY_SERDE_CLASS_CONFIG key Serde - * class}. This method is deprecated. Use {@link #defaultKeySerde()} method instead. - * - * @return an configured instance of key Serde class - */ - @Deprecated - public Serde keySerde() { - return defaultKeySerde(); - } - - /** * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_KEY_SERDE_CLASS_CONFIG key Serde * class}. * * @return an configured instance of key Serde class */ public Serde defaultKeySerde() { - Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG); + Object keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); try { - Serde serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class); - if (serde == null) { - keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG); - serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class); - } + Serde serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), true); return serde; } catch (final Exception e) { @@ -1002,32 +930,16 @@ public class StreamsConfig extends AbstractConfig { } /** - * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #VALUE_SERDE_CLASS_CONFIG value - * Serde class}. This method is deprecated. Use {@link #defaultValueSerde()} instead. - * - * @return an configured instance of value Serde class - */ - @Deprecated - public Serde valueSerde() { - return defaultValueSerde(); - } - - /** * Return an {@link Serde#configure(Map, boolean) configured} instance of {@link #DEFAULT_VALUE_SERDE_CLASS_CONFIG value * Serde class}. * * @return an configured instance of value Serde class */ public Serde defaultValueSerde() { - Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG); + Object valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG); try { - Serde serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class); - if (serde == null) { - valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG); - serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class); - } + Serde serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class); serde.configure(originals(), false); - return serde; } catch (final Exception e) { throw new StreamsException( @@ -1035,13 +947,8 @@ public class StreamsConfig extends AbstractConfig { } } - public TimestampExtractor defaultTimestampExtractor() { - TimestampExtractor timestampExtractor = getConfiguredInstance(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); - if (timestampExtractor == null) { - return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); - } - return timestampExtractor; + return getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); } public DeserializationExceptionHandler defaultDeserializationExceptionHandler() { diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index ef5e5a8..ac82c04 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -454,20 +454,6 @@ public class StreamsConfigTest { assertThat(streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG), equalTo(commitIntervalMs)); } - @SuppressWarnings("deprecation") - @Test - public void shouldBeBackwardsCompatibleWithDeprecatedConfigs() { - final Properties props = minimalStreamsConfig(); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Double().getClass()); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Double().getClass()); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); - - final StreamsConfig config = new StreamsConfig(props); - assertTrue(config.defaultKeySerde() instanceof Serdes.DoubleSerde); - assertTrue(config.defaultValueSerde() instanceof Serdes.DoubleSerde); - assertTrue(config.defaultTimestampExtractor() instanceof MockTimestampExtractor); - } - @Test public void shouldUseNewConfigsWhenPresent() { final Properties props = minimalStreamsConfig(); @@ -489,28 +475,13 @@ public class StreamsConfigTest { assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp); } - @SuppressWarnings("deprecation") - @Test - public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() { - final Properties props = minimalStreamsConfig(); - props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); - final StreamsConfig config = new StreamsConfig(props); - try { - config.keySerde(); - fail("Test should throw a StreamsException"); - } catch (StreamsException e) { - assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); - } - } - - @SuppressWarnings("deprecation") @Test public void shouldSpecifyCorrectKeySerdeClassOnError() { final Properties props = minimalStreamsConfig(); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig config = new StreamsConfig(props); try { - config.keySerde(); + config.defaultKeySerde(); fail("Test should throw a StreamsException"); } catch (StreamsException e) { assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); @@ -519,26 +490,12 @@ public class StreamsConfigTest { @SuppressWarnings("deprecation") @Test - public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() { - final Properties props = minimalStreamsConfig(); - props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); - final StreamsConfig config = new StreamsConfig(props); - try { - config.valueSerde(); - fail("Test should throw a StreamsException"); - } catch (StreamsException e) { - assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); - } - } - - @SuppressWarnings("deprecation") - @Test public void shouldSpecifyCorrectValueSerdeClassOnError() { final Properties props = minimalStreamsConfig(); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class); final StreamsConfig config = new StreamsConfig(props); try { - config.valueSerde(); + config.defaultValueSerde(); fail("Test should throw a StreamsException"); } catch (StreamsException e) { assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde", e.getMessage()); -- To stop receiving notification emails like this one, please contact guozhang@apache.org.