Hey,
Well, here it is. It extends ConfigurableTopology from storm-crawler. And
I've tried many simplifications to get rid of any lambda potential code.
The only place I see it, is in the RecordTranslator defaults.
Thank you for you ideas if any about where or what to search for.
import com.digitalpebble.stormcrawler.ConfigurableTopology;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import java.io.Serializable;
public abstract class AntConfigurableTopology extends
ConfigurableTopology implements Serializable {
String topologyName;
// ZkHosts zkHosts;
String bootstrapServers = "localhost:9092"; // Default local configuration
int parallel = 1;
void init(String topologyId) {
topologyName = (String) this.getConf().get(topologyId);
bootstrapServers = (String) this.getConf().get("metadata.broker");
final Integer parallelismHint = (Integer)
this.getConf().getOrDefault("parallelism", 1);
parallel = parallelismHint;
if (!this.getConf().containsKey("zkhost")) {
this.getConf().put("zkhost", "localhost:2181");
}
}
/**
* Get a spout config by topic
* @param topic
* @return
*/
KafkaSpoutConfig getSpoutConfig(String topic) {
return getSpoutConfig(topic, null); //new
AntmarkTupleBuilder(this.getConf()));
}
/**
* Get a spout config by topic, define the scheme.
* @param topic
* @param deserializer deserializer to use from bytes to value.
* @return
*/
KafkaSpoutConfig getSpoutConfig(
String topic,
Object deserializer)
{
String addWeblinkTopic = (String) this.getConf().get(topic);
// With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
KafkaSpoutConfig kafkaAddWeblinkConfig =
KafkaSpoutConfig.builder(bootstrapServers, addWeblinkTopic)
// Consummer will start from the latest uncommitted
offset, or the earliest offset if any.
.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
// Setup serializers from bytes to string.
// careful the key is dropped from here.
.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
// Setup deserialization to fields : (String key, String
json value) => (String Key, Unpacked object from json)
// .setRecordTranslator(new
ByTopicRecordTranslator<>((TargetInterface Serializable)(r) -> new
Values(r.value()), new Fields(FieldNames.ANTMARK)))
//.setRecordTranslator(deserializer, deserializer.getFields())
.build();
return kafkaAddWeblinkConfig;
}
}
}
Le jeu. 6 juin 2019 à 16:39, Stig Rohde Døssing <stigdoessing@gmail.com> a
écrit :
> I don't see anything wrong with the code you posted. Could you post the
> full AntConfigurableTopology code? It's hard to tell from that snippet what
> your topology setup looks like.
>
> Den tor. 6. jun. 2019 kl. 12.33 skrev aurelien violette <
> aurelien.v@webgroup-limited.com>:
>
>> Hello,
>>
>> I was sucessfully using Kafka 0.8.x in a storm topology based on Storm
>> Crawler. I needed though to upgrade to Kafka 0.10.x
>>
>> I tried to simulate my enviroment using a Docker environment :
>> Storm 1.1 and Kafka 2.11-0.10.2.2
>>
>> Unfortunately, at the deploy, I get an error on :
>>
>> Caused by: java.lang.IllegalArgumentException: Invalid lambda
>> deserialization
>> at
>> c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24)
>> ~[stormjar.jar:?]
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_212]
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_212]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_212]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>> at
>> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
>> ~[?:1.8.0_212]
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_212]
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_212]
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_212]
>> at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>> at
>> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>> at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> ~[?:1.8.0_212]
>> at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253)
>> ~[storm-core-1.1.3.jar:1.1.3]
>>
>> Where my ConfigurableTopology is only gathering some config utils for
>> building topology. In particular, it defines the SpoutConfig.
>>
>> /**
>> * Get a spout config by topic, define the scheme.
>> * @param topic
>> * @param deserializer deserializer to use from bytes to value.
>> * @return
>> */
>> KafkaSpoutConfig getSpoutConfig(
>> String topic,
>> Object deserializer)
>> {
>>
>> String topic = (String) this.getConf().get(topic);
>>
>> // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
>> KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topic)
>> // Consummer will start from the latest uncommitted offset, or the earliest
offset if any.
>> .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>> // Setup serializers from bytes to string.
>> // careful the key is dropped from here.
>> .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
>> .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
>> // Setup deserialization to fields : (String key, String json value) =>
(String Key, Unpacked object from json)
>> // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new
Values(r.value()), new Fields("FieldNames")))
>> .build();
>>
>>
>> return kafkaConfig;
>>
>> }
>>
>> I don't understand the origin of the issue. My Maven sets java to 1.8.
>> Any idea on this issue ?
>>
>> Actually, I wanted to set up a RecordTranslator to handle the transition
>> from the input JSON String to my deserialized JSON object. Deserialization
>> is handled by Gson.
>>
>> Thank you for your help,
>> BR,
>> Aurelien
>>
>>
>> --
>> BR,
>> Aurelien Violette
>>
>
--
BR,
Aurelien Violette
|