kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: updated configs to use one try/catch for serdes
Date Wed, 26 Jul 2017 19:58:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 23c51495d -> c1f583e67


MINOR: updated configs to use one try/catch for serdes

Removed `try/catch` from the `keySerde` and `valueSerde` methods so that only the `try/catch` blocks
in `defaultKeySerde` and `defaultValueSerde` perform error handling, resulting in the correct error
message.

Author: Bill Bejeck <bill@confluent.io>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3568 from bbejeck/MINOR_ensure_correct_error_messages_for_configs

(cherry picked from commit f8498ec9e27ca0f08e3791d7a19ad8c6a97e210f)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c1f583e6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c1f583e6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c1f583e6

Branch: refs/heads/0.11.0
Commit: c1f583e671ace7f96f247ddba5aad94078f60d3b
Parents: 23c5149
Author: Bill Bejeck <bill@confluent.io>
Authored: Wed Jul 26 12:58:33 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jul 26 12:58:41 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/StreamsConfig.java | 20 +++----
 .../apache/kafka/streams/StreamsConfigTest.java | 55 ++++++++++++++++++++
 2 files changed, 63 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c1f583e6/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index cf103e7..b72cf29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -713,11 +713,7 @@ public class StreamsConfig extends AbstractConfig {
      */
     @Deprecated
     public Serde keySerde() {
-        try {
-            return defaultKeySerde();
-        } catch (final Exception e) {
-            throw new StreamsException(String.format("Failed to configure key serde %s",
get(KEY_SERDE_CLASS_CONFIG)), e);
-        }
+        return defaultKeySerde();
     }
 
     /**
@@ -727,16 +723,18 @@ public class StreamsConfig extends AbstractConfig {
      * @return an configured instance of key Serde class
      */
     public Serde defaultKeySerde() {
+        Object keySerdeConfigSetting = get(KEY_SERDE_CLASS_CONFIG);
         try {
             Serde<?> serde = getConfiguredInstance(KEY_SERDE_CLASS_CONFIG, Serde.class);
             if (serde == null) {
+                keySerdeConfigSetting = get(DEFAULT_KEY_SERDE_CLASS_CONFIG);
                 serde = getConfiguredInstance(DEFAULT_KEY_SERDE_CLASS_CONFIG, Serde.class);
             }
             serde.configure(originals(), true);
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(
-                String.format("Failed to configure key serde %s", get(DEFAULT_KEY_SERDE_CLASS_CONFIG)),
e);
+                String.format("Failed to configure key serde %s", keySerdeConfigSetting),
e);
         }
     }
 
@@ -748,11 +746,7 @@ public class StreamsConfig extends AbstractConfig {
      */
     @Deprecated
     public Serde valueSerde() {
-        try {
-            return defaultValueSerde();
-        } catch (final Exception e) {
-            throw new StreamsException(String.format("Failed to configure value serde %s",
get(VALUE_SERDE_CLASS_CONFIG)), e);
-        }
+        return defaultValueSerde();
     }
 
     /**
@@ -762,9 +756,11 @@ public class StreamsConfig extends AbstractConfig {
      * @return an configured instance of value Serde class
      */
     public Serde defaultValueSerde() {
+        Object valueSerdeConfigSetting = get(VALUE_SERDE_CLASS_CONFIG);
         try {
             Serde<?> serde = getConfiguredInstance(VALUE_SERDE_CLASS_CONFIG, Serde.class);
             if (serde == null) {
+                valueSerdeConfigSetting = get(DEFAULT_VALUE_SERDE_CLASS_CONFIG);
                 serde = getConfiguredInstance(DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serde.class);
             }
             serde.configure(originals(), false);
@@ -772,7 +768,7 @@ public class StreamsConfig extends AbstractConfig {
             return serde;
         } catch (final Exception e) {
             throw new StreamsException(
-                String.format("Failed to configure value serde %s", get(DEFAULT_VALUE_SERDE_CLASS_CONFIG)),
e);
+                String.format("Failed to configure value serde %s", valueSerdeConfigSetting),
e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c1f583e6/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index 9f0f67a..3bbd69e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -50,6 +50,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class StreamsConfigTest {
 
@@ -428,6 +429,60 @@ public class StreamsConfigTest {
         assertTrue(config.defaultTimestampExtractor() instanceof FailOnInvalidTimestamp);
     }
 
+    @Test
+    public void shouldSpecifyCorrectKeySerdeClassOnErrorUsingDeprecatedConfigs() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.keySerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldSpecifyCorrectKeySerdeClassOnError() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.keySerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure key serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldSpecifyCorrectValueSerdeClassOnErrorUsingDeprecatedConfigs() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.valueSerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());
+        }
+    }
+
+    @Test
+    public void shouldSpecifyCorrectValueSerdeClassOnError() {
+        final Properties props = minimalStreamsConfig();
+        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        try {
+            config.valueSerde();
+            fail("Test should throw a StreamsException");
+        } catch (StreamsException e) {
+            assertEquals("Failed to configure value serde class org.apache.kafka.streams.StreamsConfigTest$MisconfiguredSerde",
e.getMessage());
+        }
+    }
+
+
+
     static class MisconfiguredSerde implements Serde {
         @Override
         public void configure(final Map configs, final boolean isKey) {


Mime
View raw message