storm-user mailing list archives

From Paul Jose <paul...@ugamsolutions.com>
Subject Flux 2.2.0
Date Mon, 21 Sep 2020 06:35:26 GMT
Hi,

My name is Paul. I was previously using Storm 0.10.0 and now have to upgrade to the latest
release, 2.2.0.
I am using Flux, but my topology definition no longer works. This is my config:


name: "AuditLogConsumerTopology"

components:

  - id: "kafkaConfigBuilder"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig$Builder"
    constructorArgs:
      # bootstrap servers
      - "localhost:9092"
      # topics
      - ["topic"]

  #Spout Config
  - id: "SpoutConfig"
    className: "org.apache.storm.kafka.spout.KafkaSpoutConfig"
    constructorArgs:
      # builder defined above
      - ref: "kafkaConfigBuilder"
    properties:
      - name: "group.id"
        value: "test-group"
      - name: "key.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"
      - name: "value.deserializer"
        value: "org.apache.kafka.common.serialization.StringDeserializer"


config:
  topology.workers: 1
  topology.max.spout.pending: 1000
  topology.acker.executors: 1
  topology.executor.send.buffer.size: 16384
  topology.executor.receive.buffer.size: 16384
  topology.transfer.buffer.size: 32
  zookeeperHost: "localhost"
  zookeeperBasePath: "platformaudit"
  kafka.broker.properties:
    #bootstrap.servers: "localhost:9092"
    metadata.broker.list: "localhost:9092"
    request.required.acks: "1"
  topic: "topic"


spouts:
  - id: "TestSpout"
    className: "org.apache.storm.kafka.spout.KafkaSpout"
    parallelism: 1
    constructorArgs:
      - ref: "SpoutConfig"


bolts:
  - id: "TestBolt"
    className: "auditlog.TestBolt"
    parallelism: 1
    constructorArgs:
        #index
      - "auditlog"
        #type
      - "_doc"
        #hostname
      - localhost
        #port
      - 9200


streams:
  - name: "TestSpout --> TestBolt"
    from: "TestSpout"
    to: "TestBolt"
    grouping:
      type: SHUFFLE


By changing the configuration with reference to the actual Storm 2.2.0 Kafka consumer code,
I was able to put together this Flux YAML file.


However, I am unable to specify any consumer properties from this YAML file.

I tried setting "group.id" and it gives an error:

Exception in thread "main" java.lang.NoSuchFieldException: groupId

at java.lang.Class.getField(Class.java:1703)
at org.apache.storm.flux.FluxBuilder.findPublicField(FluxBuilder.java:298)
at org.apache.storm.flux.FluxBuilder.applyProperties(FluxBuilder.java:288)
at org.apache.storm.flux.FluxBuilder.buildObject(FluxBuilder.java:390)
at org.apache.storm.flux.FluxBuilder.buildComponents(FluxBuilder.java:428)
at org.apache.storm.flux.FluxBuilder.buildTopology(FluxBuilder.java:102)
at org.apache.storm.flux.Flux.runCli(Flux.java:174)
at org.apache.storm.flux.Flux.main(Flux.java:119)

Without the properties section the topology does get submitted, but the spout then throws
this error:


java.lang.RuntimeException: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.storm.utils.Utils$1.run(Utils.java:409)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: Missing required configuration "key.deserializer" which has no default value.
at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:478)
at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:468)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:607)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:632)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:613)
at org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:26)
at org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault.createConsumer(ConsumerFactoryDefault.java:22)
at org.apache.storm.kafka.spout.KafkaSpout.open(KafkaSpout.java:147)
at org.apache.storm.executor.spout.SpoutExecutor.init(SpoutExecutor.java:149)
at org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:159)
at org.apache.storm.executor.spout.SpoutExecutor.call(SpoutExecutor.java:56)
at org.apache.storm.utils.Utils$1.run(Utils.java:389)
... 1 more

I went through a bit of the source code and found the cause. The consumer properties are not
fields of the "org.apache.storm.kafka.spout.KafkaSpoutConfig" class; they are defined in
Kafka's "ConsumerConfig" class. So when Flux looks on KafkaSpoutConfig for a field matching
the name given under properties, it fails.

Previously, in version 0.10.0, the configuration fields were available directly on the
"storm.kafka.SpoutConfig" class.
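
To illustrate what I mean, here is a minimal standalone sketch of that kind of reflective
field lookup. It is not Flux's actual code: the class and method names (PropertyLookupDemo,
FieldStyleConfig, MapStyleConfig, trySetField) are made up for the example, and it assumes
only that the property name is resolved with Class.getField, as the stack trace above
suggests.

```java
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

public class PropertyLookupDemo {

    // Hypothetical stand-in for an old-style config class that exposes
    // its settings as public fields.
    public static class FieldStyleConfig {
        public String groupId;
    }

    // Hypothetical stand-in for a config class that keeps consumer
    // settings in an internal map rather than in public fields.
    public static class MapStyleConfig {
        private final Map<String, Object> props = new HashMap<>();
    }

    // Tries to set a public field by name via reflection.
    // Returns false when the target class has no such public field.
    public static boolean trySetField(Object target, String name, Object value) {
        try {
            Field field = target.getClass().getField(name);
            field.set(target, value);
            return true;
        } catch (NoSuchFieldException e) {
            return false;
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The field exists, so the lookup succeeds.
        System.out.println(trySetField(new FieldStyleConfig(), "groupId", "test-group"));
        // No public field named groupId, so the lookup fails.
        System.out.println(trySetField(new MapStyleConfig(), "groupId", "test-group"));
    }
}
```

The first call succeeds because the field is public; the second fails in the same way my
topology does, since the setting lives somewhere other than a public field of the class.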


Can I please get some help resolving this issue? The Flux Kafka spout example at
https://storm.apache.org/releases/2.2.0/flux.html is also wrong, and I am not sure whether
it has been updated.


Best Regards,

Paul Jose
