activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r563982 [17/32] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/ main/java/org/apache/activemq/blob/ main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/jm...
Date Wed, 08 Aug 2007 18:58:13 GMT
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTopicMessageStore.java Wed Aug  8 11:56:59 2007
@@ -18,6 +18,7 @@
 import java.io.InterruptedIOException;
 import java.util.HashMap;
 import java.util.Iterator;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.JournalTopicAck;
@@ -38,83 +39,82 @@
  * 
  * @version $Revision: 1.13 $
  */
-public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore{
+public class AMQTopicMessageStore extends AMQMessageStore implements TopicMessageStore {
 
-    private static final Log log=LogFactory.getLog(AMQTopicMessageStore.class);
+    private static final Log log = LogFactory.getLog(AMQTopicMessageStore.class);
     private TopicReferenceStore topicReferenceStore;
-    private HashMap<SubscriptionKey,MessageId> ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
+    private HashMap<SubscriptionKey, MessageId> ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
 
-    public AMQTopicMessageStore(AMQPersistenceAdapter adapter,TopicReferenceStore topicReferenceStore,
-            ActiveMQTopic destinationName){
-        super(adapter,topicReferenceStore,destinationName);
-        this.topicReferenceStore=topicReferenceStore;
+    public AMQTopicMessageStore(AMQPersistenceAdapter adapter, TopicReferenceStore topicReferenceStore, ActiveMQTopic destinationName) {
+        super(adapter, topicReferenceStore, destinationName);
+        this.topicReferenceStore = topicReferenceStore;
     }
 
-    public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
-            throws Exception{
+    public void recoverSubscription(String clientId, String subscriptionName, MessageRecoveryListener listener) throws Exception {
         flush();
-        topicReferenceStore.recoverSubscription(clientId,subscriptionName,new RecoveryListenerAdapter(this,listener));
+        topicReferenceStore.recoverSubscription(clientId, subscriptionName, new RecoveryListenerAdapter(this, listener));
     }
 
-    public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
-            final MessageRecoveryListener listener) throws Exception{
-        RecoveryListenerAdapter recoveryListener=new RecoveryListenerAdapter(this,listener);
-        topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
-        if(recoveryListener.size()==0){
+    public void recoverNextMessages(String clientId, String subscriptionName, int maxReturned, final MessageRecoveryListener listener) throws Exception {
+        RecoveryListenerAdapter recoveryListener = new RecoveryListenerAdapter(this, listener);
+        topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
+        if (recoveryListener.size() == 0) {
             flush();
-            topicReferenceStore.recoverNextMessages(clientId,subscriptionName,maxReturned,recoveryListener);
+            topicReferenceStore.recoverNextMessages(clientId, subscriptionName, maxReturned, recoveryListener);
         }
     }
 
-    public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
-        return topicReferenceStore.lookupSubscription(clientId,subscriptionName);
+    public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
+        return topicReferenceStore.lookupSubscription(clientId, subscriptionName);
     }
 
-    public void addSubsciption(SubscriptionInfo subscriptionInfo,boolean retroactive)
-            throws IOException{
-        topicReferenceStore.addSubsciption(subscriptionInfo,retroactive);
+    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
+        topicReferenceStore.addSubsciption(subscriptionInfo, retroactive);
     }
 
     /**
      */
-    public void acknowledge(ConnectionContext context,String clientId,String subscriptionName,final MessageId messageId)
-            throws IOException{
-        final boolean debug=log.isDebugEnabled();
-        JournalTopicAck ack=new JournalTopicAck();
+    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, final MessageId messageId) throws IOException {
+        final boolean debug = log.isDebugEnabled();
+        JournalTopicAck ack = new JournalTopicAck();
         ack.setDestination(destination);
         ack.setMessageId(messageId);
         ack.setMessageSequenceId(messageId.getBrokerSequenceId());
         ack.setSubscritionName(subscriptionName);
         ack.setClientId(clientId);
-        ack.setTransactionId(context.getTransaction()!=null?context.getTransaction().getTransactionId():null);
-        final Location location=peristenceAdapter.writeCommand(ack,false);
-        final SubscriptionKey key=new SubscriptionKey(clientId,subscriptionName);
-        if(!context.isInTransaction()){
-            if(debug)
-                log.debug("Journalled acknowledge for: "+messageId+", at: "+location);
-            acknowledge(messageId,location,key);
-        }else{
-            if(debug)
-                log.debug("Journalled transacted acknowledge for: "+messageId+", at: "+location);
-            synchronized(this){
+        ack.setTransactionId(context.getTransaction() != null ? context.getTransaction().getTransactionId() : null);
+        final Location location = peristenceAdapter.writeCommand(ack, false);
+        final SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
+        if (!context.isInTransaction()) {
+            if (debug) {
+                log.debug("Journalled acknowledge for: " + messageId + ", at: " + location);
+            }
+            acknowledge(messageId, location, key);
+        } else {
+            if (debug) {
+                log.debug("Journalled transacted acknowledge for: " + messageId + ", at: " + location);
+            }
+            synchronized (this) {
                 inFlightTxLocations.add(location);
             }
-            transactionStore.acknowledge(this,ack,location);
-            context.getTransaction().addSynchronization(new Synchronization(){
+            transactionStore.acknowledge(this, ack, location);
+            context.getTransaction().addSynchronization(new Synchronization() {
 
-                public void afterCommit() throws Exception{
-                    if(debug)
-                        log.debug("Transacted acknowledge commit for: "+messageId+", at: "+location);
-                    synchronized(AMQTopicMessageStore.this){
+                public void afterCommit() throws Exception {
+                    if (debug) {
+                        log.debug("Transacted acknowledge commit for: " + messageId + ", at: " + location);
+                    }
+                    synchronized (AMQTopicMessageStore.this) {
                         inFlightTxLocations.remove(location);
-                        acknowledge(messageId,location,key);
+                        acknowledge(messageId, location, key);
                     }
                 }
 
-                public void afterRollback() throws Exception{
-                    if(debug)
-                        log.debug("Transacted acknowledge rollback for: "+messageId+", at: "+location);
-                    synchronized(AMQTopicMessageStore.this){
+                public void afterRollback() throws Exception {
+                    if (debug) {
+                        log.debug("Transacted acknowledge rollback for: " + messageId + ", at: " + location);
+                    }
+                    synchronized (AMQTopicMessageStore.this) {
                         inFlightTxLocations.remove(location);
                     }
                 }
@@ -122,17 +122,15 @@
         }
     }
 
-    public boolean replayAcknowledge(ConnectionContext context,String clientId,String subscritionName,
-            MessageId messageId){
-        try{
-            SubscriptionInfo sub=topicReferenceStore.lookupSubscription(clientId,subscritionName);
-            if(sub!=null){
-                topicReferenceStore.acknowledge(context,clientId,subscritionName,messageId);
+    public boolean replayAcknowledge(ConnectionContext context, String clientId, String subscritionName, MessageId messageId) {
+        try {
+            SubscriptionInfo sub = topicReferenceStore.lookupSubscription(clientId, subscritionName);
+            if (sub != null) {
+                topicReferenceStore.acknowledge(context, clientId, subscritionName, messageId);
                 return true;
             }
-        }catch(Throwable e){
-            log.debug("Could not replay acknowledge for message '"+messageId
-                    +"'.  Message may have already been acknowledged. reason: "+e);
+        } catch (Throwable e) {
+            log.debug("Could not replay acknowledge for message '" + messageId + "'.  Message may have already been acknowledged. reason: " + e);
         }
         return false;
     }
@@ -143,26 +141,27 @@
      * @param key
      * @throws InterruptedIOException
      */
-    protected void acknowledge(MessageId messageId,Location location,SubscriptionKey key) throws InterruptedIOException{
-        synchronized(this){
-            lastLocation=location;
-            ackedLastAckLocations.put(key,messageId);
+    protected void acknowledge(MessageId messageId, Location location, SubscriptionKey key) throws InterruptedIOException {
+        synchronized (this) {
+            lastLocation = location;
+            ackedLastAckLocations.put(key, messageId);
         }
-        try{
+        try {
             asyncWriteTask.wakeup();
-        }catch(InterruptedException e){
+        } catch (InterruptedException e) {
             throw new InterruptedIOException();
         }
     }
 
-    @Override protected Location doAsyncWrite() throws IOException{
-        final HashMap<SubscriptionKey,MessageId> cpAckedLastAckLocations;
+    @Override
+    protected Location doAsyncWrite() throws IOException {
+        final HashMap<SubscriptionKey, MessageId> cpAckedLastAckLocations;
         // swap out the hash maps..
-        synchronized(this){
-            cpAckedLastAckLocations=this.ackedLastAckLocations;
-            this.ackedLastAckLocations=new HashMap<SubscriptionKey,MessageId>();
+        synchronized (this) {
+            cpAckedLastAckLocations = this.ackedLastAckLocations;
+            this.ackedLastAckLocations = new HashMap<SubscriptionKey, MessageId>();
         }
-        Location location=super.doAsyncWrite();
+        Location location = super.doAsyncWrite();
 
         if (cpAckedLastAckLocations != null) {
             transactionTemplate.run(new Callback() {
@@ -172,8 +171,7 @@
                     while (iterator.hasNext()) {
                         SubscriptionKey subscriptionKey = iterator.next();
                         MessageId identity = cpAckedLastAckLocations.get(subscriptionKey);
-                        topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId,
-                                subscriptionKey.subscriptionName, identity);
+                        topicReferenceStore.acknowledge(transactionTemplate.getContext(), subscriptionKey.clientId, subscriptionKey.subscriptionName, identity);
                     }
                 }
             });
@@ -184,24 +182,24 @@
     /**
      * @return Returns the longTermStore.
      */
-    public TopicReferenceStore getTopicReferenceStore(){
+    public TopicReferenceStore getTopicReferenceStore() {
         return topicReferenceStore;
     }
 
-    public void deleteSubscription(String clientId,String subscriptionName) throws IOException{
-        topicReferenceStore.deleteSubscription(clientId,subscriptionName);
+    public void deleteSubscription(String clientId, String subscriptionName) throws IOException {
+        topicReferenceStore.deleteSubscription(clientId, subscriptionName);
     }
 
-    public SubscriptionInfo[] getAllSubscriptions() throws IOException{
+    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
         return topicReferenceStore.getAllSubscriptions();
     }
 
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+    public int getMessageCount(String clientId, String subscriberName) throws IOException {
         flush();
-        return topicReferenceStore.getMessageCount(clientId,subscriberName);
+        return topicReferenceStore.getMessageCount(clientId, subscriberName);
     }
 
-    public void resetBatching(String clientId,String subscriptionName){
-        topicReferenceStore.resetBatching(clientId,subscriptionName);
+    public void resetBatching(String clientId, String subscriptionName) {
+        topicReferenceStore.resetBatching(clientId, subscriptionName);
     }
-}
\ No newline at end of file
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTransactionStore.java Wed Aug  8 11:56:59 2007
@@ -21,7 +21,9 @@
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
+
 import javax.transaction.xa.XAException;
+
 import org.apache.activemq.command.JournalTopicAck;
 import org.apache.activemq.command.JournalTransaction;
 import org.apache.activemq.command.Message;
@@ -32,34 +34,34 @@
 import org.apache.activemq.store.TransactionRecoveryListener;
 import org.apache.activemq.store.TransactionStore;
 
-
 /**
  */
-public class AMQTransactionStore implements TransactionStore{
+public class AMQTransactionStore implements TransactionStore {
 
     private final AMQPersistenceAdapter peristenceAdapter;
-    Map<TransactionId,AMQTx> inflightTransactions=new LinkedHashMap<TransactionId,AMQTx>();
-    Map<TransactionId,AMQTx> preparedTransactions=new LinkedHashMap<TransactionId,AMQTx>();
+    Map<TransactionId, AMQTx> inflightTransactions = new LinkedHashMap<TransactionId, AMQTx>();
+    Map<TransactionId, AMQTx> preparedTransactions = new LinkedHashMap<TransactionId, AMQTx>();
     private boolean doingRecover;
 
-    public AMQTransactionStore(AMQPersistenceAdapter adapter){
-        this.peristenceAdapter=adapter;
+    public AMQTransactionStore(AMQPersistenceAdapter adapter) {
+        this.peristenceAdapter = adapter;
     }
 
     /**
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void prepare(TransactionId txid) throws IOException{
-        AMQTx tx=null;
-        synchronized(inflightTransactions){
-            tx=inflightTransactions.remove(txid);
+    public void prepare(TransactionId txid) throws IOException {
+        AMQTx tx = null;
+        synchronized (inflightTransactions) {
+            tx = inflightTransactions.remove(txid);
         }
-        if(tx==null)
+        if (tx == null) {
             return;
-        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE,txid,false),true);
-        synchronized(preparedTransactions){
-            preparedTransactions.put(txid,tx);
+        }
+        peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_PREPARE, txid, false), true);
+        synchronized (preparedTransactions) {
+            preparedTransactions.put(txid, tx);
         }
     }
 
@@ -67,26 +69,27 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
      */
-    public void replayPrepare(TransactionId txid) throws IOException{
-        AMQTx tx=null;
-        synchronized(inflightTransactions){
-            tx=inflightTransactions.remove(txid);
+    public void replayPrepare(TransactionId txid) throws IOException {
+        AMQTx tx = null;
+        synchronized (inflightTransactions) {
+            tx = inflightTransactions.remove(txid);
         }
-        if(tx==null)
+        if (tx == null) {
             return;
-        synchronized(preparedTransactions){
-            preparedTransactions.put(txid,tx);
+        }
+        synchronized (preparedTransactions) {
+            preparedTransactions.put(txid, tx);
         }
     }
 
-    public AMQTx getTx(TransactionId txid,Location location){
-        AMQTx tx=null;
-        synchronized(inflightTransactions){
-            tx=inflightTransactions.get(txid);
+    public AMQTx getTx(TransactionId txid, Location location) {
+        AMQTx tx = null;
+        synchronized (inflightTransactions) {
+            tx = inflightTransactions.get(txid);
         }
-        if(tx==null){
-            tx=new AMQTx(location);
-            inflightTransactions.put(txid,tx);
+        if (tx == null) {
+            tx = new AMQTx(location);
+            inflightTransactions.put(txid, tx);
         }
         return tx;
     }
@@ -95,24 +98,24 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public void commit(TransactionId txid,boolean wasPrepared) throws IOException{
+    public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
         AMQTx tx;
-        if(wasPrepared){
-            synchronized(preparedTransactions){
-                tx=preparedTransactions.remove(txid);
+        if (wasPrepared) {
+            synchronized (preparedTransactions) {
+                tx = preparedTransactions.remove(txid);
             }
-        }else{
-            synchronized(inflightTransactions){
-                tx=inflightTransactions.remove(txid);
+        } else {
+            synchronized (inflightTransactions) {
+                tx = inflightTransactions.remove(txid);
             }
         }
-        if(tx==null)
+        if (tx == null) {
             return;
-        if(txid.isXATransaction()){
-            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT,txid,wasPrepared),true);
-        }else{
-            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT,txid,wasPrepared),
-                    true);
+        }
+        if (txid.isXATransaction()) {
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_COMMIT, txid, wasPrepared), true);
+        } else {
+            peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_COMMIT, txid, wasPrepared), true);
         }
     }
 
@@ -120,13 +123,13 @@
      * @throws XAException
      * @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
      */
-    public AMQTx replayCommit(TransactionId txid,boolean wasPrepared) throws IOException{
-        if(wasPrepared){
-            synchronized(preparedTransactions){
+    public AMQTx replayCommit(TransactionId txid, boolean wasPrepared) throws IOException {
+        if (wasPrepared) {
+            synchronized (preparedTransactions) {
                 return preparedTransactions.remove(txid);
             }
-        }else{
-            synchronized(inflightTransactions){
+        } else {
+            synchronized (inflightTransactions) {
                 return inflightTransactions.remove(txid);
             }
         }
@@ -136,21 +139,21 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void rollback(TransactionId txid) throws IOException{
-        AMQTx tx=null;
-        synchronized(inflightTransactions){
-            tx=inflightTransactions.remove(txid);
-        }
-        if(tx!=null)
-            synchronized(preparedTransactions){
-                tx=preparedTransactions.remove(txid);
-            }
-        if(tx!=null){
-            if(txid.isXATransaction()){
-                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK,txid,false),true);
-            }else{
-                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK,txid,false),
-                        true);
+    public void rollback(TransactionId txid) throws IOException {
+        AMQTx tx = null;
+        synchronized (inflightTransactions) {
+            tx = inflightTransactions.remove(txid);
+        }
+        if (tx != null) {
+            synchronized (preparedTransactions) {
+                tx = preparedTransactions.remove(txid);
+            }
+        }
+        if (tx != null) {
+            if (txid.isXATransaction()) {
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.XA_ROLLBACK, txid, false), true);
+            } else {
+                peristenceAdapter.writeCommand(new JournalTransaction(JournalTransaction.LOCAL_ROLLBACK, txid, false), true);
             }
         }
     }
@@ -159,42 +162,42 @@
      * @throws IOException
      * @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
      */
-    public void replayRollback(TransactionId txid) throws IOException{
-        boolean inflight=false;
-        synchronized(inflightTransactions){
-            inflight=inflightTransactions.remove(txid)!=null;
+    public void replayRollback(TransactionId txid) throws IOException {
+        boolean inflight = false;
+        synchronized (inflightTransactions) {
+            inflight = inflightTransactions.remove(txid) != null;
         }
-        if(inflight){
-            synchronized(preparedTransactions){
+        if (inflight) {
+            synchronized (preparedTransactions) {
                 preparedTransactions.remove(txid);
             }
         }
     }
 
-    public void start() throws Exception{
+    public void start() throws Exception {
     }
 
-    public void stop() throws Exception{
+    public void stop() throws Exception {
     }
 
-    synchronized public void recover(TransactionRecoveryListener listener) throws IOException{
+    synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
         // All the in-flight transactions get rolled back..
-        synchronized(inflightTransactions){
+        synchronized (inflightTransactions) {
             inflightTransactions.clear();
         }
-        this.doingRecover=true;
-        try{
-            Map<TransactionId,AMQTx> txs=null;
-            synchronized(preparedTransactions){
-                txs=new LinkedHashMap<TransactionId,AMQTx>(preparedTransactions);
-            }
-            for(Iterator<TransactionId> iter=txs.keySet().iterator();iter.hasNext();){
-                Object txid=iter.next();
-                AMQTx tx=txs.get(txid);
-                listener.recover((XATransactionId)txid,tx.getMessages(),tx.getAcks());
+        this.doingRecover = true;
+        try {
+            Map<TransactionId, AMQTx> txs = null;
+            synchronized (preparedTransactions) {
+                txs = new LinkedHashMap<TransactionId, AMQTx>(preparedTransactions);
+            }
+            for (Iterator<TransactionId> iter = txs.keySet().iterator(); iter.hasNext();) {
+                Object txid = iter.next();
+                AMQTx tx = txs.get(txid);
+                listener.recover((XATransactionId)txid, tx.getMessages(), tx.getAcks());
             }
-        }finally{
-            this.doingRecover=false;
+        } finally {
+            this.doingRecover = false;
         }
     }
 
@@ -202,69 +205,70 @@
      * @param message
      * @throws IOException
      */
-    void addMessage(AMQMessageStore store,Message message,Location location) throws IOException{
-        AMQTx tx=getTx(message.getTransactionId(),location);
-        tx.add(store,message,location);
+    void addMessage(AMQMessageStore store, Message message, Location location) throws IOException {
+        AMQTx tx = getTx(message.getTransactionId(), location);
+        tx.add(store, message, location);
     }
 
     /**
      * @param ack
      * @throws IOException
      */
-    public void removeMessage(AMQMessageStore store,MessageAck ack,Location location) throws IOException{
-        AMQTx tx=getTx(ack.getTransactionId(),location);
-        tx.add(store,ack);
+    public void removeMessage(AMQMessageStore store, MessageAck ack, Location location) throws IOException {
+        AMQTx tx = getTx(ack.getTransactionId(), location);
+        tx.add(store, ack);
     }
 
-    public void acknowledge(AMQTopicMessageStore store,JournalTopicAck ack,Location location){
-        AMQTx tx=getTx(ack.getTransactionId(),location);
-        tx.add(store,ack);
+    public void acknowledge(AMQTopicMessageStore store, JournalTopicAck ack, Location location) {
+        AMQTx tx = getTx(ack.getTransactionId(), location);
+        tx.add(store, ack);
     }
 
-    public Location checkpoint() throws IOException{
+    public Location checkpoint() throws IOException {
         // Nothing really to checkpoint.. since, we don't
-        // checkpoint tx operations in to long term store until they are committed.
+        // checkpoint tx operations in to long term store until they are
+        // committed.
         // But we keep track of the first location of an operation
         // that was associated with an active tx. The journal can not
         // roll over active tx records.
-        Location rc=null;
-        synchronized(inflightTransactions){
-            for(Iterator<AMQTx> iter=inflightTransactions.values().iterator();iter.hasNext();){
-                AMQTx tx=iter.next();
-                Location location=tx.getLocation();
-                if(rc==null||rc.compareTo(location)<0){
-                    rc=location;
+        Location rc = null;
+        synchronized (inflightTransactions) {
+            for (Iterator<AMQTx> iter = inflightTransactions.values().iterator(); iter.hasNext();) {
+                AMQTx tx = iter.next();
+                Location location = tx.getLocation();
+                if (rc == null || rc.compareTo(location) < 0) {
+                    rc = location;
                 }
             }
         }
-        synchronized(preparedTransactions){
-            for(Iterator<AMQTx> iter=preparedTransactions.values().iterator();iter.hasNext();){
-                AMQTx tx=iter.next();
-                Location location=tx.getLocation();
-                if(rc==null||rc.compareTo(location)<0){
-                    rc=location;
+        synchronized (preparedTransactions) {
+            for (Iterator<AMQTx> iter = preparedTransactions.values().iterator(); iter.hasNext();) {
+                AMQTx tx = iter.next();
+                Location location = tx.getLocation();
+                if (rc == null || rc.compareTo(location) < 0) {
+                    rc = location;
                 }
             }
             return rc;
         }
     }
 
-    public boolean isDoingRecover(){
+    public boolean isDoingRecover() {
         return doingRecover;
     }
 
     /**
      * @return the preparedTransactions
      */
-    public Map<TransactionId,AMQTx> getPreparedTransactions(){
+    public Map<TransactionId, AMQTx> getPreparedTransactions() {
         return this.preparedTransactions;
     }
 
     /**
      * @param preparedTransactions the preparedTransactions to set
      */
-    public void setPreparedTransactions(Map<TransactionId,AMQTx> preparedTransactions){
-        if(preparedTransactions!=null){
+    public void setPreparedTransactions(Map<TransactionId, AMQTx> preparedTransactions) {
+        if (preparedTransactions != null) {
             this.preparedTransactions.clear();
             this.preparedTransactions.putAll(preparedTransactions);
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTx.java Wed Aug  8 11:56:59 2007
@@ -24,56 +24,59 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.kaha.impl.async.Location;
 
-
 /**
  */
 /**
  * Operations
+ * 
  * @version $Revision: 1.6 $
  */
-public class AMQTx{
+public class AMQTx {
 
     private final Location location;
-    private ArrayList<AMQTxOperation> operations=new ArrayList<AMQTxOperation>();
+    private ArrayList<AMQTxOperation> operations = new ArrayList<AMQTxOperation>();
 
-    public AMQTx(Location location){
-        this.location=location;
+    public AMQTx(Location location) {
+        this.location = location;
     }
 
-    public void add(AMQMessageStore store,Message msg,Location location){
-        operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE,store.getDestination(),msg,location));
+    public void add(AMQMessageStore store, Message msg, Location location) {
+        operations.add(new AMQTxOperation(AMQTxOperation.ADD_OPERATION_TYPE, store.getDestination(), msg,
+                                          location));
     }
 
-    public void add(AMQMessageStore store,MessageAck ack){
-        operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE,store.getDestination(),ack,null));
+    public void add(AMQMessageStore store, MessageAck ack) {
+        operations.add(new AMQTxOperation(AMQTxOperation.REMOVE_OPERATION_TYPE, store.getDestination(), ack,
+                                          null));
     }
 
-    public void add(AMQTopicMessageStore store,JournalTopicAck ack){
-        operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE,store.getDestination(),ack,null));
+    public void add(AMQTopicMessageStore store, JournalTopicAck ack) {
+        operations.add(new AMQTxOperation(AMQTxOperation.ACK_OPERATION_TYPE, store.getDestination(), ack,
+                                          null));
     }
 
-    public Message[] getMessages(){
-        ArrayList<Object> list=new ArrayList<Object>();
-        for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
-            AMQTxOperation op=iter.next();
-            if(op.getOperationType()==AMQTxOperation.ADD_OPERATION_TYPE){
+    public Message[] getMessages() {
+        ArrayList<Object> list = new ArrayList<Object>();
+        for (Iterator<AMQTxOperation> iter = operations.iterator(); iter.hasNext();) {
+            AMQTxOperation op = iter.next();
+            if (op.getOperationType() == AMQTxOperation.ADD_OPERATION_TYPE) {
                 list.add(op.getData());
             }
         }
-        Message rc[]=new Message[list.size()];
+        Message rc[] = new Message[list.size()];
         list.toArray(rc);
         return rc;
     }
 
-    public MessageAck[] getAcks(){
-        ArrayList<Object> list=new ArrayList<Object>();
-        for(Iterator<AMQTxOperation> iter=operations.iterator();iter.hasNext();){
-            AMQTxOperation op=iter.next();
-            if(op.getOperationType()==AMQTxOperation.REMOVE_OPERATION_TYPE){
+    public MessageAck[] getAcks() {
+        ArrayList<Object> list = new ArrayList<Object>();
+        for (Iterator<AMQTxOperation> iter = operations.iterator(); iter.hasNext();) {
+            AMQTxOperation op = iter.next();
+            if (op.getOperationType() == AMQTxOperation.REMOVE_OPERATION_TYPE) {
                 list.add(op.getData());
             }
         }
-        MessageAck rc[]=new MessageAck[list.size()];
+        MessageAck rc[] = new MessageAck[list.size()];
         list.toArray(rc);
         return rc;
     }
@@ -81,17 +84,15 @@
     /**
      * @return the location
      */
-    public Location getLocation(){
+    public Location getLocation() {
         return this.location;
     }
 
-    public ArrayList<AMQTxOperation> getOperations(){
+    public ArrayList<AMQTxOperation> getOperations() {
         return operations;
     }
 
-    public void setOperations(ArrayList<AMQTxOperation> operations){
-        this.operations=operations;
+    public void setOperations(ArrayList<AMQTxOperation> operations) {
+        this.operations = operations;
     }
 }
-
-   
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQTxOperation.java Wed Aug  8 11:56:59 2007
@@ -29,14 +29,13 @@
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.wireformat.WireFormat;
 
-
 /**
  */
 public class AMQTxOperation {
 
-    public static final byte ADD_OPERATION_TYPE=0;
-    public static final byte REMOVE_OPERATION_TYPE=1;
-    public static final byte ACK_OPERATION_TYPE=3;
+    public static final byte ADD_OPERATION_TYPE = 0;
+    public static final byte REMOVE_OPERATION_TYPE = 1;
+    public static final byte ACK_OPERATION_TYPE = 3;
     private byte operationType;
     private ActiveMQDestination destination;
     private Object data;
@@ -44,74 +43,73 @@
 
     public AMQTxOperation() {
     }
-    
-    public AMQTxOperation(byte operationType,ActiveMQDestination destination,Object data,Location location){
-        this.operationType=operationType;
-        this.destination=destination;
-        this.data=data;
-        this.location=location;
-        
+
+    public AMQTxOperation(byte operationType, ActiveMQDestination destination, Object data, Location location) {
+        this.operationType = operationType;
+        this.destination = destination;
+        this.data = data;
+        this.location = location;
+
     }
 
     /**
      * @return the data
      */
-    public Object getData(){
+    public Object getData() {
         return this.data;
     }
 
     /**
      * @param data the data to set
      */
-    public void setData(Object data){
-        this.data=data;
+    public void setData(Object data) {
+        this.data = data;
     }
 
     /**
      * @return the location
      */
-    public Location getLocation(){
+    public Location getLocation() {
         return this.location;
     }
 
     /**
      * @param location the location to set
      */
-    public void setLocation(Location location){
-        this.location=location;
+    public void setLocation(Location location) {
+        this.location = location;
     }
 
     /**
      * @return the operationType
      */
-    public byte getOperationType(){
+    public byte getOperationType() {
         return this.operationType;
     }
 
     /**
      * @param operationType the operationType to set
      */
-    public void setOperationType(byte operationType){
-        this.operationType=operationType;
+    public void setOperationType(byte operationType) {
+        this.operationType = operationType;
     }
 
-   
-    public boolean replay(AMQPersistenceAdapter adapter,ConnectionContext context) throws IOException{
-        boolean result=false;
-        AMQMessageStore store=(AMQMessageStore)adapter.createMessageStore(destination);
-        if(operationType==ADD_OPERATION_TYPE){
-            result=store.replayAddMessage(context,(Message)data,location);
-        }else if(operationType==REMOVE_OPERATION_TYPE){
-            result=store.replayRemoveMessage(context,(MessageAck)data);
-        }else{
-            JournalTopicAck ack=(JournalTopicAck)data;
-            result=((AMQTopicMessageStore)store).replayAcknowledge(context,ack.getClientId(),ack.getSubscritionName(),
-                    ack.getMessageId());
+    public boolean replay(AMQPersistenceAdapter adapter, ConnectionContext context) throws IOException {
+        boolean result = false;
+        AMQMessageStore store = (AMQMessageStore)adapter.createMessageStore(destination);
+        if (operationType == ADD_OPERATION_TYPE) {
+            result = store.replayAddMessage(context, (Message)data, location);
+        } else if (operationType == REMOVE_OPERATION_TYPE) {
+            result = store.replayRemoveMessage(context, (MessageAck)data);
+        } else {
+            JournalTopicAck ack = (JournalTopicAck)data;
+            result = ((AMQTopicMessageStore)store).replayAcknowledge(context, ack.getClientId(), ack
+                .getSubscritionName(), ack.getMessageId());
         }
         return result;
     }
-    
-    public void writeExternal(WireFormat wireFormat,DataOutput dos) throws IOException {
+
+    public void writeExternal(WireFormat wireFormat, DataOutput dos) throws IOException {
         location.writeExternal(dos);
         ByteSequence packet = wireFormat.marshal(getData());
         dos.writeInt(packet.length);
@@ -121,16 +119,16 @@
         dos.write(packet.data, packet.offset, packet.length);
     }
 
-    public void readExternal(WireFormat wireFormat,DataInput dis) throws IOException {
-        this.location=new Location();
+    public void readExternal(WireFormat wireFormat, DataInput dis) throws IOException {
+        this.location = new Location();
         this.location.readExternal(dis);
-        int size=dis.readInt();
-        byte[] data=new byte[size];
+        int size = dis.readInt();
+        byte[] data = new byte[size];
         dis.readFully(data);
         setData(wireFormat.unmarshal(new ByteSequence(data)));
-        size=dis.readInt();
-        data=new byte[size];
+        size = dis.readInt();
+        data = new byte[size];
         dis.readFully(data);
-        this.destination=(ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
+        this.destination = (ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Wed Aug  8 11:56:59 2007
@@ -21,53 +21,52 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-final class RecoveryListenerAdapter implements MessageRecoveryListener{
+final class RecoveryListenerAdapter implements MessageRecoveryListener {
 
-    static final private Log log=LogFactory.getLog(RecoveryListenerAdapter.class);
+    static final private Log log = LogFactory.getLog(RecoveryListenerAdapter.class);
     private final MessageStore store;
     private final MessageRecoveryListener listener;
-    private int count=0;
+    private int count = 0;
     private MessageId lastRecovered;
 
-    RecoveryListenerAdapter(MessageStore store,MessageRecoveryListener listener){
-        this.store=store;
-        this.listener=listener;
+    RecoveryListenerAdapter(MessageStore store, MessageRecoveryListener listener) {
+        this.store = store;
+        this.listener = listener;
     }
 
-    
-    public boolean hasSpace(){
+    public boolean hasSpace() {
         return listener.hasSpace();
     }
 
-    public boolean recoverMessage(Message message) throws Exception{
-        if(listener.hasSpace()){
+    public boolean recoverMessage(Message message) throws Exception {
+        if (listener.hasSpace()) {
             listener.recoverMessage(message);
-            lastRecovered=message.getMessageId();
+            lastRecovered = message.getMessageId();
             count++;
             return true;
         }
         return false;
     }
 
-    public boolean recoverMessageReference(MessageId ref) throws Exception{
-        Message message=this.store.getMessage(ref);
-        if(message!=null){
-           return  recoverMessage(message);
-        }else{
-            log.error("Message id "+ref+" could not be recovered from the data store!");
+    public boolean recoverMessageReference(MessageId ref) throws Exception {
+        Message message = this.store.getMessage(ref);
+        if (message != null) {
+            return recoverMessage(message);
+        } else {
+            log.error("Message id " + ref + " could not be recovered from the data store!");
         }
         return false;
     }
-    
+
     MessageId getLastRecoveredMessageId() {
         return lastRecovered;
     }
 
-    int size(){
+    int size() {
         return count;
     }
 
-    void reset(){
-        count=0;
+    void reset() {
+        count = 0;
     }
-}
\ No newline at end of file
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/DataSourceSupport.java Wed Aug  8 11:56:59 2007
@@ -65,7 +65,7 @@
     public DataSource getDataSource() throws IOException {
         if (dataSource == null) {
             dataSource = createDataSource();
-            if (dataSource == null) { 
+            if (dataSource == null) {
                 throw new IllegalArgumentException("No dataSource property has been configured");
             }
         }
@@ -88,9 +88,9 @@
         ds.setCreateDatabase("create");
         return ds;
     }
-    
-    public String toString(){
-        return ""+dataSource;
+
+    public String toString() {
+        return "" + dataSource;
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCAdapter.java Wed Aug  8 11:56:59 2007
@@ -24,69 +24,87 @@
 /**
  * @version $Revision: 1.5 $
  */
-public interface JDBCAdapter{
+public interface JDBCAdapter {
 
     public void setStatements(Statements statementProvider);
 
-    public abstract void doCreateTables(TransactionContext c) throws SQLException,IOException;
+    public abstract void doCreateTables(TransactionContext c) throws SQLException, IOException;
 
-    public abstract void doDropTables(TransactionContext c) throws SQLException,IOException;
+    public abstract void doDropTables(TransactionContext c) throws SQLException, IOException;
 
-    public abstract void doAddMessage(TransactionContext c,MessageId messageID,ActiveMQDestination destination,
-            byte[] data,long expiration) throws SQLException,IOException;
+    public abstract void doAddMessage(TransactionContext c, MessageId messageID,
+                                      ActiveMQDestination destination, byte[] data, long expiration)
+        throws SQLException, IOException;
 
-    public abstract void doAddMessageReference(TransactionContext c,MessageId messageId,
-            ActiveMQDestination destination,long expirationTime,String messageRef) throws SQLException,IOException;
+    public abstract void doAddMessageReference(TransactionContext c, MessageId messageId,
+                                               ActiveMQDestination destination, long expirationTime,
+                                               String messageRef) throws SQLException, IOException;
 
-    public abstract byte[] doGetMessage(TransactionContext c,long seq) throws SQLException,IOException;
+    public abstract byte[] doGetMessage(TransactionContext c, long seq) throws SQLException, IOException;
 
-    public abstract String doGetMessageReference(TransactionContext c,long id) throws SQLException,IOException;
+    public abstract String doGetMessageReference(TransactionContext c, long id) throws SQLException,
+        IOException;
 
-    public abstract void doRemoveMessage(TransactionContext c,long seq) throws SQLException,IOException;
+    public abstract void doRemoveMessage(TransactionContext c, long seq) throws SQLException, IOException;
 
-    public abstract void doRecover(TransactionContext c,ActiveMQDestination destination,
-            JDBCMessageRecoveryListener listener) throws Exception;
+    public abstract void doRecover(TransactionContext c, ActiveMQDestination destination,
+                                   JDBCMessageRecoveryListener listener) throws Exception;
 
-    public abstract void doSetLastAck(TransactionContext c,ActiveMQDestination destination,String clientId,
-            String subscriptionName,long seq) throws SQLException,IOException;
+    public abstract void doSetLastAck(TransactionContext c, ActiveMQDestination destination, String clientId,
+                                      String subscriptionName, long seq) throws SQLException, IOException;
 
-    public abstract void doRecoverSubscription(TransactionContext c,ActiveMQDestination destination,String clientId,
-            String subscriptionName,JDBCMessageRecoveryListener listener) throws Exception;
+    public abstract void doRecoverSubscription(TransactionContext c, ActiveMQDestination destination,
+                                               String clientId, String subscriptionName,
+                                               JDBCMessageRecoveryListener listener) throws Exception;
 
-    public abstract void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,String clientId,
-            String subscriptionName,long seq,int maxReturned,JDBCMessageRecoveryListener listener) throws Exception;
+    public abstract void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination,
+                                               String clientId, String subscriptionName, long seq,
+                                               int maxReturned, JDBCMessageRecoveryListener listener)
+        throws Exception;
 
-    public abstract void doSetSubscriberEntry(TransactionContext c,SubscriptionInfo subscriptionInfo,boolean retroactive) throws SQLException,IOException;
+    public abstract void doSetSubscriberEntry(TransactionContext c, SubscriptionInfo subscriptionInfo,
+                                              boolean retroactive) throws SQLException, IOException;
 
-    public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,ActiveMQDestination destination,
-            String clientId,String subscriptionName) throws SQLException,IOException;
+    public abstract SubscriptionInfo doGetSubscriberEntry(TransactionContext c,
+                                                          ActiveMQDestination destination, String clientId,
+                                                          String subscriptionName) throws SQLException,
+        IOException;
 
-    public abstract long getBrokerSequenceId(TransactionContext c,MessageId messageID) throws SQLException,IOException;
+    public abstract long getBrokerSequenceId(TransactionContext c, MessageId messageID) throws SQLException,
+        IOException;
 
-    public abstract void doRemoveAllMessages(TransactionContext c,ActiveMQDestination destinationName)
-            throws SQLException,IOException;
+    public abstract void doRemoveAllMessages(TransactionContext c, ActiveMQDestination destinationName)
+        throws SQLException, IOException;
 
-    public abstract void doDeleteSubscription(TransactionContext c,ActiveMQDestination destinationName,String clientId,
-            String subscriptionName) throws SQLException,IOException;
+    public abstract void doDeleteSubscription(TransactionContext c, ActiveMQDestination destinationName,
+                                              String clientId, String subscriptionName) throws SQLException,
+        IOException;
 
-    public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException,IOException;
+    public abstract void doDeleteOldMessages(TransactionContext c) throws SQLException, IOException;
 
-    public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,IOException;
+    public abstract long doGetLastMessageBrokerSequenceId(TransactionContext c) throws SQLException,
+        IOException;
 
-    public abstract Set doGetDestinations(TransactionContext c) throws SQLException,IOException;
+    public abstract Set doGetDestinations(TransactionContext c) throws SQLException, IOException;
 
     public abstract void setUseExternalMessageReferences(boolean useExternalMessageReferences);
 
-    public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,ActiveMQDestination destination)
-            throws SQLException,IOException;
-
-    public int doGetDurableSubscriberMessageCount(TransactionContext c,ActiveMQDestination destination,String clientId,
-            String subscriptionName) throws SQLException,IOException;
-
-    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException, IOException;
-    
-    public void doRecoverNextMessages(TransactionContext c,ActiveMQDestination destination,long nextSeq,int maxReturned,
-            JDBCMessageRecoveryListener listener) throws Exception;
-    
-    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,ActiveMQDestination destination,String clientId, String subscriberName) throws SQLException,IOException;
-}
\ No newline at end of file
+    public abstract SubscriptionInfo[] doGetAllSubscriptions(TransactionContext c,
+                                                             ActiveMQDestination destination)
+        throws SQLException, IOException;
+
+    public int doGetDurableSubscriberMessageCount(TransactionContext c, ActiveMQDestination destination,
+                                                  String clientId, String subscriptionName)
+        throws SQLException, IOException;
+
+    public int doGetMessageCount(TransactionContext c, ActiveMQDestination destination) throws SQLException,
+        IOException;
+
+    public void doRecoverNextMessages(TransactionContext c, ActiveMQDestination destination, long nextSeq,
+                                      int maxReturned, JDBCMessageRecoveryListener listener) throws Exception;
+
+    public long doGetLastAckedDurableSubscriberMessageId(TransactionContext c,
+                                                         ActiveMQDestination destination, String clientId,
+                                                         String subscriberName) throws SQLException,
+        IOException;
+}

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java Wed Aug  8 11:56:59 2007
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.sql.SQLException;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -32,7 +33,6 @@
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
-
 /**
  * @version $Revision: 1.10 $
  */
@@ -44,8 +44,7 @@
     protected final JDBCPersistenceAdapter persistenceAdapter;
     protected AtomicLong lastMessageId = new AtomicLong(-1);
 
-    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
-            ActiveMQDestination destination) {
+    public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQDestination destination) {
         this.persistenceAdapter = persistenceAdapter;
         this.adapter = adapter;
         this.wireFormat = wireFormat;
@@ -53,15 +52,14 @@
     }
 
     public void addMessage(ConnectionContext context, Message message) throws IOException {
-        
+
         // Serialize the Message..
         byte data[];
         try {
             ByteSequence packet = wireFormat.marshal(message);
             data = ByteSequenceData.toByteArray(packet);
         } catch (IOException e) {
-            throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
-                    + e, e);
+            throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
         }
 
         // Get a connection and insert the message into the DB.
@@ -69,9 +67,8 @@
         try {
             adapter.doAddMessage(c, message.getMessageId(), destination, data, message.getExpiration());
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: "
-                    + e, e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to broker message: " + message.getMessageId() + " in container: " + e, e);
         } finally {
             c.close();
         }
@@ -83,9 +80,8 @@
         try {
             adapter.doAddMessageReference(c, messageId, destination, expirationTime, messageRef);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: "
-                    + e, e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } finally {
             c.close();
         }
@@ -94,7 +90,7 @@
     public Message getMessage(MessageId messageId) throws IOException {
 
         long id = messageId.getBrokerSequenceId();
-        
+
         // Get a connection and pull the message out of the DB
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -102,21 +98,21 @@
             if (data == null)
                 return null;
 
-            Message answer = (Message) wireFormat.unmarshal(new ByteSequence(data));
+            Message answer = (Message)wireFormat.unmarshal(new ByteSequence(data));
             return answer;
         } catch (IOException e) {
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } finally {
             c.close();
         }
     }
-    
+
     public String getMessageReference(MessageId messageId) throws IOException {
         long id = messageId.getBrokerSequenceId();
-        
+
         // Get a connection and pull the message out of the DB
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
@@ -124,7 +120,7 @@
         } catch (IOException e) {
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + messageId + " in container: " + e, e);
         } finally {
             c.close();
@@ -139,7 +135,7 @@
         try {
             adapter.doRemoveMessage(c, seq);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker message: " + ack.getLastMessageId() + " in container: " + e, e);
         } finally {
             c.close();
@@ -154,16 +150,17 @@
             c = persistenceAdapter.getTransactionContext();
             adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
                 public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
-                    Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
+                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
                     msg.getMessageId().setBrokerSequenceId(sequenceId);
                     return listener.recoverMessage(msg);
                 }
+
                 public boolean recoverMessageReference(String reference) throws Exception {
                     return listener.recoverMessageReference(new MessageId(reference));
                 }
             });
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to recover container. Reason: " + e, e);
         } finally {
             c.close();
@@ -185,13 +182,13 @@
         try {
             adapter.doRemoveAllMessages(c, destination);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to broker remove all messages: " + e, e);
         } finally {
             c.close();
         }
     }
-    
+
     public ActiveMQDestination getDestination() {
         return destination;
     }
@@ -200,16 +197,15 @@
         // we can ignore since we don't buffer up messages.
     }
 
-  
-    public int getMessageCount() throws IOException{
+    public int getMessageCount() throws IOException {
         int result = 0;
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            
+
             result = adapter.doGetMessageCount(c, destination);
-               
+
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get Message Count: " + destination + ". Reason: " + e, e);
         } finally {
             c.close();
@@ -221,50 +217,49 @@
      * @param maxReturned
      * @param listener
      * @throws Exception
-     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int, org.apache.activemq.store.MessageRecoveryListener)
+     * @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
+     *      org.apache.activemq.store.MessageRecoveryListener)
      */
-    public void recoverNextMessages(int maxReturned,final MessageRecoveryListener listener) throws Exception{
-        TransactionContext c=persistenceAdapter.getTransactionContext();
-        
-        try{
-            adapter.doRecoverNextMessages(c,destination,lastMessageId.get(),maxReturned,
-                    new JDBCMessageRecoveryListener(){
-
-                        public  boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
-                            if(listener.hasSpace()){
-                                Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
-                                msg.getMessageId().setBrokerSequenceId(sequenceId);
-                                listener.recoverMessage(msg);
-                                lastMessageId.set(sequenceId);
-                                return true;
-                            }
-                            return false;
-                        }
-
-                        public boolean recoverMessageReference(String reference) throws Exception{
-                            if(listener.hasSpace()) {
-                                listener.recoverMessageReference(new MessageId(reference));
-                                return true;
-                            }
-                            return false;
-                        }
-
-                    });
-        }catch(SQLException e){
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-        }finally{
+    public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+
+        try {
+            adapter.doRecoverNextMessages(c, destination, lastMessageId.get(), maxReturned, new JDBCMessageRecoveryListener() {
+
+                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+                    if (listener.hasSpace()) {
+                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+                        msg.getMessageId().setBrokerSequenceId(sequenceId);
+                        listener.recoverMessage(msg);
+                        lastMessageId.set(sequenceId);
+                        return true;
+                    }
+                    return false;
+                }
+
+                public boolean recoverMessageReference(String reference) throws Exception {
+                    if (listener.hasSpace()) {
+                        listener.recoverMessageReference(new MessageId(reference));
+                        return true;
+                    }
+                    return false;
+                }
+
+            });
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+        } finally {
             c.close();
         }
-        
+
     }
 
     /**
-     * 
      * @see org.apache.activemq.store.MessageStore#resetBatching()
      */
-    public void resetBatching(){
+    public void resetBatching() {
         lastMessageId.set(-1);
-        
+
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java Wed Aug  8 11:56:59 2007
@@ -60,10 +60,12 @@
  * 
  * @version $Revision: 1.9 $
  */
-public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter, BrokerServiceAware {
+public class JDBCPersistenceAdapter extends DataSourceSupport implements PersistenceAdapter,
+    BrokerServiceAware {
 
     private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
-    private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/apache/activemq/store/jdbc/");
+    private static FactoryFinder factoryFinder = new FactoryFinder(
+                                                                   "META-INF/services/org/apache/activemq/store/jdbc/");
 
     private WireFormat wireFormat = new OpenWireFormat();
     private BrokerService brokerService;
@@ -93,20 +95,16 @@
         try {
             c = getTransactionContext();
             return getAdapter().doGetDestinations(c);
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             return Collections.EMPTY_SET;
-        }
-        catch (SQLException e) {
+        } catch (SQLException e) {
             JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             return Collections.EMPTY_SET;
-        }
-        finally {
+        } finally {
             if (c != null) {
                 try {
                     c.close();
-                }
-                catch (Throwable e) {
+                } catch (Throwable e) {
                 }
             }
         }
@@ -141,7 +139,7 @@
         try {
             return getAdapter().doGetLastMessageBrokerSequenceId(c);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get last broker message id: " + e, e);
         } finally {
             c.close();
@@ -159,19 +157,18 @@
                     getAdapter().doCreateTables(transactionContext);
                 } catch (SQLException e) {
                     log.warn("Cannot create tables due to: " + e);
-                    JDBCPersistenceAdapter.log("Failure Details: ",e);
+                    JDBCPersistenceAdapter.log("Failure Details: ", e);
                 }
             } finally {
                 transactionContext.commit();
             }
         }
-        
+
         if (isUseDatabaseLock()) {
             DatabaseLocker service = getDatabaseLocker();
             if (service == null) {
                 log.warn("No databaseLocker configured for the JDBC Persistence Adapter");
-            }
-            else {
+            } else {
                 service.start();
             }
         }
@@ -209,20 +206,16 @@
             log.debug("Cleaning up old messages.");
             c = getTransactionContext();
             getAdapter().doDeleteOldMessages(c);
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             log.warn("Old message cleanup failed due to: " + e, e);
-        }
-        catch (SQLException e) {
+        } catch (SQLException e) {
             log.warn("Old message cleanup failed due to: " + e);
             JDBCPersistenceAdapter.log("Failure Details: ", e);
-        }
-        finally {
+        } finally {
             if (c != null) {
                 try {
                     c.close();
-                }
-                catch (Throwable e) {
+                } catch (Throwable e) {
                 }
             }
             log.debug("Cleanup done.");
@@ -253,7 +246,6 @@
         return adapter;
     }
 
-    
     public DatabaseLocker getDatabaseLocker() throws IOException {
         if (databaseLocker == null) {
             databaseLocker = createDatabaseLocker();
@@ -274,7 +266,7 @@
     public void setDatabaseLocker(DatabaseLocker databaseLocker) {
         this.databaseLocker = databaseLocker;
     }
-    
+
     public BrokerService getBrokerService() {
         return brokerService;
     }
@@ -287,7 +279,7 @@
      * @throws IOException
      */
     protected JDBCAdapter createAdapter() throws IOException {
-        JDBCAdapter adapter=null;
+        JDBCAdapter adapter = null;
         TransactionContext c = getTransactionContext();
         try {
 
@@ -298,17 +290,18 @@
                 dirverName = dirverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
 
                 try {
-                    adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(dirverName);
+                    adapter = (DefaultJDBCAdapter)factoryFinder.newInstance(dirverName);
                     log.info("Database driver recognized: [" + dirverName + "]");
                 } catch (Throwable e) {
                     log.warn("Database driver NOT recognized: [" + dirverName
-                            + "].  Will use default JDBC implementation.");
+                             + "].  Will use default JDBC implementation.");
                 }
 
             } catch (SQLException e) {
-                log.warn("JDBC error occurred while trying to detect database type.  Will use default JDBC implementation: "
-                                + e.getMessage());
-                JDBCPersistenceAdapter.log("Failure Details: ",e);
+                log
+                    .warn("JDBC error occurred while trying to detect database type.  Will use default JDBC implementation: "
+                          + e.getMessage());
+                JDBCPersistenceAdapter.log("Failure Details: ", e);
             }
 
             // Use the default JDBC adapter if the
@@ -340,7 +333,7 @@
         if (context == null) {
             return getTransactionContext();
         } else {
-            TransactionContext answer = (TransactionContext) context.getLongTermStoreContext();
+            TransactionContext answer = (TransactionContext)context.getLongTermStoreContext();
             if (answer == null) {
                 answer = new TransactionContext(getDataSource());
                 context.setLongTermStoreContext(answer);
@@ -373,7 +366,8 @@
     }
 
     /**
-     * Sets the number of milliseconds until the database is attempted to be cleaned up for durable topics
+     * Sets the number of milliseconds until the database is attempted to be
+     * cleaned up for durable topics
      */
     public void setCleanupPeriod(int cleanupPeriod) {
         this.cleanupPeriod = cleanupPeriod;
@@ -386,7 +380,7 @@
             getAdapter().setUseExternalMessageReferences(isUseExternalMessageReferences());
             getAdapter().doCreateTables(c);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create(e);
         } finally {
             c.close();
@@ -400,7 +394,7 @@
     public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
         this.useExternalMessageReferences = useExternalMessageReferences;
     }
-    
+
     public boolean isCreateTablesOnStartup() {
         return createTablesOnStartup;
     }
@@ -417,23 +411,24 @@
     }
 
     /**
-     * Sets whether or not an exclusive database lock should be used to enable JDBC Master/Slave. Enabled by default.
+     * Sets whether or not an exclusive database lock should be used to enable
+     * JDBC Master/Slave. Enabled by default.
      */
     public void setUseDatabaseLock(boolean useDatabaseLock) {
         this.useDatabaseLock = useDatabaseLock;
     }
 
     static public void log(String msg, SQLException e) {
-        String s = msg+e.getMessage();
-        while( e.getNextException() != null ) {
+        String s = msg + e.getMessage();
+        while (e.getNextException() != null) {
             e = e.getNextException();
-            s += ", due to: "+e.getMessage();
+            s += ", due to: " + e.getMessage();
         }
         log.debug(s, e);
     }
 
     public Statements getStatements() {
-        if( statements == null ) {
+        if (statements == null) {
             statements = new Statements();
         }
         return statements;
@@ -444,12 +439,12 @@
     }
 
     /**
-     * @param usageManager The UsageManager that is controlling the destination's memory usage.
+     * @param usageManager The UsageManager that is controlling the
+     *                destination's memory usage.
      */
     public void setUsageManager(UsageManager usageManager) {
     }
 
-
     protected void databaseLockKeepAlive() {
         boolean stop = false;
         try {
@@ -459,8 +454,7 @@
                     stop = true;
                 }
             }
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
             log.error("Failed to get database when trying keepalive: " + e, e);
         }
         if (stop) {
@@ -473,8 +467,7 @@
         log.info("No longer able to keep the exclusive lock so giving up being a master");
         try {
             brokerService.stop();
-        }
-        catch (Exception e) {
+        } catch (Exception e) {
             log.warn("Failed to stop broker");
         }
     }
@@ -482,17 +475,17 @@
     protected DatabaseLocker createDatabaseLocker() throws IOException {
         return new DefaultDatabaseLocker(getDataSource(), getStatements());
     }
-    
-    public void setBrokerName(String brokerName){
+
+    public void setBrokerName(String brokerName) {
     }
-    
-    public String toString(){
-        return "JDBCPersistenceAdaptor("+super.toString()+")";
+
+    public String toString() {
+        return "JDBCPersistenceAdaptor(" + super.toString() + ")";
     }
 
-    public void setDirectory(File dir){        
+    public void setDirectory(File dir) {
     }
 
-    public void checkpoint(boolean sync) throws IOException{        
+    public void checkpoint(boolean sync) throws IOException {
     }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java?view=diff&rev=563982&r1=563981&r2=563982
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCTopicMessageStore.java Wed Aug  8 11:56:59 2007
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.Message;
@@ -32,29 +33,26 @@
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
-
 /**
  * @version $Revision: 1.6 $
  */
 public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
 
-    private Map subscriberLastMessageMap=new ConcurrentHashMap();
-    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat,
-            ActiveMQTopic topic) {
+    private Map subscriberLastMessageMap = new ConcurrentHashMap();
+
+    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, ActiveMQTopic topic) {
         super(persistenceAdapter, adapter, wireFormat, topic);
     }
 
-    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId)
-            throws IOException {
+    public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
         long seq = messageId.getBrokerSequenceId();
         // Get a connection and insert the message into the DB.
         TransactionContext c = persistenceAdapter.getTransactionContext(context);
         try {
             adapter.doSetLastAck(c, destination, clientId, subscriptionName, seq);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message "
-                    + messageId + " in container: " + e, e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to store acknowledgment for: " + clientId + " on message " + messageId + " in container: " + e, e);
         } finally {
             c.close();
         }
@@ -62,91 +60,86 @@
 
     /**
      * @throws Exception
-     * 
      */
-    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener)
-            throws Exception {
+    public void recoverSubscription(String clientId, String subscriptionName, final MessageRecoveryListener listener) throws Exception {
 
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
-            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName,
-                    new JDBCMessageRecoveryListener() {
-                        public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
-                            Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
-                            msg.getMessageId().setBrokerSequenceId(sequenceId);
-                            return listener.recoverMessage(msg);
-                        }
-                        public boolean  recoverMessageReference(String reference) throws Exception {
-                            return listener.recoverMessageReference(new MessageId(reference));
-                        }
-                        
-                    });
+            adapter.doRecoverSubscription(c, destination, clientId, subscriptionName, new JDBCMessageRecoveryListener() {
+                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+                    Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+                    msg.getMessageId().setBrokerSequenceId(sequenceId);
+                    return listener.recoverMessage(msg);
+                }
+
+                public boolean recoverMessageReference(String reference) throws Exception {
+                    return listener.recoverMessageReference(new MessageId(reference));
+                }
+
+            });
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to recover subscription: " + clientId + ". Reason: " + e, e);
         } finally {
             c.close();
         }
     }
 
-    public synchronized void recoverNextMessages(final String clientId,final String subscriptionName,
-            final int maxReturned,final MessageRecoveryListener listener) throws Exception{
-        TransactionContext c=persistenceAdapter.getTransactionContext();
-        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
-        AtomicLong last=(AtomicLong)subscriberLastMessageMap.get(subcriberId);
-        if(last==null){
-            long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c,destination,clientId,subscriptionName);
-            last=new AtomicLong(lastAcked);
-            subscriberLastMessageMap.put(subcriberId,last);
-        }
-        final AtomicLong finalLast=last;
-        try{
-            adapter.doRecoverNextMessages(c,destination,clientId,subscriptionName,last.get(),maxReturned,
-                    new JDBCMessageRecoveryListener(){
-
-                        public boolean recoverMessage(long sequenceId,byte[] data) throws Exception{
-                            if(listener.hasSpace()){
-                                Message msg=(Message)wireFormat.unmarshal(new ByteSequence(data));
-                                msg.getMessageId().setBrokerSequenceId(sequenceId);
-                                listener.recoverMessage(msg);
-                                finalLast.set(sequenceId);
-                                return true;
-                            }
-                            return false;
-                        }
-
-                        public boolean recoverMessageReference(String reference) throws Exception{
-                            return listener.recoverMessageReference(new MessageId(reference));
-                        }
-
-                    });
-        }catch(SQLException e){
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-        }finally{
+    public synchronized void recoverNextMessages(final String clientId, final String subscriptionName, final int maxReturned, final MessageRecoveryListener listener)
+        throws Exception {
+        TransactionContext c = persistenceAdapter.getTransactionContext();
+        String subcriberId = getSubscriptionKey(clientId, subscriptionName);
+        AtomicLong last = (AtomicLong)subscriberLastMessageMap.get(subcriberId);
+        if (last == null) {
+            long lastAcked = adapter.doGetLastAckedDurableSubscriberMessageId(c, destination, clientId, subscriptionName);
+            last = new AtomicLong(lastAcked);
+            subscriberLastMessageMap.put(subcriberId, last);
+        }
+        final AtomicLong finalLast = last;
+        try {
+            adapter.doRecoverNextMessages(c, destination, clientId, subscriptionName, last.get(), maxReturned, new JDBCMessageRecoveryListener() {
+
+                public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
+                    if (listener.hasSpace()) {
+                        Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
+                        msg.getMessageId().setBrokerSequenceId(sequenceId);
+                        listener.recoverMessage(msg);
+                        finalLast.set(sequenceId);
+                        return true;
+                    }
+                    return false;
+                }
+
+                public boolean recoverMessageReference(String reference) throws Exception {
+                    return listener.recoverMessageReference(new MessageId(reference));
+                }
+
+            });
+        } catch (SQLException e) {
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+        } finally {
             c.close();
             last.set(finalLast.get());
         }
     }
-    
-    public void resetBatching(String clientId,String subscriptionName) {
-        String subcriberId=getSubscriptionKey(clientId,subscriptionName);
+
+    public void resetBatching(String clientId, String subscriptionName) {
+        String subcriberId = getSubscriptionKey(clientId, subscriptionName);
         subscriberLastMessageMap.remove(subcriberId);
     }
-    
+
     /**
      * @see org.apache.activemq.store.TopicMessageStore#storeSubsciption(org.apache.activemq.service.SubscriptionInfo,
      *      boolean)
      */
-    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive)
-            throws IOException {
+    public void addSubsciption(SubscriptionInfo subscriptionInfo, boolean retroactive) throws IOException {
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             c = persistenceAdapter.getTransactionContext();
             adapter.doSetSubscriberEntry(c, subscriptionInfo, retroactive);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
-            throw IOExceptionSupport
-                    .create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
+            throw IOExceptionSupport.create("Failed to lookup subscription for info: " + subscriptionInfo.getClientId() + ". Reason: " + e, e);
         } finally {
             c.close();
         }
@@ -161,7 +154,7 @@
         try {
             return adapter.doGetSubscriberEntry(c, destination, clientId, subscriptionName);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to lookup subscription for: " + clientId + ". Reason: " + e, e);
         } finally {
             c.close();
@@ -173,11 +166,11 @@
         try {
             adapter.doDeleteSubscription(c, destination, clientId, subscriptionName);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to remove subscription for: " + clientId + ". Reason: " + e, e);
         } finally {
             c.close();
-            resetBatching(clientId,subscriptionName);
+            resetBatching(clientId, subscriptionName);
         }
     }
 
@@ -186,40 +179,32 @@
         try {
             return adapter.doGetAllSubscriptions(c, destination);
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to lookup subscriptions. Reason: " + e, e);
         } finally {
             c.close();
         }
     }
 
-    
-    
-    
-
-    public int getMessageCount(String clientId,String subscriberName) throws IOException{
+    public int getMessageCount(String clientId, String subscriberName) throws IOException {
         int result = 0;
         TransactionContext c = persistenceAdapter.getTransactionContext();
         try {
             result = adapter.doGetDurableSubscriberMessageCount(c, destination, clientId, subscriberName);
-               
+
         } catch (SQLException e) {
-            JDBCPersistenceAdapter.log("JDBC Failure: ",e);
+            JDBCPersistenceAdapter.log("JDBC Failure: ", e);
             throw IOExceptionSupport.create("Failed to get Message Count: " + clientId + ". Reason: " + e, e);
         } finally {
             c.close();
         }
         return result;
     }
-    
-    protected String getSubscriptionKey(String clientId,String subscriberName){
-        String result=clientId+":";
-        result+=subscriberName!=null?subscriberName:"NOT_SET";
+
+    protected String getSubscriptionKey(String clientId, String subscriberName) {
+        String result = clientId + ":";
+        result += subscriberName != null ? subscriberName : "NOT_SET";
         return result;
     }
-
-    
-
-    
 
 }



Mime
View raw message