kafka-commits mailing list archives

From ewe...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter
Date Tue, 03 Apr 2018 15:48:18 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d9369de  KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter
d9369de is described below

commit d9369de8f2c6435843fb7577d313bb24e3b09cba
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Tue Apr 3 08:48:05 2018 -0700

    KAFKA-6728: Corrected the worker’s instantiation of the HeaderConverter
    
    ## Summary of the problem
    When `header.converter` is not specified in the worker config or the connector config, a bug in a check in `Plugins` causes the worker to never instantiate a `HeaderConverter`, even though the property has a default value.
    
    This becomes a problem as soon as the connector deals with headers, either in records created by a source connector or in messages on the Kafka topics consumed by a sink connector. As soon as that happens, an NPE occurs.
    
    A workaround is to explicitly set the `header.converter` configuration property. However, this property was added in AK 1.1, so without a fix an upgrade to AK 1.1 is not backward compatible and requires this configuration change.
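
    As an illustration of that workaround, the sketch below builds worker properties with `header.converter` set explicitly. The `HeaderConverterWorkaround` class and its helper are hypothetical names for this example only; the value assigned is the `SimpleHeaderConverter` class that the new unit test in this commit expects as the worker default.

```java
import java.util.Properties;

public class HeaderConverterWorkaround {

    // Property name taken from the commit message.
    static final String HEADER_CONVERTER = "header.converter";

    // Class that PluginsTest in this commit expects as the worker default.
    static final String SIMPLE_HEADER_CONVERTER =
            "org.apache.kafka.connect.storage.SimpleHeaderConverter";

    // Hypothetical helper: returns a copy of the worker properties in which
    // header.converter is always set explicitly. An existing value is kept.
    static Properties withExplicitHeaderConverter(Properties workerProps) {
        Properties copy = new Properties();
        copy.putAll(workerProps);
        if (!copy.containsKey(HEADER_CONVERTER)) {
            copy.setProperty(HEADER_CONVERTER, SIMPLE_HEADER_CONVERTER);
        }
        return copy;
    }

    public static void main(String[] args) {
        Properties worker = new Properties();
        worker.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        Properties patched = withExplicitHeaderConverter(worker);
        System.out.println(HEADER_CONVERTER + "=" + patched.getProperty(HEADER_CONVERTER));
    }
}
```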
    
    ## The Changes
    
    The `Plugins.newHeaderConverter` methods were always returning null if the `header.converter` configuration value was not specified in the supplied connector or worker configuration. Thus, even though the `header.converter` property has a default, it was never being used.
    
    The fix was to only check whether a `header.converter` property was specified when the connector configuration was being used, and if no such property exists in the connector configuration to return null. Then, when the worker configuration is being used, the method simply gets the `header.converter` value (or the default if no value was explicitly set).
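
    A minimal, self-contained sketch of the lookup order described above. It models configurations as plain maps and returns class names rather than instances; `HeaderConverterResolution`, `resolveForTask`, and the example class `com.example.CustomHeaderConverter` are hypothetical, and this is not the actual `Plugins` or `Worker` code.

```java
import java.util.HashMap;
import java.util.Map;

public class HeaderConverterResolution {

    // Mirrors the two modes used by the real method; the rest is a simplification.
    enum ClassLoaderUsage { CURRENT_CLASSLOADER, PLUGINS }

    static final String PROPERTY = "header.converter";
    static final String WORKER_DEFAULT =
            "org.apache.kafka.connect.storage.SimpleHeaderConverter";

    // Returns the converter class name, or null when a connector config
    // does not set the property explicitly.
    static String newHeaderConverter(Map<String, String> originals, ClassLoaderUsage usage) {
        switch (usage) {
            case CURRENT_CLASSLOADER:
                // Connector config: only an explicit setting counts.
                return originals.get(PROPERTY);
            case PLUGINS:
                // Worker config: explicit value, else the worker's default.
                return originals.getOrDefault(PROPERTY, WORKER_DEFAULT);
            default:
                throw new IllegalArgumentException("Unknown usage: " + usage);
        }
    }

    // Assumed caller behavior: try the connector config first, then the worker config.
    static String resolveForTask(Map<String, String> connectorProps,
                                 Map<String, String> workerProps) {
        String converter = newHeaderConverter(connectorProps, ClassLoaderUsage.CURRENT_CLASSLOADER);
        if (converter == null) {
            converter = newHeaderConverter(workerProps, ClassLoaderUsage.PLUGINS);
        }
        return converter;
    }

    public static void main(String[] args) {
        Map<String, String> connector = new HashMap<>();
        Map<String, String> worker = new HashMap<>();
        System.out.println("neither set:   " + resolveForTask(connector, worker));

        connector.put(PROPERTY, "com.example.CustomHeaderConverter");
        System.out.println("connector set: " + resolveForTask(connector, worker));
    }
}
```

    Before the fix, the explicit-setting check applied in both modes, so the worker-level lookup also returned null when nothing was set.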
    
    Also, the ConnectorConfig had the same default value for the `header.converter` property as the WorkerConfig, but this resulted in very confusing log messages that implied the default header converter should be used even when the worker config specified the `header.converter` value. By removing the default, the log messages now make sense, and the Worker still properly instantiates the correct header converter.
    
    Finally, updated comments and added log messages to make it clearer which converters are being used and how they are being configured.
    
    ## Testing
    
    Several new unit tests for `Plugins.newHeaderConverter` were added to check the various behaviors. Additionally, the runtime JAR with these changes was built and inserted into an AK 1.1 installation, and a source connector was manually tested with 8 different combinations of settings for the `header.converter` configuration:
    
    1. default value
    1. worker configuration has `header.converter` explicitly set to the default
    1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
    1. worker configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
    1. connector configuration has `header.converter` explicitly set to the default
    1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in the same plugin
    1. connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
    1. worker configuration has `header.converter` explicitly set to the default, and the connector configuration has `header.converter` set to a custom `HeaderConverter` implementation in a _different_ plugin
    
    The worker created the correct `HeaderConverter` implementation with the correct configuration in all of these tests.
    
    Finally, the default configuration was used with the aforementioned custom source connector that generated records with headers, and an S3 connector that consumed the records with headers (but didn't do anything with them). This test also passed.
    
    Author: Randall Hauch <rhauch@gmail.com>
    
    Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4815 from rhauch/kafka-6728
---
 .../kafka/connect/runtime/ConnectorConfig.java     |  4 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |  9 +++
 .../kafka/connect/runtime/isolation/Plugins.java   | 14 ++--
 .../connect/runtime/isolation/PluginsTest.java     | 76 +++++++++++++++++-----
 4 files changed, 80 insertions(+), 23 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0a895f6..fd05af5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -76,7 +76,9 @@ public class ConnectorConfig extends AbstractConfig {
     public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
     public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
     public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
-    public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
+    // The Connector config should not have a default for the header converter, since the absence of a config property means that
+    // the worker config settings should be used. Thus, we set the default to null here.
+    public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
 
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e3d9cf4..1c64658 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -397,12 +397,21 @@ public class Worker {
             );
             if (keyConverter == null) {
                 keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the key converter {} for task {} using the worker config", keyConverter.getClass(), id);
+            } else {
+                log.info("Set up the key converter {} for task {} using the connector config", keyConverter.getClass(), id);
             }
             if (valueConverter == null) {
                 valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the value converter {} for task {} using the worker config", valueConverter.getClass(), id);
+            } else {
+                log.info("Set up the value converter {} for task {} using the connector config", valueConverter.getClass(), id);
             }
             if (headerConverter == null) {
                 headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS);
+                log.info("Set up the header converter {} for task {} using the worker config", headerConverter.getClass(), id);
+            } else {
+                log.info("Set up the header converter {} for task {} using the connector config", headerConverter.getClass(), id);
             }
 
             workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 94f2771..f4cd2ba 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -234,6 +234,8 @@ public class Plugins {
         // Configure the Converter using only the old configuration mechanism ...
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
+        log.debug("Configuring the {} converter with configuration:{}{}",
+                  isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig);
         plugin.configure(converterConfig, isKeyConverter);
         return plugin;
     }
@@ -249,20 +251,21 @@ public class Plugins {
      * @throws ConnectException if the {@link HeaderConverter} implementation class could not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName)) {
-            // This configuration does not define the header converter via the specified property name
-            return null;
-        }
         HeaderConverter plugin = null;
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
+                if (!config.originals().containsKey(classPropertyName)) {
+                    // This connector configuration does not define the header converter via the specified property name
+                    return null;
+                }
                 // Attempt to load first with the current classloader, and plugins as a fallback.
                 // Note: we can't use config.getConfiguredInstance because we have to remove the property prefixes
                 // before calling config(...)
                 plugin = getInstance(config, classPropertyName, HeaderConverter.class);
                 break;
             case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses the current classloader as a fallback
+                // Attempt to load with the plugin class loader, which uses the current classloader as a fallback.
+                // Note that there will always be at least a default header converter for the worker
                 String converterClassOrAlias = config.getClass(classPropertyName).getName();
                 Class<? extends HeaderConverter> klass;
                 try {
@@ -288,6 +291,7 @@ public class Plugins {
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
+        log.debug("Configuring the header converter with configuration:{}{}", System.lineSeparator(), converterConfig);
         plugin.configure(converterConfig);
         return plugin;
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 6de92ee..a9a944f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.ConverterConfig;
 import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -39,18 +40,31 @@ import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class PluginsTest {
 
-    private static Map<String, String> props;
+    private static Map<String, String> pluginProps;
     private static Plugins plugins;
+    private Map<String, String> props;
     private AbstractConfig config;
     private TestConverter converter;
     private TestHeaderConverter headerConverter;
 
     @BeforeClass
     public static void beforeAll() {
-        props = new HashMap<>();
+        pluginProps = new HashMap<>();
+
+        // Set up the plugins to have no additional plugin directories.
+        // This won't allow us to test classpath isolation, but it will allow us to test some of the utility methods.
+        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
+        plugins = new Plugins(pluginProps);
+    }
+
+    @Before
+    public void setup() {
+        props = new HashMap<>(pluginProps);
         props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
         props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
         props.put("key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
@@ -66,14 +80,10 @@ public class PluginsTest {
         props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName());
         props.put("header.converter.extra.config", "baz");
 
-        // Set up the plugins to have no additional plugin directories.
-        // This won't allow us to test classpath isolation, but it will allow us to test some of the utility methods.
-        props.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
-        plugins = new Plugins(props);
+        createConfig();
     }
 
-    @Before
-    public void setup() {
+    protected void createConfig() {
         this.config = new TestableWorkerConfig(props);
     }
 
@@ -104,11 +114,48 @@ public class PluginsTest {
     }
 
     @Test
-    public void shouldInstantiateAndConfigureHeaderConverter() {
-        instantiateAndConfigureHeaderConverter(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+    public void shouldInstantiateAndConfigureExplicitlySetHeaderConverterWithCurrentClassLoader() {
+        assertNotNull(props.get(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG));
+        HeaderConverter headerConverter = plugins.newHeaderConverter(config,
+                                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                                     ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof TestHeaderConverter);
+        this.headerConverter = (TestHeaderConverter) headerConverter;
+
+        // Validate extra configs got passed through to overridden converters
+        assertConverterType(ConverterType.HEADER, this.headerConverter.configs);
+        assertEquals("baz", this.headerConverter.configs.get("extra.config"));
+
+        headerConverter = plugins.newHeaderConverter(config,
+                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                     ClassLoaderUsage.PLUGINS);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof TestHeaderConverter);
+        this.headerConverter = (TestHeaderConverter) headerConverter;
+
         // Validate extra configs got passed through to overridden converters
-        assertConverterType(ConverterType.HEADER, headerConverter.configs);
-        assertEquals("baz", headerConverter.configs.get("extra.config"));
+        assertConverterType(ConverterType.HEADER, this.headerConverter.configs);
+        assertEquals("baz", this.headerConverter.configs.get("extra.config"));
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
+        props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+        createConfig();
+
+        // Because it's not explicitly set on the supplied configuration, the logic to use the current classloader for the connector
+        // will exit immediately, and so this method always returns null
+        HeaderConverter headerConverter = plugins.newHeaderConverter(config,
+                                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                                     ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNull(headerConverter);
+        // But we should always find it (or the worker's default) when using the plugins classloader ...
+        headerConverter = plugins.newHeaderConverter(config,
+                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                     ClassLoaderUsage.PLUGINS);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof SimpleHeaderConverter);
     }
 
     protected void instantiateAndConfigureConverter(String configPropName, ClassLoaderUsage classLoaderUsage) {
@@ -116,11 +163,6 @@ public class PluginsTest {
         assertNotNull(converter);
     }
 
-    protected void instantiateAndConfigureHeaderConverter(String configPropName) {
-        headerConverter = (TestHeaderConverter) plugins.newHeaderConverter(config, configPropName, ClassLoaderUsage.CURRENT_CLASSLOADER);
-        assertNotNull(headerConverter);
-    }
-
     protected void assertConverterType(ConverterType type, Map<String, ?> props) {
         assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG));
     }

-- 
To stop receiving notification emails like this one, please contact
ewencp@apache.org.
