storm-user mailing list archives

From Stig Rohde Døssing <stigdoess...@gmail.com>
Subject Re: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x
Date Thu, 06 Jun 2019 14:38:43 GMT
I don't see anything wrong with the code you posted. Could you post the
full AntConfigurableTopology code? It's hard to tell from that snippet what
your topology setup looks like.
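
For context: the "Invalid lambda deserialization" in your stack trace is thrown from SerializedLambda.readResolve, which calls the synthetic $deserializeLambda$ method on the class that defined the lambda. Note that the frame names the class ConfigurableTopology but the file AntConfigurableTopology.java, which can indicate a renamed or stale class between the jar that serialized the lambda and the classpath that deserializes it. A minimal, self-contained sketch of the mechanism (plain JDK, no Storm; class and method names are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

public class LambdaSerDemo {

    /** A lambda assigned to this type is written out as a SerializedLambda. */
    public interface SerFunction<T, R> extends Function<T, R>, Serializable {}

    /** Serialize and immediately deserialize a lambda, as Storm does when shipping a topology. */
    public static Function<String, String> roundTrip(SerFunction<String, String> f) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(f);
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            // readObject triggers SerializedLambda.readResolve, which invokes the
            // synthetic $deserializeLambda$ method on the class that defined the
            // lambda. If that class was renamed or recompiled after the lambda was
            // serialized (e.g. a stale jar on the worker), this is the point where
            // "Invalid lambda deserialization" is thrown.
            return (SerFunction<String, String>) ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Succeeds here because the defining class is unchanged within one JVM.
        System.out.println(roundTrip(s -> s.toUpperCase()).apply("ok"));
    }
}
```

So a first thing to check is that the jar deployed to the cluster was built from the same sources as the class the worker loads, i.e. no leftover jar from before the AntConfigurableTopology rename.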

On Thu, 6 Jun 2019 at 12:33, aurelien violette <
aurelien.v@webgroup-limited.com> wrote:

> Hello,
>
> I was successfully using Kafka 0.8.x in a Storm topology based on Storm
> Crawler. However, I needed to upgrade to Kafka 0.10.x.
>
> I tried to simulate my environment using a Docker setup:
> Storm 1.1 and Kafka 2.11-0.10.2.2.
>
> Unfortunately, at deploy time, I get an error:
>
> Caused by: java.lang.IllegalArgumentException: Invalid lambda
> deserialization
>        at
> c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24)
> ~[stormjar.jar:?]
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_212]
>        at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_212]
>        at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_212]
>        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>        at
> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
> ~[?:1.8.0_212]
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> ~[?:1.8.0_212]
>        at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> ~[?:1.8.0_212]
>        at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> ~[?:1.8.0_212]
>        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>        at
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
> ~[?:1.8.0_212]
>        at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
> ~[?:1.8.0_212]
>        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
> ~[?:1.8.0_212]
>        at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253)
> ~[storm-core-1.1.3.jar:1.1.3]
>
> My AntConfigurableTopology only gathers some configuration utilities for
> building the topology. In particular, it defines the KafkaSpoutConfig.
>
> /**
>  * Get a spout config by topic, and define the scheme.
>  * @param topicKey config key whose value is the topic name
>  * @param deserializer deserializer to use from bytes to value
>  * @return the spout config for the topic
>  */
> KafkaSpoutConfig<String, String> getSpoutConfig(
>         String topicKey,
>         Object deserializer)
> {
>   // Look up the topic name under a distinct key, so the parameter is not shadowed.
>   String topic = (String) this.getConf().get(topicKey);
>
>   // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder.
>   KafkaSpoutConfig<String, String> kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
>           // The consumer starts from the latest uncommitted offset, or the earliest offset if there is none.
>           .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>           // Set up deserializers from bytes to String (StringDeserializer, not StringSerializer).
>           // Careful: the key is dropped from here.
>           .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>           .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>           // Set up translation to fields: (String key, String json value) => (String key, unpacked object from json)
>           // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
>           .build();
>
>   return kafkaConfig;
> }
>
> I don't understand the origin of the issue. My Maven build targets Java 1.8.
> Any idea about this issue?
>
> Actually, I wanted to set up a RecordTranslator to handle the transition
> from the input JSON String to my deserialized JSON object. Deserialization
> is handled by Gson.
>
> Thank you for your help,
> BR,
> Aurelien
>
>
> --
> BR,
> Aurelien Violette
>

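The RecordTranslator mentioned above can be written as a named top-level class instead of a lambda, so deserialization resolves it by class name and never goes through $deserializeLambda$. A sketch, assuming the storm-kafka-client 1.1.x API and Gson on the classpath; GsonRecordTranslator and the MyEvent POJO are illustrative names:

    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.RecordTranslator;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;
    import com.google.gson.Gson;

    public class GsonRecordTranslator implements RecordTranslator<String, String> {
        private static final Fields FIELDS = new Fields("key", "event");

        // Gson need not be shipped with the topology; recreate it lazily on the worker.
        private transient Gson gson;

        private Gson gson() {
            if (gson == null) gson = new Gson();
            return gson;
        }

        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            // (String key, String json value) => (String key, deserialized object)
            MyEvent event = gson().fromJson(record.value(), MyEvent.class);
            return new Values(record.key(), event);
        }

        @Override
        public Fields getFieldsFor(String stream) {
            return FIELDS;
        }
    }

It would then be registered with .setRecordTranslator(new GsonRecordTranslator()) on the builder. Note that the emitted MyEvent instances must be serializable by Storm's tuple serialization (Kryo) for the topology to run.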