Hello,
I was successfully using Kafka 0.8.x in a Storm topology based on Storm
Crawler, but I needed to upgrade to Kafka 0.10.x.
I tried to reproduce my environment with Docker:
Storm 1.1 and Kafka 2.11-0.10.2.2.
Unfortunately, when I deploy the topology, I get the following error:
Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24) ~[stormjar.jar:?]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230) ~[?:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573) ~[?:1.8.0_212]
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_212]
    at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253) ~[storm-core-1.1.3.jar:1.1.3]
My ConfigurableTopology class only gathers some configuration utilities for
building the topology. In particular, it defines the SpoutConfig:
/**
 * Get a spout config by topic and define the scheme.
 * @param topicKey configuration key under which the topic name is stored.
 * @param deserializer deserializer to use from bytes to value.
 * @return the Kafka spout configuration for that topic.
 */
KafkaSpoutConfig getSpoutConfig(
        String topicKey,
        Object deserializer)
{
    // Look up the actual topic name in the topology configuration.
    // (The parameter is renamed so it no longer clashes with this variable.)
    String topic = (String) this.getConf().get(topicKey);
    // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
    KafkaSpoutConfig kafkaConfig =
        KafkaSpoutConfig.builder(bootstrapServers, topic)
            // Consumer starts from the last committed offset if there is one,
            // otherwise from the earliest offset.
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            // Set up deserializers from bytes to String.
            // Careful: the key is dropped from here.
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer")
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer")
            // Set up translation to fields: (String key, String JSON value)
            // => (String key, object unpacked from JSON)
            // .setRecordTranslator(new ByTopicRecordTranslator<>((r) ->
            //     new Values(r.value()), new Fields("FieldNames")))
            .build();
    return kafkaConfig;
}
I don't understand where the issue comes from. My Maven build targets Java 1.8.
Any idea what could cause this?
What I actually want is to set up a RecordTranslator that handles the transition
from the input JSON String to my deserialized JSON object. Deserialization
is handled by Gson.
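
The stack trace above fails inside $deserializeLambda$, which is the path Java
serialization takes for lambdas. One thing that may be worth trying is to replace
the lambda passed to the RecordTranslator with a small named class, which goes
through ordinary class deserialization instead. Below is a minimal, stdlib-only
sketch of that idea, not the Storm API itself: SerializableFunc,
ValueOnlyTranslator and NamedTranslatorDemo are hypothetical names, the
translator takes a plain String rather than a Kafka ConsumerRecord, and the two
helper methods only imitate a Java-serialization round trip like the one
visible in the trace. Gson parsing is left out to keep it dependency-free.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

class NamedTranslatorDemo {

    /** Hypothetical stand-in for a serializable translator function. */
    interface SerializableFunc<T, R> extends Function<T, R>, Serializable {}

    /** Named class used in place of the lambda (r) -> new Values(r.value()). */
    static final class ValueOnlyTranslator
            implements SerializableFunc<String, List<Object>> {
        private static final long serialVersionUID = 1L;

        @Override
        public List<Object> apply(String value) {
            // Emit a one-element tuple holding the record value.
            // Assumption: JSON parsing (e.g. with Gson) would happen here.
            return Collections.<Object>singletonList(value);
        }
    }

    /** Serialize an object with plain Java serialization. */
    static byte[] javaSerialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    /** Deserialize an object with plain Java serialization. */
    static Object javaDeserialize(byte[] data)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    /** Round-trip the named translator, then apply the restored copy. */
    @SuppressWarnings("unchecked")
    static List<Object> roundTripAndApply(String value)
            throws IOException, ClassNotFoundException {
        byte[] data = javaSerialize(new ValueOnlyTranslator());
        SerializableFunc<String, List<Object>> restored =
            (SerializableFunc<String, List<Object>>) javaDeserialize(data);
        return restored.apply(value);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTripAndApply("{\"url\":\"http://example.com\"}"));
    }
}
```

In a real topology the named class would implement whatever function interface
the ByTopicRecordTranslator of the storm-kafka-client version in use expects,
and the Gson call would go inside apply. Whether this removes the error in this
particular setup is untested.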
Thank you for your help,
BR,
Aurelien
--
BR,
Aurelien Violette