From jira-return-11471-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Tue Apr 3 17:50:08 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id A94C6180675 for ; Tue, 3 Apr 2018 17:50:07 +0200 (CEST) Received: (qmail 3291 invoked by uid 500); 3 Apr 2018 15:50:06 -0000 Mailing-List: contact jira-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: jira@kafka.apache.org Delivered-To: mailing list jira@kafka.apache.org Received: (qmail 3280 invoked by uid 99); 3 Apr 2018 15:50:06 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 03 Apr 2018 15:50:06 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 52C1B180318 for ; Tue, 3 Apr 2018 15:50:06 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -109.511 X-Spam-Level: X-Spam-Status: No, score=-109.511 tagged_above=-999 required=6.31 tests=[ENV_AND_HDR_SPF_MATCH=-0.5, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_MED=-2.3, SPF_PASS=-0.001, T_RP_MATCHES_RCVD=-0.01, USER_IN_DEF_SPF_WL=-7.5, USER_IN_WHITELIST=-100] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id tzHieuKXbq7N for ; Tue, 3 Apr 2018 15:50:03 +0000 (UTC) Received: from mailrelay1-us-west.apache.org (mailrelay1-us-west.apache.org [209.188.14.139]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with ESMTP id 6EFCF5F260 for ; Tue, 3 Apr 2018 15:50:03 +0000 (UTC) Received: from jira-lw-us.apache.org (unknown [207.244.88.139]) by mailrelay1-us-west.apache.org (ASF Mail Server at mailrelay1-us-west.apache.org) with ESMTP id 14157E0D06 for ; Tue, 3 Apr 2018 15:50:02 +0000 (UTC) Received: from jira-lw-us.apache.org (localhost [127.0.0.1]) by jira-lw-us.apache.org (ASF Mail Server at jira-lw-us.apache.org) with ESMTP id D8E9F25622 for ; Tue, 3 Apr 2018 15:50:00 +0000 (UTC) Date: Tue, 3 Apr 2018 15:50:00 +0000 (UTC) From: "ASF GitHub Bot (JIRA)" To: jira@kafka.apache.org Message-ID: In-Reply-To: References: Subject: [jira] [Commented] (KAFKA-6728) Kafka Connect Header Null Pointer Exception MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: quoted-printable X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 [ https://issues.apache.org/jira/browse/KAFKA-6728?page=3Dcom.atlassian= .jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=3D1642= 4197#comment-16424197 ]=20 ASF GitHub Bot commented on KAFKA-6728: --------------------------------------- ewencp closed pull request #4815: KAFKA-6728: Corrected the worker=E2=80=99= s instantiation of the HeaderConverter URL: https://github.com/apache/kafka/pull/4815 =20 =20 =20 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime= /ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/conn= ect/runtime/ConnectorConfig.java index 0a895f67cf8..fd05af57a64 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connec= torConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connec= torConfig.java @@ -76,7 +76,9 @@ public static final String HEADER_CONVERTER_CLASS_CONFIG =3D WorkerCon= fig.HEADER_CONVERTER_CLASS_CONFIG; public static final String HEADER_CONVERTER_CLASS_DOC =3D WorkerConfig= .HEADER_CONVERTER_CLASS_DOC; public static final String HEADER_CONVERTER_CLASS_DISPLAY =3D "Header = converter class"; - public static final String HEADER_CONVERTER_CLASS_DEFAULT =3D WorkerCo= nfig.HEADER_CONVERTER_CLASS_DEFAULT; + // The Connector config should not have a default for the header conve= rter, since the absence of a config property means that + // the worker config settings should be used. Thus, we set the default= to null here. + public static final String HEADER_CONVERTER_CLASS_DEFAULT =3D null; =20 public static final String TASKS_MAX_CONFIG =3D "tasks.max"; private static final String TASKS_MAX_DOC =3D "Maximum number of tasks= to use for this connector."; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime= /Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runti= me/Worker.java index e3d9cf45901..1c6465855ff 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker= .java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker= .java @@ -397,12 +397,21 @@ public boolean startTask( ); if (keyConverter =3D=3D null) { keyConverter =3D plugins.newConverter(config, WorkerConfig= .KEY_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + log.info("Set up the key converter {} for task {} using th= e worker config", keyConverter.getClass(), id); + } else { + log.info("Set up the key converter {} for task {} using th= e connector config", keyConverter.getClass(), id); } if (valueConverter =3D=3D null) { valueConverter =3D plugins.newConverter(config, WorkerConf= ig.VALUE_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + log.info("Set up the value converter {} for task {} using = the worker config", valueConverter.getClass(), id); + } else { + log.info("Set up the value converter {} for task {} using = the connector config", valueConverter.getClass(), id); } if (headerConverter =3D=3D null) { headerConverter =3D plugins.newHeaderConverter(config, Wor= kerConfig.HEADER_CONVERTER_CLASS_CONFIG, ClassLoaderUsage.PLUGINS); + log.info("Set up the header converter {} for task {} using= the worker config", headerConverter.getClass(), id); + } else { + log.info("Set up the header converter {} for task {} using= the connector config", headerConverter.getClass(), id); } =20 workerTask =3D buildWorkerTask(connConfig, id, task, statusLis= tener, initialState, keyConverter, valueConverter, headerConverter, connect= orLoader); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime= /isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/co= nnect/runtime/isolation/Plugins.java index 94f27717080..f4cd2ba14b0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolat= ion/Plugins.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolat= ion/Plugins.java @@ -234,6 +234,8 @@ public Converter newConverter(AbstractConfig config, St= ring classPropertyName, C // Configure the Converter using only the old configuration mechan= ism ... String configPrefix =3D classPropertyName + "."; Map converterConfig =3D config.originalsWithPrefix= (configPrefix); + log.debug("Configuring the {} converter with configuration:{}{}", + isKeyConverter ? "key" : "value", System.lineSeparator()= , converterConfig); plugin.configure(converterConfig, isKeyConverter); return plugin; } @@ -249,20 +251,21 @@ public Converter newConverter(AbstractConfig config, = String classPropertyName, C * @throws ConnectException if the {@link HeaderConverter} implementat= ion class could not be found */ public HeaderConverter newHeaderConverter(AbstractConfig config, Strin= g classPropertyName, ClassLoaderUsage classLoaderUsage) { - if (!config.originals().containsKey(classPropertyName)) { - // This configuration does not define the header converter via= the specified property name - return null; - } HeaderConverter plugin =3D null; switch (classLoaderUsage) { case CURRENT_CLASSLOADER: + if (!config.originals().containsKey(classPropertyName)) { + // This connector configuration does not define the he= ader converter via the specified property name + return null; + } // Attempt to load first with the current classloader, and= plugins as a fallback. // Note: we can't use config.getConfiguredInstance because= we have to remove the property prefixes // before calling config(...) plugin =3D getInstance(config, classPropertyName, HeaderCo= nverter.class); break; case PLUGINS: - // Attempt to load with the plugin class loader, which use= s the current classloader as a fallback + // Attempt to load with the plugin class loader, which use= s the current classloader as a fallback. + // Note that there will always be at least a default heade= r converter for the worker String converterClassOrAlias =3D config.getClass(classProp= ertyName).getName(); Class klass; try { @@ -288,6 +291,7 @@ public HeaderConverter newHeaderConverter(AbstractConfi= g config, String classPro String configPrefix =3D classPropertyName + "."; Map converterConfig =3D config.originalsWithPrefix= (configPrefix); converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEA= DER.getName()); + log.debug("Configuring the header converter with configuration:{}{= }", System.lineSeparator(), converterConfig); plugin.configure(converterConfig); return plugin; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime= /isolation/PluginsTest.java b/connect/runtime/src/test/java/org/apache/kafk= a/connect/runtime/isolation/PluginsTest.java index 6de92eedd34..a9a944fa360 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolat= ion/PluginsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolat= ion/PluginsTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.storage.ConverterConfig; import org.apache.kafka.connect.storage.ConverterType; import org.apache.kafka.connect.storage.HeaderConverter; +import org.apache.kafka.connect.storage.SimpleHeaderConverter; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -39,18 +40,31 @@ =20 import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; =20 public class PluginsTest { =20 - private static Map props; + private static Map pluginProps; private static Plugins plugins; + private Map props; private AbstractConfig config; private TestConverter converter; private TestHeaderConverter headerConverter; =20 @BeforeClass public static void beforeAll() { - props =3D new HashMap<>(); + pluginProps =3D new HashMap<>(); + + // Set up the plugins to have no additional plugin directories. + // This won't allow us to test classpath isolation, but it will al= low us to test some of the utility methods. + pluginProps.put(WorkerConfig.PLUGIN_PATH_CONFIG, ""); + plugins =3D new Plugins(pluginProps); + } + + @Before + public void setup() { + props =3D new HashMap<>(pluginProps); props.put(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.c= lass.getName()); props.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter= .class.getName()); props.put("key.converter." + JsonConverterConfig.SCHEMAS_ENABLE_CO= NFIG, "true"); @@ -66,14 +80,10 @@ public static void beforeAll() { props.put(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, TestHeaderCo= nverter.class.getName()); props.put("header.converter.extra.config", "baz"); =20 - // Set up the plugins to have no additional plugin directories. - // This won't allow us to test classpath isolation, but it will al= low us to test some of the utility methods. - props.put(WorkerConfig.PLUGIN_PATH_CONFIG, ""); - plugins =3D new Plugins(props); + createConfig(); } =20 - @Before - public void setup() { + protected void createConfig() { this.config =3D new TestableWorkerConfig(props); } =20 @@ -104,11 +114,48 @@ public void shouldInstantiateAndConfigureInternalConv= erters() { } =20 @Test - public void shouldInstantiateAndConfigureHeaderConverter() { - instantiateAndConfigureHeaderConverter(WorkerConfig.HEADER_CONVERT= ER_CLASS_CONFIG); + public void shouldInstantiateAndConfigureExplicitlySetHeaderConverterW= ithCurrentClassLoader() { + assertNotNull(props.get(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG= )); + HeaderConverter headerConverter =3D plugins.newHeaderConverter(con= fig, + Worke= rConfig.HEADER_CONVERTER_CLASS_CONFIG, + Class= LoaderUsage.CURRENT_CLASSLOADER); + assertNotNull(headerConverter); + assertTrue(headerConverter instanceof TestHeaderConverter); + this.headerConverter =3D (TestHeaderConverter) headerConverter; + + // Validate extra configs got passed through to overridden convert= ers + assertConverterType(ConverterType.HEADER, this.headerConverter.con= figs); + assertEquals("baz", this.headerConverter.configs.get("extra.config= ")); + + headerConverter =3D plugins.newHeaderConverter(config, + WorkerConfig.HEADER_C= ONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUG= INS); + assertNotNull(headerConverter); + assertTrue(headerConverter instanceof TestHeaderConverter); + this.headerConverter =3D (TestHeaderConverter) headerConverter; + // Validate extra configs got passed through to overridden convert= ers - assertConverterType(ConverterType.HEADER, headerConverter.configs)= ; - assertEquals("baz", headerConverter.configs.get("extra.config")); + assertConverterType(ConverterType.HEADER, this.headerConverter.con= figs); + assertEquals("baz", this.headerConverter.configs.get("extra.config= ")); + } + + @Test + public void shouldInstantiateAndConfigureDefaultHeaderConverter() { + props.remove(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG); + createConfig(); + + // Because it's not explicitly set on the supplied configuration, = the logic to use the current classloader for the connector + // will exit immediately, and so this method always returns null + HeaderConverter headerConverter =3D plugins.newHeaderConverter(con= fig, + Worke= rConfig.HEADER_CONVERTER_CLASS_CONFIG, + Class= LoaderUsage.CURRENT_CLASSLOADER); + assertNull(headerConverter); + // But we should always find it (or the worker's default) when usi= ng the plugins classloader ... + headerConverter =3D plugins.newHeaderConverter(config, + WorkerConfig.HEADER_C= ONVERTER_CLASS_CONFIG, + ClassLoaderUsage.PLUG= INS); + assertNotNull(headerConverter); + assertTrue(headerConverter instanceof SimpleHeaderConverter); } =20 protected void instantiateAndConfigureConverter(String configPropName,= ClassLoaderUsage classLoaderUsage) { @@ -116,11 +163,6 @@ protected void instantiateAndConfigureConverter(String= configPropName, ClassLoad assertNotNull(converter); } =20 - protected void instantiateAndConfigureHeaderConverter(String configPro= pName) { - headerConverter =3D (TestHeaderConverter) plugins.newHeaderConvert= er(config, configPropName, ClassLoaderUsage.CURRENT_CLASSLOADER); - assertNotNull(headerConverter); - } - protected void assertConverterType(ConverterType type, Map = props) { assertEquals(type.getName(), props.get(ConverterConfig.TYPE_CONFIG= )); } =20 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. =20 For queries about this service, please contact Infrastructure at: users@infra.apache.org > Kafka Connect Header Null Pointer Exception > ------------------------------------------- > > Key: KAFKA-6728 > URL: https://issues.apache.org/jira/browse/KAFKA-6728 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect > Affects Versions: 1.1.0 > Environment: Linux Mint > Reporter: Philippe Hong > Assignee: Randall Hauch > Priority: Critical > Fix For: 1.2.0, 1.1.1 > > > I am trying to use the newly released Kafka Connect that supports headers= by using the standalone connector to write to a text file (so in this case= I am only using the sink component) > I am sadly greeted by a NullPointerException : > {noformat} > ERROR WorkerSinkTask{id=3Dlocal-file-sink-0} Task threw an uncaught and u= nrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172) > java.lang.NullPointerException > =C2=A0=C2=A0 =C2=A0at org.apache.kafka.connect.runtime.WorkerSinkTask.con= vertHeadersFor(WorkerSinkTask.java:501) > =C2=A0=C2=A0 =C2=A0at org.apache.kafka.connect.runtime.WorkerSinkTask.con= vertMessages(WorkerSinkTask.java:469) > =C2=A0=C2=A0 =C2=A0at org.apache.kafka.connect.runtime.WorkerSinkTask.pol= l(WorkerSinkTask.java:301) > =C2=A0=C2=A0 =C2=A0at org.apache.kafka.connect.runtime.WorkerSinkTask.ite= ration(WorkerSinkTask.java:205) > =C2=A0=C2=A0 =C2=A0at org.apache.kafka.connect.runtime.WorkerSinkTask.exe= cute(WorkerSinkTask.java:173) > =C2=A0=C2=A0 =C2=A0at org.apache.kafka.connect.runtime.WorkerTask.doRun(W= orkerTask.java:170) > =C2=A0=C2=A0 =C2=A0at org.apache.kafka.connect.runtime.WorkerTask.run(Wor= kerTask.java:214) > =C2=A0=C2=A0 =C2=A0at java.util.concurrent.Executors$RunnableAdapter.call= (Executors.java:511) > =C2=A0=C2=A0 =C2=A0at java.util.concurrent.FutureTask.run(FutureTask.java= :266) > =C2=A0=C2=A0 =C2=A0at java.util.concurrent.ThreadPoolExecutor.runWorker(T= hreadPoolExecutor.java:1149) > =C2=A0=C2=A0 =C2=A0at java.util.concurrent.ThreadPoolExecutor$Worker.run(= ThreadPoolExecutor.java:624) > =C2=A0=C2=A0 =C2=A0at java.lang.Thread.run(Thread.java:748) > {noformat} > I launched zookeeper and kafka 1.1.0 locally and sent a ProducerRecord[St= ring, Array[Byte]] using a KafkaProducer[String, Array[Byte]] with a header= that has a key and value. > I can read the record with a console consumer as well as using a KafkaCon= sumer (where in this case I can see the content of the header of the record= I sent previously) so no problem here. > I only made two changes to the kafka configuration: > =C2=A0=C2=A0=C2=A0 - I used the StringConverter for the key and the Byte= ArrayConverter for the value.=20 > =C2=A0=C2=A0=C2=A0 - I also changed the topic where the sink would conne= ct to. > If I forgot something please tell me so as it is the first time I am crea= ting an issue on Jira. -- This message was sent by Atlassian JIRA (v7.6.3#76005)