kafka-jira mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6728) Kafka Connect Header Null Pointer Exception
Date Tue, 03 Apr 2018 15:50:00 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6728?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16424197#comment-16424197
] 

ASF GitHub Bot commented on KAFKA-6728:
---------------------------------------

ewencp closed pull request #4815: KAFKA-6728: Corrected the worker’s instantiation of the
HeaderConverter
URL: https://github.com/apache/kafka/pull/4815
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0a895f67cf8..fd05af57a64 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -76,7 +76,9 @@
     public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
     public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
     public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
-    public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
+    // The Connector config should not have a default for the header converter, since the
absence of a config property means that
+    // the worker config settings should be used. Thus, we set the default to null here.
+    public static final String HEADER_CONVERTER_CLASS_DEFAULT = null;
 
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this
connector.";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index e3d9cf45901..1c6465855ff 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -397,12 +397,21 @@ public boolean startTask(
             );
             if (keyConverter == null) {
                 keyConverter = plugins.newConverter(config, WorkerConfig.KEY_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS);
+                log.info("Set up the key converter {} for task {} using the worker config",
keyConverter.getClass(), id);
+            } else {
+                log.info("Set up the key converter {} for task {} using the connector config",
keyConverter.getClass(), id);
             }
             if (valueConverter == null) {
                 valueConverter = plugins.newConverter(config, WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS);
+                log.info("Set up the value converter {} for task {} using the worker config",
valueConverter.getClass(), id);
+            } else {
+                log.info("Set up the value converter {} for task {} using the connector config",
valueConverter.getClass(), id);
             }
             if (headerConverter == null) {
                 headerConverter = plugins.newHeaderConverter(config, WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
ClassLoaderUsage.PLUGINS);
+                log.info("Set up the header converter {} for task {} using the worker config",
headerConverter.getClass(), id);
+            } else {
+                log.info("Set up the header converter {} for task {} using the connector
config", headerConverter.getClass(), id);
             }
 
             workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState,
keyConverter, valueConverter, headerConverter, connectorLoader);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index 94f27717080..f4cd2ba14b0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -234,6 +234,8 @@ public Converter newConverter(AbstractConfig config, String classPropertyName,
C
         // Configure the Converter using only the old configuration mechanism ...
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
+        log.debug("Configuring the {} converter with configuration:{}{}",
+                  isKeyConverter ? "key" : "value", System.lineSeparator(), converterConfig);
         plugin.configure(converterConfig, isKeyConverter);
         return plugin;
     }
