Hello, 

I was sucessfully using Kafka 0.8.x in a storm topology based on Storm Crawler. I needed though to upgrade to Kafka 0.10.x

I tried to simulate my enviroment using a Docker environment : 
Storm 1.1 and Kafka 2.11-0.10.2.2

Unfortunately, at the deploy, I get an error on : 

Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
       at c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24) ~[stormjar.jar:?]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
       at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_212]
       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
       at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
       at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_212]
       at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) ~[storm-core-1.1.3.jar:1.1.3]

Where my ConfigurableTopology is only gathering some config utils for building topology. In particular, it defines the SpoutConfig. 

/**
* Get a spout config by topic, define the scheme.
* @param topic
* @param deserializer deserializer to use from bytes to value.
* @return
*/
KafkaSpoutConfig getSpoutConfig(
String topic,
Object deserializer)
{

String topic = (String) this.getConf().get(topic);

// With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
// Consummer will start from the latest uncommitted offset, or the earliest offset if any.
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
// Setup serializers from bytes to string.
// careful the key is dropped from here.
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
// Setup deserialization to fields : (String key, String json value) => (String Key, Unpacked object from json)
// .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
.build();


return kafkaConfig;
}

I don't understand the origin of the issue. My Maven sets java to 1.8.
Any idea on this issue ? 

Actually, I wanted to set up a RecordTranslator to handle the transition from the input JSON String to my deserialized JSON object. Deserialization is handled by Gson. 

Thank you for your help,
BR,
Aurelien


--
BR,
Aurelien Violette