From: Stig Rohde Døssing
Date: Thu, 6 Jun 2019 16:38:43 +0200
Subject: Re: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x
To: user@storm.apache.org

I don't see
anything wrong with the code you posted. Could you post the full AntConfigurableTopology code? It's hard to tell from that snippet what your topology setup looks like.

On Thu, 6 Jun 2019 at 12:33, aurelien violette <aurelien.v@webgroup-limited.com> wrote:

> Hello,
>
> I was successfully using Kafka 0.8.x in a Storm topology based on Storm
> Crawler. However, I needed to upgrade to Kafka 0.10.x.
>
> I tried to simulate my environment using Docker:
> Storm 1.1 and Kafka 2.11-0.10.2.2
>
> Unfortunately, at deploy time, I get this error:
>
> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
>         at c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24) ~[stormjar.jar:?]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>         at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_212]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>         at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_212]
>         at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) ~[storm-core-1.1.3.jar:1.1.3]
>
> My ConfigurableTopology only gathers some config utilities for building
> the topology. In particular, it defines the SpoutConfig:
>
> /**
>  * Get a spout config by topic, and define the scheme.
>  * @param topicKey config key under which the topic name is stored
>  * @param deserializer deserializer to use from bytes to value
>  * @return the spout config
>  */
> KafkaSpoutConfig<String, String> getSpoutConfig(
>         String topicKey,
>         Object deserializer) {
>
>     String topic = (String) this.getConf().get(topicKey);
>
>     // With Kafka 0.10.x, we use the KafkaSpoutConfig builder.
>     KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
>             // The consumer resumes from the last committed offset, or starts
>             // from the earliest offset if no commit exists.
>             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             // Set up deserializers from bytes to String.
>             // Careful: the key is dropped from here on.
>             .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>             .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>             // Set up deserialization to fields: (String key, String JSON value) => (String key, object unpacked from JSON)
>             // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
>             .build();
>
>     return kafkaConfig;
> }
>
> I don't understand the origin of the issue. My Maven build targets Java 1.8.
> Any idea on this issue?
>
> Actually, I wanted to set up a RecordTranslator to handle the transition
> from the input JSON String to my deserialized JSON object. Deserialization
> is handled by Gson.
>
> Thank you for your help,
> BR,
> Aurelien
>
>
> --
> BR,
> Aurelien Violette
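[Archive note] For readers hitting the same error: `Invalid lambda deserialization` is thrown from the synthetic `$deserializeLambda$` method (visible in the stack trace via `SerializedLambda.readResolve`), typically when a serialized lambda is read back by a class that was compiled differently from the one that wrote it. The usual workaround is to replace any lambda that ends up inside the serialized topology with a named static class, which has a stable identity for Java serialization. The sketch below uses only the JDK, with illustrative names (`LambdaSerDemo`, `UpperCaser`, `roundTrip`) that are not part of Storm's API, to show such a class surviving a serialization round trip:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerDemo {

    // A named static class deserializes by class name, unlike a
    // compiler-generated lambda, whose deserialization must match the
    // exact synthetic members of the enclosing class that wrote it.
    static class UpperCaser implements Function<String, String>, Serializable {
        private static final long serialVersionUID = 1L;

        @Override
        public String apply(String s) {
            return s.toUpperCase();
        }
    }

    // Serialize an object to bytes and read it back, roughly what happens
    // when topology components are shipped to workers.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj);
        }
        try (ObjectInputStream ois =
                new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            return (T) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Function<String, String> fn = roundTrip(new UpperCaser());
        System.out.println(fn.apply("storm")); // prints STORM
    }
}
```

Applied to the topology quoted above, the same idea would mean moving the commented-out `(r) -> new Values(r.value())` lambda into a small named class implementing the record-translator interface of storm-kafka-client, so that deserialization on the workers no longer depends on matching the submitter's compiled lambda metadata.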