storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Kazansky, Michael" <michael.kazan...@jpmchase.com>
Subject RE: submit topology using NimbusClient
Date Fri, 19 Feb 2016 16:53:57 GMT
Thanks Matthias but it did not help

Here is the code

Boolean success = false;
        try {           
            //update storm properties to have dynamically changed number of spouts
            Config conf = new Config();
            TopologyBuilder topoBuilder = new TopologyBuilder();
            conf.put(Config.NIMBUS_HOST, topologyModel.getHost());
            conf.setDebug(true);
            Map stormConf = Utils.readStormConfig();
            stormConf.put(Config.NIMBUS_HOST, topologyModel.getHost());
            
            // topology submit
            //NimbusClient nimbus = new NimbusClient(stormConf, topologyModel.getHost(), null
!= topologyModel.getNimbusPort() ? topologyModel.getNimbusPort() : 6627);
            //String uploadedJarLocation = StormSubmitter.submitJar(stormConf, topologyModel.getArtifactUrl());
            //localjar
            System.setProperty("storm.jar", topologyModel.getArtifactUrl());
            //String jsonConf = JSONValue.toJSONString(stormConf);
            //log.info("REmote jar " + topologyModel.getArtifactUrl());
            //StormTopology topology = buildToplogy(topologyModel);
            //SubmitOptions options = new SubmitOptions();
            //nimbus.getClient().submitTopology(topologyModel.getTopologyName(), uploadedJarLocation,
jsonConf, topoBuilder.createTopology());
            StormSubmitter.submitTopology(topologyModel.getTopologyName(), stormConf, topoBuilder.createTopology());
            success = true;


  Log from JUnitTest

  [2016-02-19 11:22:34,059] [main] Jar not uploaded to master yet. Submitting jar...
