activemq-users mailing list archives

From Adrian Co <...@exist.com>
Subject Re: Retroactive consumer...yes, no, maybe so?
Date Wed, 19 Mar 2008 02:24:00 GMT
As far as I know, retroactive consumers apply only to topics. For
queues, the consumer should get all pending messages whether it was
online or offline when they were sent. The example in the documentation
could be wrong.
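
To illustrate the distinction, here is a minimal sketch in plain Java. It is
a toy in-memory model, not ActiveMQ code: ToyQueue, ToyTopic, and their
methods are hypothetical names invented for this example, and the 30 second
window only mirrors the recoverDuration="30000" setting quoted below.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model only: none of these classes are ActiveMQ APIs.
class RetroactiveSketch {

    /** Queue semantics: a message waits in the queue until a consumer takes it. */
    static class ToyQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        void send(String msg) {
            pending.addLast(msg);
        }

        /** A consumer that connects late still receives everything pending. */
        List<String> connectAndDrain() {
            List<String> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }
    }

    /** Topic semantics: live subscribers get each message; recent ones are kept for retroactive subscribers. */
    static class ToyTopic {
        private static final class Timed {
            final long sentAt;
            final String body;

            Timed(long sentAt, String body) {
                this.sentAt = sentAt;
                this.body = body;
            }
        }

        private final long recoverMillis;
        private final Deque<Timed> recent = new ArrayDeque<>();
        private final List<List<String>> inboxes = new ArrayList<>();

        ToyTopic(long recoverMillis) {
            this.recoverMillis = recoverMillis;
        }

        void publish(String body, long now) {
            recent.addLast(new Timed(now, body));
            for (List<String> inbox : inboxes) {
                inbox.add(body);
            }
        }

        /** Returns the new subscriber's inbox, pre-filled only when retroactive is true. */
        List<String> subscribe(boolean retroactive, long now) {
            List<String> inbox = new ArrayList<>();
            if (retroactive) {
                for (Timed t : recent) {
                    if (now - t.sentAt <= recoverMillis) {
                        inbox.add(t.body);
                    }
                }
            }
            inboxes.add(inbox);
            return inbox;
        }
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("q1");
        queue.send("q2");
        System.out.println("late queue consumer: " + queue.connectAndDrain());

        ToyTopic topic = new ToyTopic(30_000);
        topic.publish("old", 0);
        topic.publish("recent", 50_000);
        System.out.println("plain subscriber: " + topic.subscribe(false, 60_000));
        System.out.println("retroactive subscriber: " + topic.subscribe(true, 60_000));
    }
}
```

In this model the late queue consumer gets both messages without any
retroactive option, the plain topic subscriber starts empty, and the
retroactive topic subscriber gets only the message sent inside the window.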

Andrew M wrote:
> Aaron,
> My original producer and consumer code is at the bottom of the msg.  Note
> that the subscribe method appends ?consumer.retroactive=true.
> Thanks for any suggestions you may have.
> Andrew
>
>
> -----Original Message-----
> From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron Mulder
> Sent: Tuesday, March 18, 2008 10:07 AM
> To: users@activemq.apache.org
> Subject: Re: Retroactive consumer...yes, no, maybe so?
>
> Do you want to post your example that's *not* working?  I last used
> retroactive consumers probably 18 months ago, and they worked fine at
> that time.  I was doing a network of brokers with fail-over, and if I
> took one broker down and caused a consumer to fail over, it missed
> messages during the fail-over operation.  With retroactive consumer
> enabled, it didn't miss any messages (but got some duplicates) once it
> failed over.  I don't have that code/configuration at hand, though --
> just this from my notes:
>
> topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");
>
> And I used this to set the retroactive consumers to receive the last
> 30s worth of messages, instead of the default (which I think at the
> time was last 100):
>
> <broker>
>   <destinationPolicy>
>     <policyMap>
>       <defaultEntry>
>         <policyEntry topic="*">
>           <subscriptionRecoveryPolicy>
>             <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
>           </subscriptionRecoveryPolicy>
>         </policyEntry>
>       </defaultEntry>
>     </policyMap>
>   </destinationPolicy>
> </broker>
>
> http://www.activemq.org/site/retroactive-consumer.html
> http://www.activemq.org/site/subscription-recovery-policy.html
>
> Thanks,
>        Aaron
>
> On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <andrew@oc384.net> wrote:
>>  -----Original Message-----
>>  From: Andrew [mailto:andrew@oc384.net]
>>  Sent: Wednesday, March 05, 2008 2:40 PM
>>  To: users@activemq.apache.org
>>  Subject: Retroactive consumer not working...
>>
>>  My broker is not feeding my consumer the messages from the retroactive queue
>>  when the consumer connects.  The producer puts in a 10 min (600000ms) TTL so
>>  I would think when my consumer reconnects it should receive the last 10 mins
>>  of msgs.  Otherwise things appear fine, new msgs are received, etc... any
>>  ideas?
>>
>>
>>  On the producer.......
>>
>>     private Session getActiveMqSession() throws JMSException {
>>         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" + ACTIVE_MQ_PORT +
>>             "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>>         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>>         connection = connectionFactory.createConnection();
>>         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>>         connection.start();
>>         return connection.createSession(false, Session.SESSION_TRANSACTED);
>>     }
>>
>>     Session session = getActiveMqSession();
>>
>>     void send(Object a) throws blah blah blah {
>>         Destination destination = session.createQueue(consumerName);
>>         producer = session.createProducer(destination);
>>         ObjectMessage m = session.createObjectMessage();
>>         m.setObject(a);
>>         //10 min TTL
>>         ((ActiveMQMessageProducer)producer).send(m, DeliveryMode.PERSISTENT,
>>             Message.DEFAULT_PRIORITY, 600000L);
>>     }
>>
>>
>>  ...and on the Consumer...
>>
>>     Session session;
>>
>>     public void run() {
>>         String url =
>>             "failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>>         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
>>         Connection connection = connectionFactory.createConnection();
>>         connection.start();
>>         connection.setExceptionListener(this);
>>         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>>     }
>>
>>     public void subscribe(String destName, MessageListener l) throws JMSException {
>>         destName = destName + "?consumer.retroactive=true";
>>         MessageConsumer mc = session.createConsumer(session.createQueue(destName));
>>         mc.setMessageListener(l);
>>     }

