activemq-users mailing list archives

From "Aaron Mulder" <ammul...@alumni.princeton.edu>
Subject Re: Retroactive consumer...yes, no, maybe so?
Date Tue, 18 Mar 2008 17:53:34 GMT
Are you sure destName doesn't already have some "?options" in it?  Are
you sure no other consumer gets the messages off the queue before your
retroactive consumer?  Are you sure you shouldn't set the message
listener on the message consumer before registering it as a consumer?
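
For the first question, a minimal sketch of a guard against doubling up the
"?options" suffix. The class and method names are hypothetical and not part of
the ActiveMQ API; the sketch assumes destination options follow the name after
a "?" and are separated by "&".

```java
// Hypothetical helper for illustration only; not part of ActiveMQ.
final class DestinationOptions {
    private DestinationOptions() {}

    /** Returns true if the destination name already carries a "?options" suffix. */
    static boolean hasOptions(String destName) {
        return destName.indexOf('?') >= 0;
    }

    /**
     * Appends one "key=value" option to a destination name, using "?" when the
     * name has no options yet and "&" otherwise. An option that is already
     * present verbatim is not added a second time.
     */
    static String withOption(String destName, String option) {
        if (!hasOptions(destName)) {
            return destName + "?" + option;
        }
        String query = destName.substring(destName.indexOf('?') + 1);
        for (String existing : query.split("&")) {
            if (existing.equals(option)) {
                return destName; // option already set, leave the name alone
            }
        }
        // A bare trailing "?" needs no separator before the first option.
        String separator = destName.endsWith("?") ? "" : "&";
        return destName + separator + option;
    }
}
```

In the subscribe method quoted below, the name passed to createQueue could then
be built with DestinationOptions.withOption(destName, "consumer.retroactive=true")
instead of plain string concatenation.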

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 1:19 PM, Andrew M <andrew@oc384.net> wrote:
> Aaron,
>  My original producer and consumer code are at the bottom of the msg.  Note
>  the subscribe method appends retroactive=true.
>  Thanks for any suggestions you may have..
>  Andrew
>
>
>
>  -----Original Message-----
>  From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron Mulder
>  Sent: Tuesday, March 18, 2008 10:07 AM
>  To: users@activemq.apache.org
>  Subject: Re: Retroactive consumer...yes, no, maybe so?
>
>  Do you want to post your example that's *not* working?  I last used
>  retroactive consumers probably 18 months ago, and they worked fine at
>  that time.  I was doing a network of brokers with fail-over, and if I
>  took one broker down and caused a consumer to fail over, it missed
>  messages during the fail-over operation.  With retroactive consumer
>  enabled, it didn't miss any messages (but got some duplicates) once it
>  failed over.  I don't have that code/configuration at hand, though --
>  just this from my notes:
>
>  topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");
>
>  And I used this to set the retroactive consumers to receive the last
>  30s worth of messages, instead of the default (which I think at the
>  time was last 100):
>
>  <broker>
>   <destinationPolicy>
>     <policyMap>
>       <defaultEntry>
>         <policyEntry topic="*">
>           <subscriptionRecoveryPolicy>
>             <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
>           </subscriptionRecoveryPolicy>
>         </policyEntry>
>       </defaultEntry>
>     </policyMap>
>   </destinationPolicy>
>  </broker>
>
>  http://www.activemq.org/site/retroactive-consumer.html
>  http://www.activemq.org/site/subscription-recovery-policy.html
>
>  Thanks,
>        Aaron
>
>  On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <andrew@oc384.net> wrote:
>
>  >  -----Original Message-----
>  >  From: Andrew [mailto:andrew@oc384.net]
>  >  Sent: Wednesday, March 05, 2008 2:40 PM
>  >  To: users@activemq.apache.org
>  >  Subject: Retroactive consumer not working...
>  >
>  >  My broker is not feeding my consumer the messages from the retroactive
>  >  queue when the consumer connects.  The producer puts in a 10 min (600000ms)
>  >  TTL so I would think when my consumer reconnects it should receive the last
>  >  10 mins of msgs.  Otherwise things appear fine, new msgs are received,
>  >  etc... any ideas?
>  >
>  >
>  >  On the producer.......
>  >
>  >     private Session getActiveMqSession() throws JMSException {
>  >         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" + ACTIVE_MQ_PORT
>  >             + "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory =
>  >             new ActiveMQConnectionFactory(url);
>  >         connection = connectionFactory.createConnection();
>  >         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>  >         connection.start();
>  >         return connection.createSession(false, Session.SESSION_TRANSACTED);
>  >     }
>  >
>  >     Session session = getActiveMqSession();
>  >
>  >     void send(Object a) throws blah blah blah {
>  >         Destination destination = session.createQueue(consumerName);
>  >         producer = session.createProducer(destination);
>  >         ObjectMessage m = session.createObjectMessage();
>  >         m.setObject(a);
>  >         //10 min TTL
>  >         ((ActiveMQMessageProducer)producer).send(m, DeliveryMode.PERSISTENT,
>  >             Message.DEFAULT_PRIORITY, 600000L);
>  >     }
>  >
>  >
>  >  ...and on the Consumer...
>  >
>  >     Session session;
>  >
>  >     public void run() {
>  >         String url =
>  >             "failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>  >         ActiveMQConnectionFactory connectionFactory =
>  >             new ActiveMQConnectionFactory(url);
>  >         Connection connection = connectionFactory.createConnection();
>  >         connection.start();
>  >         connection.setExceptionListener(this);
>  >         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>  >     }
>  >
>  >     public void subscribe(String destName, MessageListener l) throws JMSException {
>  >         destName = destName + "?consumer.retroactive=true";
>  >         MessageConsumer mc = session.createConsumer(session.createQueue(destName));
>  >         mc.setMessageListener(l);
>  >     }
>  >
>  >