[2016-02-19 11:22:34,143] [main] Uploading topology jar <path_to_jar>/target/test-classes/sin-example.jar
to assigned location: <remote_nimbus_path>/nimbus/inbox/stormjar-f0b699c6-88f7-42ca-a0a9-95d9cad61c18.jar
[2016-02-19 11:22:43,168] [main] Successfully uploaded topology jar to assigned location:
<remote_nimbus_path>/nimbus/inbox/stormjar-f0b699c6-88f7-42ca-a0a9-95d9cad61c18.jar
[2016-02-19 11:22:43,169] [main] Submitting topology example-topology-1 in distributed mode
with conf {"storm.messaging.netty.min_wait_ms":100,"topology.enable.message.timeouts":true,"storm.messaging.netty.buffer_size":5242880,"storm.messaging.netty.transfer.batch.size":262144,"storm.messaging.netty.client_worker_threads":1,"topology.tasks":null,"transactional.zookeeper.root":"\/transactional","topology.tuple.serializer":"backtype.storm.serialization.types.ListDelegateSerializer","topology.spout.wait.strategy":"backtype.storm.spout.SleepSpoutWaitStrategy","drpc.port":3772,"topology.max.spout.pending":null,"topology.transfer.buffer.size":1024,"supervisor.worker.timeout.secs":30,"topology.worker.receiver.thread.count":1,"logviewer.port":8000,"worker.childopts":"-Xmx768m","nimbus.file.copy.expiration.secs":600,"drpc.childopts":"-Xmx768m","nimbus.task.launch.secs":120,"logviewer.childopts":"-Xmx128m","storm.zookeeper.servers":["localhost"],"storm.messaging.transport":"backtype.storm.messaging.netty.Context","topology.workers":1,"storm.messaging.netty.flush.check.interval.ms":10,"topology.environment":null,"drpc.invocations.port":3773,"topology.kryo.factory":"backtype.storm.serialization.DefaultKryoFactory","nimbus.cleanup.inbox.freq.secs":600,"topology.fall.back.on.java.serialization":true,"logviewer.appender.name":"A1","storm.messaging.netty.server_worker_threads":1,"supervisor.slots.ports":[6700,6701,6702,6703],"storm.zookeeper.connection.timeout":15000,"topology.tick.tuple.freq.secs":null,"topology.stats.sample.rate":0.05,"storm.local.dir":"storm-local","nimbus.inbox.jar.expiration.secs":3600,"storm.messaging.netty.max_retries":300,"topology.debug":false,"storm.zookeeper.retry.interval":1000,"topology.receiver.buffer.size":8,"storm.zookeeper.retry.times":5,"java.library.path":"\/usr\/local\/lib:\/opt\/local\/lib:\/usr\/lib","zmq.linger.millis":5000,"topology.multilang.serializer":"backtype.storm.multilang.JsonSerializer","drpc.request.timeout.secs":600,"zmq.threads":1,"topology.state.synchronization.timeout.secs":60,"topology.worker.shared.thread.pool.size":4,"topology.executor.receive.buffer.size":1024,"supervisor.monitor.frequency.secs":3,"nimbus.host":"cib2l4280.svr.us.jpmchase.net","transactional.zookeeper.port":null,"storm.zookeeper.port":2181,"storm.zookeeper.retry.intervalceiling.millis":30000,"nimbus.thrift.port":6627,"topology.classpath":null,"topology.worker.childopts":null,"topology.max.task.parallelism":null,"topology.acker.executors":null,"supervisor.worker.start.timeout.secs":120,"nimbus.reassign":true,"supervisor.heartbeat.frequency.secs":5,"topology.trident.batch.emit.interval.millis":500,"task.heartbeat.frequency.secs":3,"supervisor.enable":true,"storm.thrift.transport":"backtype.storm.security.auth.SimpleTransportPlugin","drpc.worker.threads":64,"ui.port":8080,"drpc.queue.size":128,"topology.message.timeout.secs":30,"topology.error.throttle.interval.secs":10,"nimbus.childopts":"-Xmx1024m","topology.builtin.metrics.bucket.size.secs":60,"nimbus.monitor.freq.secs":10,"topology.executor.send.buffer.size":1024,"storm.local.mode.zmq":false,"transactional.zookeeper.servers":null,"nimbus.task.timeout.secs":30,"storm.meta.serialization.delegate":"backtype.storm.serialization.DefaultSerializationDelegate","dev.zookeeper.path":"\/tmp\/dev-storm-zookeeper","nimbus.supervisor.timeout.secs":60,"topology.skip.missing.kryo.registrations":false,"storm.zookeeper.root":"\/storm","storm.zookeeper.session.timeout":20000,"zmq.hwm":0,"topology.disruptor.wait.strategy":"com.lmax.disruptor.BlockingWaitStrategy","topology.sleep.spout.wait.strategy.time.ms":1,"nimbus.topology.validator":"backtype.storm.nimbus.DefaultTopologyValidator","storm.cluster.mode":"distributed","ui.childopts":"-Xmx768m","task.refresh.poll.secs":10,"supervisor.childopts":"-Xmx256m","worker.heartbeat.frequency.secs":1,"storm.messaging.netty.max_wait_ms":1000,"topology.max.error.report.per.interval":5,"nimbus.thrift.max_buffer_size":1048576}
[2016-02-19 11:22:46,127] [main] Finished submitting topology: example-topology-1
Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 19.605 sec


I see that jar on remote machine has correct size. But in Storm UI I still do  not see any
Bolts and Spouts.

I wish Storm had easy way to submit topologies from, which are inside the jar. The same way
like command line do. I also added Main-Class to MANIFEST.MF hoping it would help but it did
not.

Thanks,
Michael Kazansky

-----Original Message-----
From: Matthias J. Sax [mailto:mjsax@apache.org] 
Sent: Friday, February 19, 2016 5:02 AM
To: user@storm.apache.org
Subject: Re: submit topology using NimbusClient

Why do you not submit via StormSubmitter instead of NimbusClient??

> StormSubmitter.submitTopology("topologyName", conf, 
> builder.createTopology());

For this, you also need to set system property "storm.jar" which must point to the *client
local* path to the topology jar file.

-Matthias

