From: aurelien violette
Date: Thu, 6 Jun 2019 12:32:44 +0200
To: user@storm.apache.org
Subject: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x

Hello,

I was successfully using Kafka 0.8.x in a Storm topology based on StormCrawler, but I needed to upgrade to Kafka 0.10.x.

I tried to reproduce my environment with Docker: Storm 1.1 and Kafka 2.11-0.10.2.2. Unfortunately, at deploy time, I get the following error:

Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
        at c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24) ~[stormjar.jar:?]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
        at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_212]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
        at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_212]
        at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) ~[storm-core-1.1.3.jar:1.1.3]

My ConfigurableTopology only gathers some configuration utilities for building the topology. In particular, it defines the spout config:

/**
 * Get a spout config by topic and define the scheme.
 * @param topic config key holding the topic name
 * @param deserializer deserializer to use from bytes to value
 * @return the Kafka spout config
 */
KafkaSpoutConfig getSpoutConfig(
        String topic,
        Object deserializer) {

    String topicName = (String) this.getConf().get(topic);

    // With Kafka 0.10.x, we use the KafkaSpoutConfig builder.
    KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topicName)
            // The consumer resumes from the last committed offset, or starts
            // from the earliest offset if nothing has been committed yet.
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            // Set up deserializers from bytes to String.
            // Careful: the key is dropped from here on.
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer")
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer")
            // Set up deserialization to fields: (String key, String json value) => (String key, object unpacked from json)
            // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
            .build();

    return kafkaConfig;
}

I don't understand the origin of the issue; my Maven build targets Java 1.8. Any idea what is going wrong?

What I actually want is to set up a RecordTranslator that handles the transition from the input JSON string to my deserialized JSON object, with the deserialization handled by Gson (see the sketch in the PS below).

Thank you for your help,

BR,
Aurelien

--
BR,
Aurelien Violette
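PS: To make that last point concrete, here is roughly the kind of translator I have in mind. This is only a sketch of my intent, not code from my topology: JsonRecordTranslator, MyPayload and the output field names are placeholders, and I am assuming the storm-kafka-client RecordTranslator interface (apply + getFieldsFor). A named class like this should also avoid the lambda (de)serialization machinery altogether.

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.RecordTranslator;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import com.google.gson.Gson;

// Sketch only: a named, serializable translator instead of a lambda.
// It turns each (String key, String json value) record into
// (String key, MyPayload object deserialized with Gson).
public class JsonRecordTranslator implements RecordTranslator<String, String> {

    private static final long serialVersionUID = 1L;

    // Placeholder for my real domain object deserialized from the JSON value.
    public static class MyPayload {
        public String url;
    }

    // Gson is not serializable, so keep it transient and create it lazily
    // on the worker instead of shipping it inside the topology jar.
    private transient Gson gson;

    private Gson gson() {
        if (gson == null) {
            gson = new Gson();
        }
        return gson;
    }

    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        MyPayload payload = gson().fromJson(record.value(), MyPayload.class);
        return new Values(record.key(), payload);
    }

    @Override
    public Fields getFieldsFor(String stream) {
        return new Fields("key", "payload");
    }
}

The idea would then be to replace the commented-out lambda in getSpoutConfig with .setRecordTranslator(new JsonRecordTranslator()).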