storm-user mailing list archives

From aurelien violette <aurelie...@webgroup-limited.com>
Subject Re: Upgrading Kafka from 0.8.x to 0.10.x with Storm 1.1.x
Date Thu, 06 Jun 2019 15:25:22 GMT
Hey,

Well, here it is. It extends ConfigurableTopology from storm-crawler. I've
tried many simplifications to get rid of any code that could involve a lambda.
The only place I can see one is in the RecordTranslator defaults.

Thank you for any ideas about where or what to search for.


import com.digitalpebble.stormcrawler.ConfigurableTopology;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;

import java.io.Serializable;

public abstract class AntConfigurableTopology extends ConfigurableTopology implements Serializable {

  String topologyName;
  // ZkHosts zkHosts;
  String bootstrapServers = "localhost:9092"; // Default local configuration
  int parallel = 1;

  void init(String topologyId) {
    topologyName = (String) this.getConf().get(topologyId);
    bootstrapServers = (String) this.getConf().get("metadata.broker");
    final Integer parallelismHint = (Integer) this.getConf().getOrDefault("parallelism", 1);
    parallel = parallelismHint;
    if (!this.getConf().containsKey("zkhost")) {
      this.getConf().put("zkhost", "localhost:2181");
    }
  }


  /**
   * Get a spout config by topic
   * @param topic
   * @return
   */
  KafkaSpoutConfig getSpoutConfig(String topic) {
    return getSpoutConfig(topic, null); // new AntmarkTupleBuilder(this.getConf()));
  }


  /**
   * Get a spout config by topic, define the scheme.
   * @param topic
   * @param deserializer deserializer to use from bytes to value.
   * @return
   */
  KafkaSpoutConfig getSpoutConfig(
          String topic,
          Object deserializer)
  {

    String addWeblinkTopic = (String) this.getConf().get(topic);

    // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
    KafkaSpoutConfig kafkaAddWeblinkConfig = KafkaSpoutConfig.builder(bootstrapServers, addWeblinkTopic)
            // Consumer will start from the last committed offset, or from the earliest offset if none has been committed.
            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
            // Set up deserializers from bytes to String.
            // Careful: the key is dropped from here.
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            // Set up deserialization to fields: (String key, String JSON value) => (String key, unpacked object from JSON)
            // .setRecordTranslator(new ByTopicRecordTranslator<>((TargetInterface Serializable)(r) -> new Values(r.value()), new Fields(FieldNames.ANTMARK)))
            //.setRecordTranslator(deserializer, deserializer.getFields())
            .build();


    return kafkaAddWeblinkConfig;
  }
}


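For context on the `$deserializeLambda$` frame in the quoted stack trace, here is a minimal JDK-only sketch of how a serializable lambda travels through Java serialization. The lambda is written out as a `java.lang.invoke.SerializedLambda` that names its capturing class, and on read that class's compiler-generated `$deserializeLambda$` method has to recognise it. If the class loaded on the reading side does not contain a matching lambda (for example, because a different build of the class is on the classpath), the JDK reports "Invalid lambda deserialization". `LambdaRoundTrip` and `SerFunc` are hypothetical names used only for this illustration; they are not part of Storm or storm-crawler.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.util.function.Function;

class LambdaRoundTrip {

    /** Hypothetical serializable functional interface, standing in for a library one. */
    interface SerFunc<A, B> extends Function<A, B>, Serializable {}

    /** A serializable lambda; the compiler records LambdaRoundTrip as its capturing class. */
    static SerFunc<String, String> upper() {
        return s -> s.toUpperCase();
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    /** Internal name of the class whose $deserializeLambda$ is called when the lambda is read back. */
    static String capturingClassOf(Serializable lambda) throws Exception {
        // Serializable lambdas get a private writeReplace() that returns a SerializedLambda.
        Method writeReplace = lambda.getClass().getDeclaredMethod("writeReplace");
        writeReplace.setAccessible(true);
        SerializedLambda form = (SerializedLambda) writeReplace.invoke(lambda);
        return form.getCapturingClass();
    }

    /** Serializes the lambda and applies the deserialized copy to the input. */
    @SuppressWarnings("unchecked")
    static String roundTrip(String input) throws Exception {
        SerFunc<String, String> copy = (SerFunc<String, String>) deserialize(serialize(upper()));
        return copy.apply(input);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(capturingClassOf(upper()));
        System.out.println(roundTrip("storm"));
    }
}
```

Within a single JVM the round trip succeeds because the writer and the reader see the same class. The failure in the trace arises when the two sides disagree about that class, so comparing the jar used at submit time with the one the workers load is a reasonable first check.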


On Thu, 6 Jun 2019 at 16:39, Stig Rohde Døssing <stigdoessing@gmail.com>
wrote:

> I don't see anything wrong with the code you posted. Could you post the
> full AntConfigurableTopology code? It's hard to tell from that snippet what
> your topology setup looks like.
>
> On Thu, 6 Jun 2019 at 12:33, aurelien violette <
> aurelien.v@webgroup-limited.com> wrote:
>
>> Hello,
>>
>> I was successfully using Kafka 0.8.x in a Storm topology based on Storm
>> Crawler. However, I needed to upgrade to Kafka 0.10.x.
>>
>> I tried to simulate my environment using Docker:
>> Storm 1.1 and Kafka 2.11-0.10.2.2
>>
>> Unfortunately, at deploy time, I get the following error:
>>
>> Caused by: java.lang.IllegalArgumentException: Invalid lambda
>> deserialization
>>        at
>> c.a.b.storm.topologies.ConfigurableTopology.$deserializeLambda$(AntConfigurableTopology.java:24)
>> ~[stormjar.jar:?]
>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_212]
>>        at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_212]
>>        at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_212]
>>        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>>        at
>> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
>> ~[?:1.8.0_212]
>>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> ~[?:1.8.0_212]
>>        at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> ~[?:1.8.0_212]
>>        at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> ~[?:1.8.0_212]
>>        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
>>        at
>> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1260)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2078)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>> ~[?:1.8.0_212]
>>        at
>> java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
>> ~[?:1.8.0_212]
>>        at org.apache.storm.utils.Utils.javaDeserialize(Utils.java:253)
>> ~[storm-core-1.1.3.jar:1.1.3]
>>
>> My ConfigurableTopology only gathers some config utilities for
>> building the topology. In particular, it defines the SpoutConfig.
>>
>> /**
>>  * Get a spout config by topic, define the scheme.
>>  * @param topic
>>  * @param deserializer deserializer to use from bytes to value.
>>  * @return
>>  */
>> KafkaSpoutConfig getSpoutConfig(
>>         String topic,
>>         Object deserializer)
>> {
>>
>>   String topicName = (String) this.getConf().get(topic);
>>
>>   // With Kafka 0.10.x, we use the KafkaSpoutConfig.builder
>>   KafkaSpoutConfig kafkaConfig = KafkaSpoutConfig.builder(bootstrapServers, topicName)
>>           // Consumer will start from the last committed offset, or from the earliest offset if none has been committed.
>>           .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
>>           // Set up deserializers from bytes to String.
>>           // Careful: the key is dropped from here.
>>           .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>>           .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
>>           // Set up deserialization to fields: (String key, String JSON value) => (String key, unpacked object from JSON)
>>           // .setRecordTranslator(new ByTopicRecordTranslator<>((r) -> new Values(r.value()), new Fields("FieldNames")))
>>           .build();
>>
>>
>>   return kafkaConfig;
>>
>> }
>>
>> I don't understand the origin of the issue. My Maven build targets Java 1.8.
>> Any ideas about this issue?
>>
>> Actually, I wanted to set up a RecordTranslator to handle the transition
>> from the input JSON String to my deserialized JSON object. Deserialization
>> is handled by Gson.
>>
>> Thank you for your help,
>> BR,
>> Aurelien
>>
>>
>> --
>> BR,
>> Aurelien Violette
>>
>
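One commonly suggested way to sidestep lambda serialization for the translator described in the quoted message is to write it as a named class instead of a lambda. A named class is serialized by its class name and fields, so no `$deserializeLambda$` lookup is involved. The sketch below uses JDK-only stand-ins so that it runs without Storm on the classpath: `Translator` and `ValueOnlyTranslator` are hypothetical names, not Storm's API, and a real implementation would implement storm-kafka-client's `RecordTranslator` interface instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;

class NamedTranslatorSketch {

    /** Hypothetical stand-in for a serializable translator interface (not Storm's API). */
    interface Translator extends Serializable {
        List<Object> apply(String key, String value);
    }

    /** Named class: Java serialization stores its class name and fields, not a SerializedLambda. */
    static final class ValueOnlyTranslator implements Translator {
        private static final long serialVersionUID = 1L;

        @Override
        public List<Object> apply(String key, String value) {
            // The key is dropped; JSON parsing of the value would go here.
            return Collections.<Object>singletonList(value);
        }
    }

    /** Serializes the translator and returns the deserialized copy. */
    static Translator roundTrip(Translator translator) throws Exception {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(translator);
        }
        try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (Translator) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Translator copy = roundTrip(new ValueOnlyTranslator());
        System.out.println(copy.apply("some-key", "{\"url\":\"http://example.com\"}"));
    }
}
```

Any helper that is not itself serializable (a JSON parser instance, say) would need to be a transient field created lazily inside the class, since the whole translator is serialized when the topology is submitted.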

-- 
BR,
Aurelien Violette