On 02/18/2016 10:22 PM, Kazansky, Michael wrote:
> Yeah but it is not going to work for me. We already have direct 
> topology submission by calling python script from Java app.
> 
> This requires to know location of the script on the nimbus machine. We 
> are trying to avoid that.
> 
>  
> 
> Thanks,
> 
> Michael Kazansky
> 
>  
> 
> *From:*Rahul R [mailto:rahul8590@gmail.com]
> *Sent:* Thursday, February 18, 2016 4:17 PM
> *To:* user@storm.apache.org
> *Subject:* Re: submit topology using NimbusClient
> 
>  
> 
> I had a similar use case.  I just created a form to upload the jar as 
> a file upload and execute it on nimbus hosts.
> 
> ./R
> 
>  
> 
> On Thu, Feb 18, 2016 at 11:16 AM, Kazansky, Michael 
> <michael.kazansky@jpmchase.com <mailto:michael.kazansky@jpmchase.com>>
> wrote:
> 
> Hi Storm gurus,
> 
> I am working on the project which should allow users to submit their 
> topology jars to Storm. We successfully using command line but now I 
> need to write application which allows user to submit jar through let 
> say browser.
> 
> Here is what I am trying to do
> 
>  
> 
>            Config conf = new Config();
> 
>             TopologyBuilder topoBuilder = new TopologyBuilder();
> 
>             conf.put(Config.NIMBUS_HOST, topologyModel.getHost());
> 
>             conf.setDebug(true);
> 
>             Map stormConf = Utils.readStormConfig();
> 
>             stormConf.put(Config.NIMBUS_HOST, 
> topologyModel.getHost());
> 
>            
> 
>             // topology submit
> 
>             NimbusClient nimbus = new NimbusClient(stormConf, 
> topologyModel.getHost(), null != topologyModel.getNimbusPort() ?
> topologyModel.getNimbusPort() : 6627);
> 
>             String uploadedJarLocation = 
> StormSubmitter.submitJar(stormConf, topologyModel.getArtifactUrl());
> 
>             System.setProperty("storm.jar", uploadedJarLocation.concat("
> ").concat(topologyModel.getMainClass()));
> 
>             String jsonConf = JSONValue.toJSONString(stormConf);
> 
>             //StormTopology topology = buildToplogy(topologyModel);
> 
>             //SubmitOptions options = new SubmitOptions();
> 
>             //client.jartransformer.class
> 
>            
> nimbus.getClient().submitTopology(topologyModel.getTopologyName(),
> uploadedJarLocation, jsonConf, topoBuilder.createTopology());
> 
>             success = true;
> 
>  
> 
>  
> 
> I looked at the storm python script and saw that main class name is 
> concatenated to JVM_OPTS
> 
> That’s why I added this line
> 
>  
> 
> System.setProperty("storm.jar", uploadedJarLocation.concat("
> ").concat(topologyModel.getMainClass()));
> 
>  
> 
> But when I execute the code in test I only see empty topology without 
> spouts and bolts in Storm UI with only one worker and one executor.
> Main class in the jar has the code to submit spouts and bolts with 
> parallelism 3 so most definitely main class wasn’t picked up.
> 
> Is there any way to implement this command python 
> ${storm_home}/bin/storm jar ${dsl_apps_dir}/dist/${jarname} $mainclass 
> in Java besides calling it (which is already done) directly?
> 
>  
> 
>  
> 
> Thanks,
> 
> Michael Kazansky
> 
>  
> 
> This communication is for informational purposes only. It is not 
> intended as an offer or solicitation for the purchase or sale of any 
> financial instrument or as an official confirmation of any transaction.
> All market prices, data and other information are not warranted as to 
> completeness or accuracy and are subject to change without notice. Any 
> comments or statements made herein do not necessarily reflect those of 
> JPMorgan Chase & Co., its subsidiaries and affiliates (collectively, 
> "JPMC"). This transmission may contain information that is 
> proprietary, privileged, confidential and/or exempt from disclosure 
> under applicable law. If you are not the intended recipient, you are 
> hereby notified that any disclosure, copying, distribution, or use of 
> the information contained herein (including any reliance thereon) is 
> STRICTLY PROHIBITED. If you received this transmission in error, 
> please immediately contact the sender and destroy the material in its 
> entirety, whether in electronic or hard copy format. Although this 
> transmission and any attachments are believed to be free of any virus 
> or other defect that might affect any computer system into which it is 
> received and opened, it is the responsibility of the recipient to 
> ensure that it is virus free and no responsibility is accepted by JPMC 
> for any loss or damage arising in any way from its use. Please note 
> that any electronic communication that is conducted within or through 
> JPMC's systems is subject to interception, monitoring, review, 
> retention and external production in accordance with JPMC's policy and 
> local laws, rules and regulations; may be stored or otherwise 
> processed in countries other than the country in which you are 
> located; and will be treated in accordance with JPMC policies and applicable laws and
regulations.
> Please refer to http://www.jpmorgan.com/pages/disclosures for 
> disclosures relating to European legal entities.
> 
>  
> 
> This communication is for informational purposes only. It is not 
> intended as an offer or solicitation for the purchase or sale of any 
> financial instrument or as an official confirmation of any transaction.
> All market prices, data and other information are not warranted as to 
> completeness or accuracy and are subject to change without notice. Any 
> comments or statements made herein do not necessarily reflect those of 
> JPMorgan Chase & Co., its subsidiaries and affiliates (collectively, 
> "JPMC"). This transmission may contain information that is 
> proprietary, privileged, confidential and/or exempt from disclosure 
> under applicable law. If you are not the intended recipient, you are 
> hereby notified that any disclosure, copying, distribution, or use of 
> the information contained herein (including any reliance thereon) is 
> STRICTLY PROHIBITED. If you received this transmission in error, 
> please immediately contact the sender and destroy the material in its 
> entirety, whether in electronic or hard copy format. Although this 
> transmission and any attachments are believed to be free of any virus 
> or other defect that might affect any computer system into which it is 
> received and opened, it is the responsibility of the recipient to 
> ensure that it is virus free and no responsibility is accepted by JPMC 
> for any loss or damage arising in any way from its use. Please note 
> that any electronic communication that is conducted within or through 
> JPMC's systems is subject to interception, monitoring, review, 
> retention and external production in accordance with JPMC's policy and 
> local laws, rules and regulations; may be stored or otherwise 
> processed in countries other than the country in which you are 
> located; and will be treated in accordance with JPMC policies and applicable laws and
regulations.
> Please refer to http://www.jpmorgan.com/pages/disclosures for 
> disclosures relating to European legal entities.
> 


This communication is for informational purposes only.  It is not intended as an offer or
solicitation for the purchase or sale of any financial instrument or as an official confirmation
of any transaction.  All market prices, data and other information are not warranted as to
completeness or accuracy and are subject to change without notice.  Any comments or statements
made herein do not necessarily reflect those of JPMorgan Chase & Co., its subsidiaries
and affiliates (collectively, "JPMC").

This transmission may contain information that is proprietary, privileged, confidential and/or
exempt from disclosure under applicable law.  If you are not the intended recipient, you are
hereby notified that any disclosure, copying, distribution, or use of the information contained
herein (including any reliance thereon) is STRICTLY PROHIBITED.  If you received this transmission
in error, please immediately contact the sender and destroy the material in its entirety,
whether in electronic or hard copy format.  Although this transmission and any attachments
are believed to be free of any virus or other defect that might affect any computer system
into which it is received and opened, it is the responsibility of the recipient to ensure
that it is virus free and no responsibility is accepted by JPMC for any loss or damage arising
in any way from its use.  Please note that any electronic communication that is conducted
within or through JPMC's systems is subject to interception, monitoring, review, retention
and external production in accordance with JPMC's policy and local laws, rules and regulations;
may be stored or otherwise processed in countries other than the country in which you are
located; and will be treated in accordance with JPMC policies and applicable laws and regulations.

Please refer to http://www.jpmorgan.com/pages/disclosures for disclosures relating to European
legal entities.
Mime
View raw message