storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aurelien violette <aurelie...@webgroup-limited.com>
Subject Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x
Date Thu, 06 Jun 2019 10:32:44 GMT
Hello,

I was sucessfully using Kafka 0.8.x in a storm topology based on Storm
Crawler. I needed though to upgrade to Kafka 0.10.x

I tried to simulate my enviroment using a Docker environment :
Storm 1.1 and Kafka 2.11-0.10.2.2

Unfortunately, at the deploy, I get an error on :

Caused by: java.lang.IllegalArgumentException: Invalid lambda
deserialization
       at
c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24)
~[stormjar.jar:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_212]
       at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_212]
       at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_212]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
       at
java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
~[?:1.8.0_212]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_212]
       at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_212]
       at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_212]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
       at
java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
~[?:1.8.0_212]
       at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
~[?:1.8.0_212]
       at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253)
~[storm-core-1.1.3.jar:1.1.3]

Where my ConfigurableTopology is only gathering some config utils for
building topology. In particular, it defines the SpoutConfig.

/**
 * Get a spout config by topic, define the scheme.
 * @param topic
 * @param deserializer deserializer to use from bytes to value.
 * @return
 */
KafkaSpoutConfig getSpoutConfig(
        String topic,
        Object deserializer)
{

  String topic = (String) this.getConf().get(topic);

  // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
  KafkaSpoutConfig kafkaConfig =
KafkaSpoutConfig.builder(bootstrapServers, topic)
          // Consummer will start from the latest uncommitted offset,
or the earliest offset if any.
          .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
          // Setup serializers from bytes to string.
          // careful the key is dropped from here.
          .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
          .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
          // Setup deserialization to fields : (String key, String
json value) => (String Key, Unpacked object from json)
          // .setRecordTranslator(new ByTopicRecordTranslator<>((r) ->
new Values(r.value()), new Fields("FieldNames")))
          .build();


  return kafkaConfig;

}

I don't understand the origin of the issue. My Maven sets java to 1.8.
Any idea on this issue ?

Actually, I wanted to set up a RecordTranslator to handle the transition
from the input JSON String to my deserialized JSON object. Deserialization
is handled by Gson.

Thank you for your help,
BR,
Aurelien


-- 
BR,
Aurelien Violette

Mime
View raw message