activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Andrew M" <and...@oc384.net>
Subject RE: Retroactive consumer...yes, no, maybe so?
Date Tue, 18 Mar 2008 17:19:49 GMT
Aaron,
My original producer and consumer code are at the bottom of the msg.  Note
the subscribe method appends retroactive=true.  
Thanks for any suggestions you may have..
Andrew


-----Original Message-----
From: ammulder@gmail.com [mailto:ammulder@gmail.com] On Behalf Of Aaron
Mulder
Sent: Tuesday, March 18, 2008 10:07 AM
To: users@activemq.apache.org
Subject: Re: Retroactive consumer...yes, no, maybe so?

Do you want to post your example that's *not* working?  I last used
retroactive consumers probably 18 months ago, and they worked fine at
that time.  I was doing a network of brokers with fail-over, and if I
took one broker down and caused a consumer to fail over, it missed
messages during the fail-over operation.  With retroactive consumer
enabled, it didn't miss any messages (but got some duplicates) once it
failed over.  I don't have that code/configuration at hand, though --
just this from my notes:

topic = new ActiveMQTopic("com.example.MyTopic?consumer.retroactive=true");

And I used this to set the retroactive consumers to receive the last
30s worth of messages, instead of the default (which I think at the
time was last 100):

<broker>
  <destinationPolicy>
    <policyMap>
      <defaultEntry>
        <policyEntry topic="*">
          <subscriptionRecoveryPolicy>
            <timedSubscriptionRecoveryPolicy recoverDuration="30000" />
          </subscriptionRecoveryPolicy>
        </policyEntry>
      </defaultEntry>
    </policyMap>
  </destinationPolicy>
</broker>

http://www.activemq.org/site/retroactive-consumer.html
http://www.activemq.org/site/subscription-recovery-policy.html

Thanks,
       Aaron

On Tue, Mar 18, 2008 at 10:20 AM, Andrew M <andrew@oc384.net> wrote:
 
>
>  -----Original Message-----
>  From: Andrew [mailto:andrew@oc384.net]
>  Sent: Wednesday, March 05, 2008 2:40 PM
>  To: users@activemq.apache.org
>  Subject: Retroactive consumer not working...
>
>  My broker is not feeding my consumer the messages from the retroactive
queue
>  when the consumer connects.  The producer puts in a 10 min (600000ms) TTL
so
>  I would think when my consumer reconnects it should receive the last 10
mins
>  of msgs.  Otherwise things appear fine, new msgs are received, etc... any
>  ideas?
>
>
>  On the producer.......
>
>     private Session getActiveMqSession() throws JMSException {
>         String url = "failover:(tcp://" + ACTIVE_MQ_SERVER + ":" +
>  ACTIVE_MQ_PORT +
>  "?wireFormat.maxInactivityDuration=0)?maxReconnectAttempts=0";
>         ActiveMQConnectionFactory connectionFactory = new
>  ActiveMQConnectionFactory(url);
>         connection = connectionFactory.createConnection();
>         ((ActiveMQConnection)connection).setUseAsyncSend(false);
>         connection.start();
>         return connection.createSession(false,
Session.SESSION_TRANSACTED);
>     }
>
>     Session session = getActiveMqSession();
>
>     void send(Object a) throws blah blah blah {
>         Destination destination = session.createQueue(consumerName);
>         producer = session.createProducer(destination);
>         ObjectMessage m = session.createObjectMessage();
>         m.setObject(a);
>         //10 min TTL
>         ((ActiveMQMessageProducer)producer).send(m,
DeliveryMode.PERSISTENT,
>  Message.DEFAULT_PRIORITY, 600000L);
>     }
>
>
>  ...and on the Consumer...
>
>     Session session;
>
>     public void run() {
>         String url =
>
"failover:(tcp://tupolev:61616?wireFormat.maxInactivityDuration=0)?maxReconn
>  ectAttempts=0";
>         ActiveMQConnectionFactory connectionFactory = new
>  ActiveMQConnectionFactory(url);
>       Connection connection = connectionFactory.createConnection();
>       connection.start();
>       connection.setExceptionListener(this);
>       session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
>    }
>
>    public void subscribe(String destName, MessageListener l) throws
>  JMSException {
>       destName = destName + "?consumer.retroactive=true";
>       MessageConsumer mc =
>  session.createConsumer(session.createQueue(destName));
>       mc.setMessageListener(l);
>     }
>
>
 


Mime
View raw message