storm-user mailing list archives

From Stig Rohde Døssing <stigdoess...@gmail.com>
Subject Re: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x
Date Thu, 06 Jun 2019 19:16:21 GMT
I still don't see anything wrong.

Googling the error leads to
https://bugs.eclipse.org/bugs/show_bug.cgi?id=516620. Is it possible that
you're compiling on a different JDK than the one running the cluster?

A workaround would be to eliminate the lambda from ByTopicRecordTranslator
by using an anonymous or named class instead.
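
Something like this should work (a minimal sketch, untested, assuming the
storm-kafka-client Func and ByTopicRecordTranslator APIs from 1.1.x; the
output field name "value" is just a placeholder):

import java.io.Serializable;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// A named class instead of a lambda: it is serialized as an ordinary object,
// so deserialization never goes through SerializedLambda.readResolve /
// $deserializeLambda$, and it holds no hidden reference to the enclosing
// topology instance.
public class ValueTranslator
        implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
    @Override
    public List<Object> apply(ConsumerRecord<String, String> record) {
        return new Values(record.value());
    }
}

and then pass it to the builder via setRecordTranslator:

ByTopicRecordTranslator<String, String> translator =
        new ByTopicRecordTranslator<>(new ValueTranslator(), new Fields("value"));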

On Thu 6 Jun 2019 at 17:25, aurelien violette <
aurelien.v@webgroup-limited.com> wrote:

> Hey,
>
> Well, here it is. It extends ConfigurableTopology from storm-crawler, and
> I've tried many simplifications to get rid of any potential lambda code.
> The only place I still see one is in the RecordTranslator defaults.
>
> Thank you for any ideas about where or what to search for.
>
>
> import com.digitalpebble.stormcrawler.ConfigurableTopology;
> import org.apache.kafka.clients.consumer.ConsumerConfig;
> import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
> import org.apache.storm.kafka.spout.KafkaSpoutConfig;
>
> import java.io.Serializable;
>
> public abstract class AntConfigurableTopology extends ConfigurableTopology implements Serializable {
>
>   String topologyName;
>   // ZkHosts zkHosts;
>   String bootstrapServers = "localhost:9092"; // Default local configuration
>   int parallel = 1;
>
>   void init(String topologyId) {
>     topologyName = (String) this.getConf().get(topologyId);
>     bootstrapServers = (String) this.getConf().get("metadata.broker");
>     final Integer parallelismHint = (Integer) this.getConf().getOrDefault("parallelism", 1);
>     parallel = parallelismHint;
>     if (!this.getConf().containsKey("zkhost")) {
>       this.getConf().put("zkhost", "localhost:2181");
>     }
>   }
>
>
>   /**
>    * Get a spout config for the given topic, using the default translator.
>    * @param topic config key under which the topic name is stored
>    * @return the spout config for that topic
>    */
>   KafkaSpoutConfig getSpoutConfig(String topic) {
>     return getSpoutConfig(topic, null); //new AntmarkTupleBuilder(this.getConf()));
>   }
>
>
>   /**
>    * Get a spout config by topic, defining the scheme.
>    * @param topic config key under which the topic name is stored
>    * @param deserializer deserializer to use from bytes to value
>    * @return the spout config for that topic
>    */
>   KafkaSpoutConfig getSpoutConfig(
>           String topic,
>           Object deserializer)
>   {
>
>     String addWeblinkTopic = (String) this.getConf().get(topic);
>
>     // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
>     KafkaSpoutConfig kafkaAddWeblinkConfig = KafkaSpoutConfig.builder(bootstrapServers, addWeblinkTopic)
>             // Consumer starts from the last committed offset, or the earliest offset if none exists.
>             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>             // Set up deserializers from bytes to String.
>             // Careful: the key is dropped from here.
>             .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>             .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>             // Set up deserialization to fields: (String key, String json value) => (String key, unpacked object from json)
>             // .setRecordTranslator(new ByTopicRecordTranslator<>((TargetInterface & Serializable) (r) -> new Values(r.value()), new Fields(FieldNames.ANTMARK)))
>             //.setRecordTranslator(deserializer, deserializer.getFields())
>             .build();
>
>
>     return kafkaAddWeblinkConfig;
>   }
> }
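>
> For context, this is roughly how the config gets wired in (a sketch only;
> the topic key "addweblink.topic" and the component name are hypothetical):
>
> import org.apache.storm.kafka.spout.KafkaSpout;
> import org.apache.storm.topology.TopologyBuilder;
>
> protected TopologyBuilder buildTopology() {
>     TopologyBuilder builder = new TopologyBuilder();
>     // The KafkaSpoutConfig built above is handed straight to a KafkaSpout.
>     builder.setSpout("kafka-spout",
>             new KafkaSpout<>(getSpoutConfig("addweblink.topic")),
>             parallel);
>     return builder;
> }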
>
> On Thu, 6 Jun 2019 at 16:39, Stig Rohde Døssing <stigdoessing@gmail.com>
> wrote:
>
>> I don't see anything wrong with the code you posted. Could you post the
>> full AntConfigurableTopology code? It's hard to tell from that snippet what
>> your topology setup looks like.
>>
>> On Thu, 6 Jun 2019 at 12:33, aurelien violette <
>> aurelien.v@webgroup-limited.com> wrote:
>>
>>> Hello,
>>>
>>> I was successfully using Kafka 0.8.x in a Storm topology based on Storm
>>> Crawler, but I needed to upgrade to Kafka 0.10.x.
>>>
>>> I tried to reproduce my environment using Docker:
>>> Storm 1.1 and Kafka 2.11-0.10.2.2.
>>>
>>> Unfortunately, at deploy time, I get this error:
>>>
>>> Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
>>>        at c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24) ~[stormjar.jar:?]
>>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
>>>        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
>>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
>>>        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>>>        at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_212]
>>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
>>>        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
>>>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
>>>        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>>>        at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
>>>        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_212]
>>>        at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) ~[storm-core-1.1.3.jar:1.1.3]
>>>
>>> My ConfigurableTopology just gathers some configuration utilities for
>>> building the topology. In particular, it defines the SpoutConfig.
>>>
>>> /**
>>>  * Get a spout config by topic, defining the scheme.
>>>  * @param topic config key under which the topic name is stored
>>>  * @param deserializer deserializer to use from bytes to value
>>>  * @return the spout config for that topic
>>>  */
>>> KafkaSpoutConfig getSpoutConfig(
>>>         String topic,
>>>         Object deserializer)
>>> {
>>>
>>>   String topicName = (String) this.getConf().get(topic);
>>>
>>>   // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
>>>   KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topicName)
>>>           // Consumer starts from the last committed offset, or the earliest offset if none exists.
>>>           .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>>           // Set up deserializers from bytes to String.
>>>           // Careful: the key is dropped from here.
>>>           .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>>>           .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>>>           // Set up deserialization to fields: (String key, String json value) => (String key, unpacked object from json)
>>>           // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
>>>           .build();
>>>
>>>
>>>   return kafkaConfig;
>>>
>>> }
>>>
>>> I don't understand the origin of the issue. My Maven build targets Java 1.8.
>>> Any ideas on this issue?
>>>
>>> Actually, I wanted to set up a RecordTranslator to handle the conversion
>>> from the input JSON string to my deserialized object. Deserialization
>>> is handled by Gson.
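>>>
>>> Roughly, the translator I have in mind would look like this (a sketch
>>> only; MyEvent stands in for whatever POJO the JSON maps to):
>>>
>>> import java.io.Serializable;
>>> import java.util.List;
>>>
>>> import com.google.gson.Gson;
>>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>>> import org.apache.storm.kafka.spout.Func;
>>> import org.apache.storm.tuple.Values;
>>>
>>> // MyEvent is a hypothetical POJO matching the JSON payload.
>>> class MyEvent { String url; }
>>>
>>> // Gson-backed translator as a plain class rather than a lambda. Gson
>>> // itself is not Serializable, so it is created lazily in a transient
>>> // field instead of being captured from the enclosing topology.
>>> public class GsonTranslator
>>>         implements Func<ConsumerRecord<String, String>, List<Object>>, Serializable {
>>>     private transient Gson gson;
>>>
>>>     @Override
>>>     public List<Object> apply(ConsumerRecord<String, String> record) {
>>>         if (gson == null) {
>>>             gson = new Gson();
>>>         }
>>>         // (String key, String json value) => (String key, unpacked object)
>>>         return new Values(record.key(), gson.fromJson(record.value(), MyEvent.class));
>>>     }
>>> }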
>>>
>>> Thank you for your help,
>>> BR,
>>> Aurelien
>>>
>>>
>>> --
>>> BR,
>>> Aurelien Violette
>>>
>>
>
> --
> BR,
> Aurelien Violette
>
