Hi, I am trying the storm-spring project, not the Breeze project.

I tried the following test:
I added a Spring configuration for the topology, with the spout set up as a JMS listener.

It is similar to the definition in https://github.com/granthenke/storm-spring/blob/master/README.md
The result was that a class named "TopologySubmitter" became my JMS listener instead of the spout.
I am adding the configuration I used to this post. Please help me understand what my problem is, if you can.

Thanks, Michal



<import resource="storm.jms.msg.amq.xml"/>


<!-- Topology Submission -->

<bean id="exampleJmsTopologySubmission" class="storm.contrib.spring.topology.SingleTopologySubmission" c:topologyId="jmsTopology" c:stormTopology-ref="jmsTopology" p:config-ref="config"/>


<!-- Topology Config -->

<bean id="config" class="backtype.storm.Config" p:debug="false" p:numWorkers="3"/>


<!-- Assembled Topologies -->

<bean id="jmsTopology" class="storm.contrib.spring.topology.TopologyFactory" c:spout-ref="topologySpouts" c:bolt-ref="topologyBolts"/>

<util:list id="topologySpouts">

    <bean id="jmsSpout" class="storm.contrib.spring.topology.component.spout.RichSpout" c:componentId="jmsQueueSpout" c:stormSpout-ref="messageListener" p:parallelismHint="5"/>

</util:list>

<util:list id="topologyBolts">

    <bean id="intermediateBolt" class="storm.contrib.spring.topology.component.bolt.RichBolt" c:componentId="intermediateBolt" c:stormBolt-ref="intermediateBoltImpl" p:boltGroupings-ref="jmsGrouping1" p:parallelismHint="3"/>

    <bean id="finalBolt" class="storm.contrib.spring.topology.component.bolt.RichBolt" c:componentId="finalBolt" c:stormBolt-ref="finalBoltImpl" p:boltGroupings-ref="jmsGrouping2" p:parallelismHint="3"/>

</util:list>


<!-- Grouping For Each Bolt -->

<bean id="jmsGrouping1" class="storm.contrib.spring.topology.component.grouping.ShuffleBoltGrouping" c:componentId="jmsQueueSpout"/>

<bean id="jmsGrouping2" class="storm.contrib.spring.topology.component.grouping.ShuffleBoltGrouping" c:componentId="intermediateBolt"/>


<!-- Bolt And RichSpout Implementations -->

<!-- <bean id="jmsSpout" class="backtype.storm.contrib.jms.spout.JmsSpout" /> -->

<bean id="intermediateBoltImpl" class="backtype.storm.contrib.jms.example.GenericBolt" c:name="INTERMEDIATE_BOLT" c:autoAck="true" c:autoAnchor="true" c:declaredFields-ref="intermediateBoltFields" /> 


<bean id="finalBoltImpl" class="backtype.storm.contrib.jms.example.GenericBolt" c:name="FINAL_BOLT" c:autoAck="true" c:autoAnchor="true"/>


<bean id="intermediateBoltFields" class="backtype.storm.tuple.Fields" c:fields="json" />



  <!-- ********************** Consumer ********************************* --> 
<bean id="messageListener" class="backtype.storm.contrib.jms.spout.JmsSpout">
    <property name="jmsAcknowledgeMode" value="2"/>
    <property name="distributed" value="true"/>
    <property name="recoveryPeriod" value="30000"/>
    <property name="tupleProducer" ref="jsonTupleProducer"/>
</bean>
<bean id="jsonTupleProducer" class="backtype.storm.contrib.jms.example.JsonTupleProducer"></bean>
 <!-- ************* statistics consumer container ************** -->
<jms:listener-container concurrency="5"><!-- prefetch="0" -->
    <jms:listener destination="backtype.storm.contrib.example.queue" ref="messageListener" method="onMessage"/>
</jms:listener-container>
<bean id="connectionFactoryConsumer" 
    <property name="brokerURL" value="failover:(tcp://dev16:61616)?randomize=false&amp;jms.prefetchPolicy.queuePrefetch=0&amp;timeout=3000" />    <!-- Single broker-->
    <property name="messagePrioritySupported" value="true" /> <!-- true is the default -->
    <property name="useAsyncSend" value="true"/><!-- this improves performance; check whether it is acceptable, since it can result in message loss -->
    <property name="useCompression" value="true"/>
 <bean id="cachingConnectionFactoryConsumer"  
    <constructor-arg ref="connectionFactoryConsumer"/>
    <property name="sessionCacheSize" value="100" /> 
<!-- ********************** Queues ********************************* -->
<bean id="notificationQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="backtype.storm.contrib.example.queue"/>
</bean>







From: Michal Singer [mailto:michal@leadspace.com]
Sent: Tuesday, January 07, 2014 7:10 AM
To: 'user@storm.incubator.apache.org'
Subject: RE: Spring bolts


Now I did. Thanks.


From: Bakker, Jethro [mailto:j.bakker@flusso.nl]
Sent: Monday, January 06, 2014 3:25 PM
To: user@storm.incubator.apache.org
Subject: Re: Spring bolts


Did you raise an issue in https://github.com/internet-research-network/breeze/ ?


2014/1/6 Michal Singer <michal@leadspace.com>

Ok. I opened an issue.



From: Bakker, Jethro [mailto:j.bakker@flusso.nl]
Sent: Monday, January 06, 2014 2:23 PM

To: user@storm.incubator.apache.org
Subject: Re: Spring bolts


Hi, can you explain why it won't work? 

We use Breeze to read messages from a Redis queue and have completely defined our beans in Spring. 

Please create an issue in the github repo so we can help you further.


2014/1/5 Michal Singer <michal@leadspace.com>

Hi, I checked this out. This project does provide a way to run Spring beans inside a bolt or a spout.

But if I want, for example, to define a spout that is a JMS listener for ActiveMQ and is configured in Spring, it won't work.

I would need to define a regular spout that reads from a Spring bean which is the JMS listener.

This is problematic as a point of failure, since I would not read directly from the JMS queue (ActiveMQ) but from an internal queue that is somehow loaded from AMQ.
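
The hand-off described here, where a JMS listener bean fills an internal queue and a regular spout drains it, might look roughly like the sketch that follows. It is only an illustration built on the Java standard library: QueueBridge, onMessage and nextTuple are hypothetical names, plain strings stand in for JMS messages and tuples, and nothing in it is taken from storm-spring, Breeze or storm-jms.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/**
 * Hypothetical bridge between a listener thread and a spout thread.
 * The buffer is bounded so a slow topology cannot exhaust memory.
 */
final class QueueBridge {
    private final BlockingQueue<String> buffer;

    QueueBridge(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /**
     * Listener side. Returns false when the buffer is full, so the caller
     * can decide not to acknowledge the message to the broker.
     */
    boolean onMessage(String payload) {
        return buffer.offer(payload);
    }

    /** Spout side. Never blocks; returns null when nothing is pending. */
    String nextTuple() {
        return buffer.poll();
    }

    /** Number of messages currently held only in memory. */
    int pending() {
        return buffer.size();
    }
}
```

Whatever sits in the buffer exists only in the worker's memory, which is the point-of-failure concern: if the listener acknowledges a message on receipt and the worker dies before the spout emits it, that message could be lost.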



From: Eugene [mailto:edvorkin@gmail.com]
Sent: Wednesday, December 25, 2013 10:11 PM

To: user@storm.incubator.apache.org
Subject: Re: Spring bolts


Did you look into this project:

I have not tried it yet, but it may give you what you are looking for.

Merry Christmas!


On Wed, Dec 25, 2013 at 10:54 AM, Michal Singer <michal@leadspace.com> wrote:

Thanks, Merry Christmas to you!


From: Michael Rose [mailto:michael@fullcontact.com]
Sent: Wednesday, December 25, 2013 5:48 PM
To: user@storm.incubator.apache.org
Subject: RE: Spring bolts


Yes, you'll need a Spring context in prepare. Given that you have multiple bolts per JVM, it's worth ensuring that only one of them creates the context in prepare and then shares it.

We do this with Guice injectors and double-checked locking.

Each bolt uses the singleton injector to inject its members. I imagine Spring has a similar concept once you have a context.

The life cycle of bolts in Storm is quite strange, given that they are created before deployment and then serialized. There are quite a few gotchas. Bolt constructors can't be trusted, hence prepare.
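
A minimal sketch of the "create once per JVM, then share" idea with double-checked locking, using only the Java standard library. SharedContext and ExampleBoltSketch are hypothetical names, and a plain String stands in for the real context; in an actual bolt the supplier would presumably build something like a Spring ClassPathXmlApplicationContext or a Guice injector, which is an assumption and not shown here.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Lazily creates a single shared instance using double-checked locking. */
final class SharedContext<T> {
    private final Supplier<T> factory;
    // volatile so that a fully constructed instance is visible to all threads
    private volatile T instance;

    SharedContext(Supplier<T> factory) {
        this.factory = factory;
    }

    T get() {
        T local = instance;              // first check, without the lock
        if (local == null) {
            synchronized (this) {
                local = instance;        // second check, under the lock
                if (local == null) {
                    local = factory.get();
                    instance = local;
                }
            }
        }
        return local;
    }
}

/** Stand-in for a bolt: every instance in the JVM shares one static holder. */
final class ExampleBoltSketch {
    static final AtomicInteger CREATED = new AtomicInteger();

    // static, so all bolt instances deserialized into this JVM see the same holder
    private static final SharedContext<String> CONTEXT =
            new SharedContext<>(() -> "context-" + CREATED.incrementAndGet());

    /** Stand-in for the work a bolt would do in its prepare method. */
    String prepare() {
        return CONTEXT.get();
    }
}
```

The holder is static because each bolt instance arrives in the worker by deserialization, so an instance field could not be shared between bolts.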

There may be a spring storm example out there somewhere.

Merry Christmas!

On Dec 25, 2013 8:17 AM, "Michal Singer" <michal@leadspace.com> wrote:

I am not sure I understand.

Spring beans are defined in the Spring configuration files. How can I inject them into the members?

What I thought of doing is to keep the bolts as plain objects rather than Spring beans, and to initialize the Spring context in the prepare method.

This way, the bolts will call other Spring beans, which are not bolts and are initialized by Spring. But of course this is a very limited solution.



From: Michael Rose [mailto:michael@fullcontact.com]
Sent: Wednesday, December 25, 2013 5:06 PM
To: user@storm.incubator.apache.org
Subject: Re: Spring bolts


Make a base Spring bolt and inject the members in your prepare method. That's the best I've come up with, as prepare happens server side, whereas topology config and static initializers happen at deploy time on the client side.
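
A rough sketch of why injection belongs in prepare, assuming the bolt is shipped to the worker via Java serialization. It uses only the standard library: BaseInjectedBolt, GreetingBolt, Greeter and SerializationRoundTrip are hypothetical names, a Map stands in for the Spring context, and the prepare signature is simplified and is not Storm's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;

/** Hypothetical collaborator that would normally be a Spring bean. */
interface Greeter {
    String greet(String name);
}

/** Base class: subclasses receive their collaborators in prepare, not in the constructor. */
abstract class BaseInjectedBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Simplified stand-in for prepare; runs on the worker after deserialization. */
    final void prepare(Map<String, Object> beans) {
        inject(beans);
    }

    protected abstract void inject(Map<String, Object> beans);
}

final class GreetingBolt extends BaseInjectedBolt {
    private static final long serialVersionUID = 1L;

    private final String greeterBeanName;   // plain config, travels with the bolt
    private transient Greeter greeter;      // not serialized, filled in by prepare

    GreetingBolt(String greeterBeanName) {
        this.greeterBeanName = greeterBeanName;
    }

    @Override
    protected void inject(Map<String, Object> beans) {
        this.greeter = (Greeter) beans.get(greeterBeanName);
    }

    boolean isInjected() {
        return greeter != null;
    }

    String execute(String input) {
        return greeter.greet(input);
    }
}

/** Simulates shipping a bolt from the client to a worker. */
final class SerializationRoundTrip {
    static <T extends Serializable> T copy(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

After the round trip the transient field is empty, so anything set up on the client side would be gone on the worker; only what prepare injects is available when tuples arrive.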

On Dec 25, 2013 7:51 AM, "Michal Singer" <michal@leadspace.com> wrote:

Hi, I am trying to understand how to use Spring beans as bolts/spouts.

Suppose I have the definition in Spring, which is initialized once the bolt or spout is initialized.

When creating a topology, however, I need to call: new Bolt()….

So I cannot get the instance from Spring.

What is the right way to do this?


Thanks, Michal


Eugene Dvorkin

Software Engineer


phone: 646-250-9649