@@ -249,20 +251,21 @@ public Converter newConverter(AbstractConfig config, String classPropertyName,
C
      * @throws ConnectException if the {@link HeaderConverter} implementation class could
not be found
      */
     public HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName,
ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName)) {
-            // This configuration does not define the header converter via the specified
property name
-            return null;
-        }
         HeaderConverter plugin = null;
         switch (classLoaderUsage) {
             case CURRENT_CLASSLOADER:
+                if (!config.originals().containsKey(classPropertyName)) {
+                    // This connector configuration does not define the header converter
via the specified property name
+                    return null;
+                }
                 // Attempt to load first with the current classloader, and plugins as a fallback.
                 // Note: we can't use config.getConfiguredInstance because we have to remove
the property prefixes
                 // before calling config(...)
                 plugin = getInstance(config, classPropertyName, HeaderConverter.class);
                 break;
             case PLUGINS:
-                // Attempt to load with the plugin class loader, which uses the current classloader
as a fallback
+                // Attempt to load with the plugin class loader, which uses the current classloader
as a fallback.
+                // Note that there will always be at least a default header converter for
the worker
                 String converterClassOrAlias = config.getClass(classPropertyName).getName();
                 Class<? extends HeaderConverter> klass;
                 try {
@@ -288,6 +291,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String
classPro
         String configPrefix = classPropertyName + ".";
         Map<String, Object> converterConfig = config.originalsWithPrefix(configPrefix);
         converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
+        log.debug("Configuring the header converter with configuration:{}{}", System.lineSeparator(),
converterConfig);
         plugin.configure(converterConfig);
         return plugin;
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
index 6de92eedd34..a9a944fa360 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java
@@ -29,6 +29,7 @@
 import org.apache.kafka.connect.storage.ConverterConfig;
 import org.apache.kafka.connect.storage.ConverterType;
 import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -39,18 +40,31 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class PluginsTest {
 
-    private static Map<String, String> props;
+    private static Map<String, String> pluginProps;
     private static Plugins plugins;
+    private Map<String, String> props;
     private AbstractConfig config;
     private TestConverter converter;
     private TestHeaderConverter headerConverter;
 
     @BeforeClass
     public static void beforeAll() {
-        props = new HashMap<>();
+        pluginProps = new HashMap<>();
+
+        // Set up the plugins to have no additional plugin directories.
+        // This won't allow us to test classpath isolation, but it will allow us to test
some of the utility methods.
+        pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
+        plugins = new Plugins(pluginProps);
+    }
+
+    @Before
+    public void setup() {
+        props = new HashMap<>(pluginProps);
         props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
         props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
         props.put("key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "true");
@@ -66,14 +80,10 @@ public static void beforeAll() {
         props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderConverter.class.getName());
         props.put("header.converter.extra.config", "baz");
 
-        // Set up the plugins to have no additional plugin directories.
-        // This won't allow us to test classpath isolation, but it will allow us to test
some of the utility methods.
-        props.put(WorkerConfig.PLUGIN_PATH_CONFIG, "");
-        plugins = new Plugins(props);
+        createConfig();
     }
 
-    @Before
-    public void setup() {
+    protected void createConfig() {
         this.config = new TestableWorkerConfig(props);
     }
 
@@ -104,11 +114,48 @@ public void shouldInstantiateAndConfigureInternalConverters() {
     }
 
     @Test
-    public void shouldInstantiateAndConfigureHeaderConverter() {
-        instantiateAndConfigureHeaderConverter(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+    public void shouldInstantiateAndConfigureExplicitlySetHeaderConverterWithCurrentClassLoader()
{
+        assertNotNull(props.get(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG));
+        HeaderConverter headerConverter = plugins.newHeaderConverter(config,
+                                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                                     ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof TestHeaderConverter);
+        this.headerConverter = (TestHeaderConverter) headerConverter;
+
+        // Validate extra configs got passed through to overridden converters
+        assertConverterType(ConverterType.HEADER, this.headerConverter.configs);
+        assertEquals("baz", this.headerConverter.configs.get("extra.config"));
+
+        headerConverter = plugins.newHeaderConverter(config,
+                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                     ClassLoaderUsage.PLUGINS);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof TestHeaderConverter);
+        this.headerConverter = (TestHeaderConverter) headerConverter;
+
         // Validate extra configs got passed through to overridden converters
-        assertConverterType(ConverterType.HEADER, headerConverter.configs);
-        assertEquals("baz", headerConverter.configs.get("extra.config"));
+        assertConverterType(ConverterType.HEADER, this.headerConverter.configs);
+        assertEquals("baz", this.headerConverter.configs.get("extra.config"));
+    }
+
+    @Test
+    public void shouldInstantiateAndConfigureDefaultHeaderConverter() {
+        props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG);
+        createConfig();
+
+        // Because it's not explicitly set on the supplied configuration, the logic to use
the current classloader for the connector
+        // will exit immediately, and so this method always returns null
+        HeaderConverter headerConverter = plugins.newHeaderConverter(config,
+                                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                                     ClassLoaderUsage.CURRENT_CLASSLOADER);
+        assertNull(headerConverter);
+        // But we should always find it (or the worker's default) when using the plugins
classloader ...
+        headerConverter = plugins.newHeaderConverter(config,
+                                                     WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG,
+                                                     ClassLoaderUsage.PLUGINS);
+        assertNotNull(headerConverter);
+        assertTrue(headerConverter instanceof SimpleHeaderConverter);
     }
 
     protected void instantiateAndConfigureConverter(String configPropName, ClassLoaderUsage
classLoaderUsage) {
@@ -116,11 +163,6 @@ protected void instantiateAndConfigureConverter(String configPropName,
ClassLoad
         assertNotNull(converter);
     }
 
-    protected void instantiateAndConfigureHeaderConverter(String configPropName) {
-        headerConverter = (TestHeaderConverter) plugins.newHeaderConverter(config, configPropName,
ClassLoaderUsage.CURRENT_CLASSLOADER);
-        assertNotNull(headerConverter);
-    }
-
     protected void assertConverterType(ConverterType type, Map<String, ?> props) {
         assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG));
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Kafka Connect Header Null Pointer Exception
> -------------------------------------------
>
>                 Key: KAFKA-6728
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6728
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>         Environment: Linux Mint
>            Reporter: Philippe Hong
>            Assignee: Randall Hauch
>            Priority: Critical
>             Fix For: 1.2.0, 1.1.1
>
>
> I am trying to use the newly released Kafka Connect that supports headers by using the
standalone connector to write to a text file (so in this case I am only using the sink component)
> I am sadly greeted by a NullPointerException :
> {noformat}
> ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception
(org.apache.kafka.connect.runtime.WorkerTask:172)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertHeadersFor(WorkerSinkTask.java:501)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {noformat}
> I launched zookeeper and kafka 1.1.0 locally and sent a ProducerRecord[String, Array[Byte]]
using a KafkaProducer[String, Array[Byte]] with a header that has a key and value.
> I can read the record with a console consumer as well as using a KafkaConsumer (where
in this case I can see the content of the header of the record I sent previously) so no problem
here.
> I only made two changes to the kafka configuration:
>      - I used the StringConverter for the key and the ByteArrayConverter for the value.

>      - I also changed the topic where the sink would connect to.
> If I forgot something please tell me so as it is the first time I am creating an issue
on Jira.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message